Package logilab-common-0 :: Package 39 :: Package 0 :: Module testlib
[frames] | no frames]

Source Code for Module logilab-common-0.39.0.testlib

   1  # -*- coding: utf-8 -*- 
   2  """Run tests. 
   3   
   4  This will find all modules whose name match a given prefix in the test 
   5  directory, and run them. Various command line options provide 
   6  additional facilities. 
   7   
   8  Command line options: 
   9   
  10   -v: verbose -- run tests in verbose mode with output to stdout 
  11   -q: quiet   -- don't print anything except if a test fails 
  12   -t: testdir -- directory where the tests will be found 
  13   -x: exclude -- add a test to exclude 
  14   -p: profile -- profiled execution 
  15   -c: capture -- capture standard out/err during tests 
  16   -d: dbc     -- enable design-by-contract 
  17   -m: match   -- only run test matching the tag pattern which follow 
  18   
  19  If no non-option arguments are present, prefixes used are 'test', 
  20  'regrtest', 'smoketest' and 'unittest'. 
  21   
  22  :copyright: 2003-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  23  :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  24  :license: General Public License version 2 - http://www.gnu.org/licenses 
  25  """ 
  26  __docformat__ = "restructuredtext en" 
  27  # modified copy of some functions from test/regrtest.py from PyXml 
  28  # disable camel case warning 
  29  # pylint: disable-msg=C0103 
  30   
  31  import sys 
  32  import os, os.path as osp 
  33  import re 
  34  import time 
  35  import getopt 
  36  import traceback 
  37  import inspect 
  38  import unittest 
  39  import difflib 
  40  import types 
  41  import tempfile 
  42  import math 
  43  from shutil import rmtree 
  44  from operator import itemgetter 
  45  from warnings import warn 
  46  from compiler.consts import CO_GENERATOR 
  47  from ConfigParser import ConfigParser 
  48   
  49  try: 
  50      from test import test_support 
  51  except ImportError: 
  52      # not always available 
53 - class TestSupport:
54 - def unload(self, test):
55 pass
56 test_support = TestSupport() 57 58 from logilab.common.deprecation import class_renamed, deprecated_function, \ 59 obsolete 60 # pylint: disable-msg=W0622 61 from logilab.common.compat import set, enumerate, any, sorted 62 # pylint: enable-msg=W0622 63 from logilab.common.modutils import load_module_from_name 64 from logilab.common.debugger import Debugger, colorize_source 65 from logilab.common.decorators import cached 66 from logilab.common import textutils 67 68 69 __all__ = ['main', 'unittest_main', 'find_tests', 'run_test', 'spawn'] 70 71 DEFAULT_PREFIXES = ('test', 'regrtest', 'smoketest', 'unittest', 72 'func', 'validation') 73 74 ENABLE_DBC = False 75 76 FILE_RESTART = ".pytest.restart" 77 78 # used by unittest to count the number of relevant levels in the traceback 79 __unittest = 1 80 81
82 -def with_tempdir(callable):
83 """A decorator ensuring no temporary file left when the function return 84 Work only for temporary file create with the tempfile module""" 85 def proxy(*args, **kargs): 86 87 old_tmpdir = tempfile.gettempdir() 88 new_tmpdir = tempfile.mkdtemp("-logilab-common-testlib","temp-dir-") 89 tempfile.tempdir = new_tmpdir 90 try: 91 return callable(*args, **kargs) 92 finally: 93 try: 94 rmtree(new_tmpdir, ignore_errors=True) 95 finally: 96 tempfile.tempdir = old_tmpdir
97 return proxy 98 99
100 -def main(testdir=None, exitafter=True):
101 """Execute a test suite. 102 103 This also parses command-line options and modifies its behaviour 104 accordingly. 105 106 tests -- a list of strings containing test names (optional) 107 testdir -- the directory in which to look for tests (optional) 108 109 Users other than the Python test suite will certainly want to 110 specify testdir; if it's omitted, the directory containing the 111 Python test suite is searched for. 112 113 If the tests argument is omitted, the tests listed on the 114 command-line will be used. If that's empty, too, then all *.py 115 files beginning with test_ will be used. 116 117 """ 118 119 try: 120 opts, args = getopt.getopt(sys.argv[1:], 'hvqxr:t:pcd', ['help']) 121 except getopt.error, msg: 122 print msg 123 print __doc__ 124 return 2 125 verbose = 0 126 quiet = False 127 profile = False 128 exclude = [] 129 capture = 0 130 for o, a in opts: 131 if o == '-v': 132 verbose += 1 133 elif o == '-q': 134 quiet = True 135 verbose = 0 136 elif o == '-x': 137 exclude.append(a) 138 elif o == '-t': 139 testdir = a 140 elif o == '-p': 141 profile = True 142 elif o == '-c': 143 capture += 1 144 elif o == '-d': 145 global ENABLE_DBC 146 ENABLE_DBC = True 147 elif o in ('-h', '--help'): 148 print __doc__ 149 sys.exit(0) 150 151 args = [item.rstrip('.py') for item in args] 152 exclude = [item.rstrip('.py') for item in exclude] 153 154 if testdir is not None: 155 os.chdir(testdir) 156 sys.path.insert(0, '') 157 tests = find_tests('.', args or DEFAULT_PREFIXES, excludes=exclude) 158 # Tell tests to be moderately quiet 159 test_support.verbose = verbose 160 if profile: 161 print >> sys.stderr, '** profiled run' 162 from hotshot import Profile 163 prof = Profile('stones.prof') 164 start_time, start_ctime = time.time(), time.clock() 165 good, bad, skipped, all_result = prof.runcall(run_tests, tests, quiet, 166 verbose, None, capture) 167 end_time, end_ctime = time.time(), time.clock() 168 prof.close() 169 else: 170 start_time, start_ctime = time.time(), time.clock() 171 good, bad, skipped, all_result = run_tests(tests, quiet, verbose, None, 172 capture) 173 end_time, end_ctime = time.time(), time.clock() 174 if not quiet: 175 print '*'*80 176 if all_result: 177 print 'Ran %s test cases in %0.2fs (%0.2fs CPU)' % ( 178 all_result.testsRun, end_time - start_time, 179 end_ctime - start_ctime), 180 if all_result.errors: 181 print ', %s errors' % len(all_result.errors), 182 if all_result.failures: 183 print ', %s failed' % len(all_result.failures), 184 if all_result.skipped: 185 print ', %s skipped' % len(all_result.skipped), 186 print 187 if good: 188 if not bad and not skipped and len(good) > 1: 189 print "All", 190 print _count(len(good), "test"), "OK." 191 if bad: 192 print _count(len(bad), "test"), "failed:", 193 print ', '.join(bad) 194 if skipped: 195 print _count(len(skipped), "test"), "skipped:", 196 print ', '.join(['%s (%s)' % (test, msg) for test, msg in skipped]) 197 if profile: 198 from hotshot import stats 199 stats = stats.load('stones.prof') 200 stats.sort_stats('time', 'calls') 201 stats.print_stats(30) 202 if exitafter: 203 sys.exit(len(bad) + len(skipped)) 204 else: 205 sys.path.pop(0) 206 return len(bad)
207 main = obsolete("testlib.main() is obsolete, use the pytest tool instead")(main) 208 209
210 -def run_tests(tests, quiet, verbose, runner=None, capture=0):
211 """Execute a list of tests. 212 213 :rtype: tuple 214 :return: tuple (list of passed tests, list of failed tests, list of skipped tests) 215 """ 216 good = [] 217 bad = [] 218 skipped = [] 219 all_result = None 220 for test in tests: 221 if not quiet: 222 print 223 print '-'*80 224 print "Executing", test 225 result = run_test(test, verbose, runner, capture) 226 if type(result) is type(''): 227 # an unexpected error occured 228 skipped.append( (test, result)) 229 else: 230 if all_result is None: 231 all_result = result 232 else: 233 all_result.testsRun += result.testsRun 234 all_result.failures += result.failures 235 all_result.errors += result.errors 236 all_result.skipped += result.skipped 237 if result.errors or result.failures: 238 bad.append(test) 239 if verbose: 240 print "test", test, \ 241 "failed -- %s errors, %s failures" % ( 242 len(result.errors), len(result.failures)) 243 else: 244 good.append(test) 245 246 return good, bad, skipped, all_result
247
248 -def find_tests(testdir, 249 prefixes=DEFAULT_PREFIXES, suffix=".py", 250 excludes=(), 251 remove_suffix=True):
252 """ 253 Return a list of all applicable test modules. 254 """ 255 tests = [] 256 for name in os.listdir(testdir): 257 if not suffix or name.endswith(suffix): 258 for prefix in prefixes: 259 if name.startswith(prefix): 260 if remove_suffix and name.endswith(suffix): 261 name = name[:-len(suffix)] 262 if name not in excludes: 263 tests.append(name) 264 tests.sort() 265 return tests
266 267
268 -def run_test(test, verbose, runner=None, capture=0):
269 """ 270 Run a single test. 271 272 test -- the name of the test 273 verbose -- if true, print more messages 274 """ 275 test_support.unload(test) 276 try: 277 m = load_module_from_name(test, path=sys.path) 278 # m = __import__(test, globals(), locals(), sys.path) 279 try: 280 suite = m.suite 281 if callable(suite): 282 suite = suite() 283 except AttributeError: 284 loader = unittest.TestLoader() 285 suite = loader.loadTestsFromModule(m) 286 if runner is None: 287 runner = SkipAwareTextTestRunner(capture=capture) # verbosity=0) 288 return runner.run(suite) 289 except KeyboardInterrupt, v: 290 raise KeyboardInterrupt, v, sys.exc_info()[2] 291 except: 292 # raise 293 type, value = sys.exc_info()[:2] 294 msg = "test %s crashed -- %s : %s" % (test, type, value) 295 if verbose: 296 traceback.print_exc() 297 return msg
298
299 -def _count(n, word):
300 """format word according to n""" 301 if n == 1: 302 return "%d %s" % (n, word) 303 else: 304 return "%d %ss" % (n, word)
305 306 307 308 309 ## PostMortem Debug facilities #####
310 -def start_interactive_mode(result):
311 """starts an interactive shell so that the user can inspect errors 312 """ 313 debuggers = result.debuggers 314 descrs = result.error_descrs + result.fail_descrs 315 if len(debuggers) == 1: 316 # don't ask for test name if there's only one failure 317 debuggers[0].start() 318 else: 319 while True: 320 testindex = 0 321 print "Choose a test to debug:" 322 # order debuggers in the same way than errors were printed 323 print "\n".join(['\t%s : %s' % (i, descr) for i, (_, descr) 324 in enumerate(descrs)]) 325 print "Type 'exit' (or ^D) to quit" 326 print 327 try: 328 todebug = raw_input('Enter a test name: ') 329 if todebug.strip().lower() == 'exit': 330 print 331 break 332 else: 333 try: 334 testindex = int(todebug) 335 debugger = debuggers[descrs[testindex][0]] 336 except (ValueError, IndexError): 337 print "ERROR: invalid test number %r" % (todebug, ) 338 else: 339 debugger.start() 340 except (EOFError, KeyboardInterrupt): 341 print 342 break
343 344 345 # test utils ################################################################## 346 from cStringIO import StringIO 347
348 -class SkipAwareTestResult(unittest._TextTestResult):
349
350 - def __init__(self, stream, descriptions, verbosity, 351 exitfirst=False, capture=0, printonly=None, 352 pdbmode=False, cvg=None, colorize=False):
353 super(SkipAwareTestResult, self).__init__(stream, 354 descriptions, verbosity) 355 self.skipped = [] 356 self.debuggers = [] 357 self.fail_descrs = [] 358 self.error_descrs = [] 359 self.exitfirst = exitfirst 360 self.capture = capture 361 self.printonly = printonly 362 self.pdbmode = pdbmode 363 self.cvg = cvg 364 self.colorize = colorize 365 self.pdbclass = Debugger 366 self.verbose = verbosity > 1
367
368 - def descrs_for(self, flavour):
369 return getattr(self, '%s_descrs' % flavour.lower())
370
371 - def _create_pdb(self, test_descr, flavour):
372 self.descrs_for(flavour).append( (len(self.debuggers), test_descr) ) 373 if self.pdbmode: 374 self.debuggers.append(self.pdbclass(sys.exc_info()[2]))
375
376 - def _exc_info_to_string(self, err, test):
377 """Converts a sys.exc_info()-style tuple of values into a string. 378 379 This method is overridden here because we want to colorize 380 lines if --color is passed, and display local variables if 381 --verbose is passed 382 """ 383 exctype, exc, tb = err 384 output = ['Traceback (most recent call last)'] 385 frames = inspect.getinnerframes(tb) 386 colorize = self.colorize 387 # count number of relevant levels in the traceback: skip first since 388 # it's our own _proceed function, and then start counting, using 389 # unittest's heuristic 390 nb_frames_skipped = self._count_relevant_tb_levels(tb.tb_next) 391 for index, (frame, filename, lineno, funcname, ctx, ctxindex) in enumerate(frames): 392 if not (0 < index <= nb_frames_skipped): 393 continue 394 filename = osp.abspath(filename) 395 if ctx is None: # pyc files or C extensions for instance 396 source = '<no source available>' 397 else: 398 source = ''.join(ctx) 399 if colorize: 400 filename = textutils.colorize_ansi(filename, 'magenta') 401 source = colorize_source(source) 402 output.append(' File "%s", line %s, in %s' % (filename, lineno, funcname)) 403 output.append(' %s' % source.strip()) 404 if self.verbose: 405 output.append('%r == %r' % (dir(frame), test.__module__)) 406 output.append('') 407 output.append(' ' + ' local variables '.center(66, '-')) 408 for varname, value in sorted(frame.f_locals.items()): 409 output.append(' %s: %r' % (varname, value)) 410 if varname == 'self': # special handy processing for self 411 for varname, value in sorted(vars(value).items()): 412 output.append(' self.%s: %r' % (varname, value)) 413 output.append(' ' + '-' * 66) 414 output.append('') 415 output.append(''.join(traceback.format_exception_only(exctype, exc))) 416 return '\n'.join(output)
417
418 - def addError(self, test, err):
419 """err == (exc_type, exc, tcbk)""" 420 exc_type, exc, _ = err # 421 if exc_type == TestSkipped: 422 self.addSkipped(test, exc) 423 else: 424 if self.exitfirst: 425 self.shouldStop = True 426 descr = self.getDescription(test) 427 super(SkipAwareTestResult, self).addError(test, err) 428 self._create_pdb(descr, 'error')
429
430 - def addFailure(self, test, err):
431 if self.exitfirst: 432 self.shouldStop = True 433 descr = self.getDescription(test) 434 super(SkipAwareTestResult, self).addFailure(test, err) 435 self._create_pdb(descr, 'fail')
436
437 - def addSkipped(self, test, reason):
438 self.skipped.append((test, self.getDescription(test), reason)) 439 if self.showAll: 440 self.stream.writeln("SKIPPED") 441 elif self.dots: 442 self.stream.write('S')
443
444 - def printErrors(self):
445 super(SkipAwareTestResult, self).printErrors() 446 self.printSkippedList()
447
448 - def printSkippedList(self):
449 for _, descr, err in self.skipped: # test, descr, err 450 self.stream.writeln(self.separator1) 451 self.stream.writeln("%s: %s" % ('SKIPPED', descr)) 452 self.stream.writeln("\t%s" % err)
453
454 - def printErrorList(self, flavour, errors):
455 for (_, descr), (test, err) in zip(self.descrs_for(flavour), errors): 456 self.stream.writeln(self.separator1) 457 if self.colorize: 458 self.stream.writeln("%s: %s" % ( 459 textutils.colorize_ansi(flavour, color='red'), descr)) 460 else: 461 self.stream.writeln("%s: %s" % (flavour, descr)) 462 463 self.stream.writeln(self.separator2) 464 self.stream.writeln(err) 465 466 try: 467 output, errput = test.captured_output() 468 except AttributeError: 469 pass # original unittest 470 else: 471 if output: 472 self.stream.writeln(self.separator2) 473 self.stream.writeln("captured stdout".center( 474 len(self.separator2))) 475 self.stream.writeln(self.separator2) 476 self.stream.writeln(output) 477 else: 478 self.stream.writeln('no stdout'.center( 479 len(self.separator2))) 480 if errput: 481 self.stream.writeln(self.separator2) 482 self.stream.writeln("captured stderr".center( 483 len(self.separator2))) 484 self.stream.writeln(self.separator2) 485 self.stream.writeln(errput) 486 else: 487 self.stream.writeln('no stderr'.center( 488 len(self.separator2)))
489 490
491 -def run(self, result, runcondition=None, options=None):
492 for test in self._tests: 493 if result.shouldStop: 494 break 495 try: 496 test(result, runcondition, options) 497 except TypeError: 498 # this might happen if a raw unittest.TestCase is defined 499 # and used with python (and not pytest) 500 warn("%s should extend lgc.testlib.TestCase instead of unittest.TestCase" 501 % test) 502 test(result) 503 return result
504 unittest.TestSuite.run = run 505 506 # backward compatibility: TestSuite might be imported from lgc.testlib 507 TestSuite = unittest.TestSuite 508 509 # python2.3 compat
510 -def __call__(self, *args, **kwds):
511 return self.run(*args, **kwds)
512 unittest.TestSuite.__call__ = __call__ 513 514
515 -class SkipAwareTextTestRunner(unittest.TextTestRunner):
516
517 - def __init__(self, stream=sys.stderr, verbosity=1, 518 exitfirst=False, capture=False, printonly=None, 519 pdbmode=False, cvg=None, test_pattern=None, 520 skipped_patterns=(), colorize=False, options=None):
521 super(SkipAwareTextTestRunner, self).__init__(stream=stream, 522 verbosity=verbosity) 523 self.exitfirst = exitfirst 524 self.capture = capture 525 self.printonly = printonly 526 self.pdbmode = pdbmode 527 self.cvg = cvg 528 self.test_pattern = test_pattern 529 self.skipped_patterns = skipped_patterns 530 self.colorize = colorize 531 self.options = options
532
533 - def _this_is_skipped(self, testedname):
534 return any([(pat in testedname) for pat in self.skipped_patterns])
535
536 - def _runcondition(self, test, skipgenerator=True):
537 if isinstance(test, InnerTest): 538 testname = test.name 539 else: 540 if isinstance(test, TestCase): 541 meth = test._get_test_method() 542 func = meth.im_func 543 testname = '%s.%s' % (meth.im_class.__name__, func.__name__) 544 elif isinstance(test, types.FunctionType): 545 func = test 546 testname = func.__name__ 547 elif isinstance(test, types.MethodType): 548 func = test.im_func 549 testname = '%s.%s' % (test.im_class.__name__, func.__name__) 550 else: 551 return True # Not sure when this happens 552 553 if is_generator(func) and skipgenerator: 554 return self.does_match_tags(func) # Let inner tests decide at run time 555 556 # print 'testname', testname, self.test_pattern 557 if self._this_is_skipped(testname): 558 return False # this was explicitly skipped 559 if self.test_pattern is not None: 560 try: 561 classpattern, testpattern = self.test_pattern.split('.') 562 klass, name = testname.split('.') 563 if classpattern not in klass or testpattern not in name: 564 return False 565 except ValueError: 566 if self.test_pattern not in testname: 567 return False 568 569 return self.does_match_tags(test)
570
571 - def does_match_tags(self, test):
572 if self.options is not None: 573 tags_pattern = getattr(self.options, 'tags_pattern', None) 574 if tags_pattern is not None: 575 tags = getattr(test, 'tags', Tags()) 576 return tags.match(tags_pattern) 577 return True # no pattern
578
579 - def _makeResult(self):
580 return SkipAwareTestResult(self.stream, self.descriptions, 581 self.verbosity, self.exitfirst, self.capture, 582 self.printonly, self.pdbmode, self.cvg, 583 self.colorize)
584
585 - def run(self, test):
586 "Run the given test case or test suite." 587 result = self._makeResult() 588 startTime = time.time() 589 test(result, self._runcondition, self.options) 590 stopTime = time.time() 591 timeTaken = stopTime - startTime 592 result.printErrors() 593 self.stream.writeln(result.separator2) 594 run = result.testsRun 595 self.stream.writeln("Ran %d test%s in %.3fs" % 596 (run, run != 1 and "s" or "", timeTaken)) 597 self.stream.writeln() 598 if not result.wasSuccessful(): 599 if self.colorize: 600 self.stream.write(textutils.colorize_ansi("FAILED", color='red')) 601 else: 602 self.stream.write("FAILED") 603 else: 604 if self.colorize: 605 self.stream.write(textutils.colorize_ansi("OK", color='green')) 606 else: 607 self.stream.write("OK") 608 failed, errored, skipped = map(len, (result.failures, result.errors, 609 result.skipped)) 610 611 det_results = [] 612 for name, value in (("failures", result.failures), 613 ("errors",result.errors), 614 ("skipped", result.skipped)): 615 if value: 616 det_results.append("%s=%i" % (name, len(value))) 617 if det_results: 618 self.stream.write(" (") 619 self.stream.write(', '.join(det_results)) 620 self.stream.write(")") 621 self.stream.writeln("") 622 return result
623 624
625 -class keywords(dict):
626 """Keyword args (**kwargs) support for generative tests."""
627
628 -class starargs(tuple):
629 """Variable arguments (*args) for generative tests."""
630 - def __new__(cls, *args):
631 return tuple.__new__(cls, args)
632 633 634
635 -class NonStrictTestLoader(unittest.TestLoader):
636 """ 637 Overrides default testloader to be able to omit classname when 638 specifying tests to run on command line. 639 640 For example, if the file test_foo.py contains :: 641 642 class FooTC(TestCase): 643 def test_foo1(self): # ... 644 def test_foo2(self): # ... 645 def test_bar1(self): # ... 646 647 class BarTC(TestCase): 648 def test_bar2(self): # ... 649 650 'python test_foo.py' will run the 3 tests in FooTC 651 'python test_foo.py FooTC' will run the 3 tests in FooTC 652 'python test_foo.py test_foo' will run test_foo1 and test_foo2 653 'python test_foo.py test_foo1' will run test_foo1 654 'python test_foo.py test_bar' will run FooTC.test_bar1 and BarTC.test_bar2 655 """ 656
657 - def __init__(self):
658 self.skipped_patterns = []
659
660 - def loadTestsFromNames(self, names, module=None):
661 suites = [] 662 for name in names: 663 suites.extend(self.loadTestsFromName(name, module)) 664 return self.suiteClass(suites)
665
666 - def _collect_tests(self, module):
667 tests = {} 668 for obj in vars(module).values(): 669 if (issubclass(type(obj), (types.ClassType, type)) and 670 issubclass(obj, unittest.TestCase)): 671 classname = obj.__name__ 672 if classname[0] == '_' or self._this_is_skipped(classname): 673 continue 674 methodnames = [] 675 # obj is a TestCase class 676 for attrname in dir(obj): 677 if attrname.startswith(self.testMethodPrefix): 678 attr = getattr(obj, attrname) 679 if callable(attr): 680 methodnames.append(attrname) 681 # keep track of class (obj) for convenience 682 tests[classname] = (obj, methodnames) 683 return tests
684
685 - def loadTestsFromSuite(self, module, suitename):
686 try: 687 suite = getattr(module, suitename)() 688 except AttributeError: 689 return [] 690 assert hasattr(suite, '_tests'), \ 691 "%s.%s is not a valid TestSuite" % (module.__name__, suitename) 692 # python2.3 does not implement __iter__ on suites, we need to return 693 # _tests explicitly 694 return suite._tests
695
696 - def loadTestsFromName(self, name, module=None):
697 parts = name.split('.') 698 if module is None or len(parts) > 2: 699 # let the base class do its job here 700 return [super(NonStrictTestLoader, self).loadTestsFromName(name)] 701 tests = self._collect_tests(module) 702 # import pprint 703 # pprint.pprint(tests) 704 collected = [] 705 if len(parts) == 1: 706 pattern = parts[0] 707 if callable(getattr(module, pattern, None) 708 ) and pattern not in tests: 709 # consider it as a suite 710 return self.loadTestsFromSuite(module, pattern) 711 if pattern in tests: 712 # case python unittest_foo.py MyTestTC 713 klass, methodnames = tests[pattern] 714 for methodname in methodnames: 715 collected = [klass(methodname) 716 for methodname in methodnames] 717 else: 718 # case python unittest_foo.py something 719 for klass, methodnames in tests.values(): 720 collected += [klass(methodname) 721 for methodname in methodnames] 722 elif len(parts) == 2: 723 # case "MyClass.test_1" 724 classname, pattern = parts 725 klass, methodnames = tests.get(classname, (None, [])) 726 for methodname in methodnames: 727 collected = [klass(methodname) for methodname in methodnames] 728 return collected
729
730 - def _this_is_skipped(self, testedname):
731 return any([(pat in testedname) for pat in self.skipped_patterns])
732
733 - def getTestCaseNames(self, testCaseClass):
734 """Return a sorted sequence of method names found within testCaseClass 735 """ 736 is_skipped = self._this_is_skipped 737 classname = testCaseClass.__name__ 738 if classname[0] == '_' or is_skipped(classname): 739 return [] 740 testnames = super(NonStrictTestLoader, self).getTestCaseNames( 741 testCaseClass) 742 return [testname for testname in testnames if not is_skipped(testname)]
743 744
745 -class SkipAwareTestProgram(unittest.TestProgram):
746 # XXX: don't try to stay close to unittest.py, use optparse 747 USAGE = """\ 748 Usage: %(progName)s [options] [test] [...] 749 750 Options: 751 -h, --help Show this message 752 -v, --verbose Verbose output 753 -i, --pdb Enable test failure inspection 754 -x, --exitfirst Exit on first failure 755 -c, --capture Captures and prints standard out/err only on errors 756 -p, --printonly Only prints lines matching specified pattern 757 (implies capture) 758 -s, --skip skip test matching this pattern (no regexp for now) 759 -q, --quiet Minimal output 760 --color colorize tracebacks 761 762 -m, --match Run only test whose tag match this pattern 763 764 -P, --profile FILE: Run the tests using cProfile and saving results 765 in FILE 766 767 Examples: 768 %(progName)s - run default set of tests 769 %(progName)s MyTestSuite - run suite 'MyTestSuite' 770 %(progName)s MyTestCase.testSomething - run MyTestCase.testSomething 771 %(progName)s MyTestCase - run all 'test*' test methods 772 in MyTestCase 773 """
774 - def __init__(self, module='__main__', defaultTest=None, batchmode=False, 775 cvg=None, options=None, outstream=sys.stderr):
776 self.batchmode = batchmode 777 self.cvg = cvg 778 self.options = options 779 self.outstream = outstream 780 super(SkipAwareTestProgram, self).__init__( 781 module=module, defaultTest=defaultTest, 782 testLoader=NonStrictTestLoader())
783
784 - def parseArgs(self, argv):
785 self.pdbmode = False 786 self.exitfirst = False 787 self.capture = 0 788 self.printonly = None 789 self.skipped_patterns = [] 790 self.test_pattern = None 791 self.tags_pattern = None 792 self.colorize = False 793 self.profile_name = None 794 import getopt 795 try: 796 options, args = getopt.getopt(argv[1:], 'hHvixrqcp:s:m:P:', 797 ['help', 'verbose', 'quiet', 'pdb', 798 'exitfirst', 'restart', 'capture', 'printonly=', 799 'skip=', 'color', 'match=', 'profile=']) 800 for opt, value in options: 801 if opt in ('-h', '-H', '--help'): 802 self.usageExit() 803 if opt in ('-i', '--pdb'): 804 self.pdbmode = True 805 if opt in ('-x', '--exitfirst'): 806 self.exitfirst = True 807 if opt in ('-r', '--restart'): 808 self.restart = True 809 self.exitfirst = True 810 if opt in ('-q', '--quiet'): 811 self.verbosity = 0 812 if opt in ('-v', '--verbose'): 813 self.verbosity = 2 814 if opt in ('-c', '--capture'): 815 self.capture += 1 816 if opt in ('-p', '--printonly'): 817 self.printonly = re.compile(value) 818 if opt in ('-s', '--skip'): 819 self.skipped_patterns = [pat.strip() for pat in 820 value.split(', ')] 821 if opt == '--color': 822 self.colorize = True 823 if opt in ('-m', '--match'): 824 #self.tags_pattern = value 825 self.options["tag_pattern"] = value 826 if opt in ('-P', '--profile'): 827 self.profile_name = value 828 self.testLoader.skipped_patterns = self.skipped_patterns 829 if self.printonly is not None: 830 self.capture += 1 831 if len(args) == 0 and self.defaultTest is None: 832 suitefunc = getattr(self.module, 'suite', None) 833 if isinstance(suitefunc, (types.FunctionType, 834 types.MethodType)): 835 self.test = self.module.suite() 836 else: 837 self.test = self.testLoader.loadTestsFromModule(self.module) 838 return 839 if len(args) > 0: 840 self.test_pattern = args[0] 841 self.testNames = args 842 else: 843 self.testNames = (self.defaultTest, ) 844 self.createTests() 845 except getopt.error, msg: 846 self.usageExit(msg)
847 848
849 - def runTests(self):
850 if self.profile_name: 851 import cProfile 852 cProfile.runctx('self._runTests()', globals(), locals(), self.profile_name ) 853 else: 854 return self._runTests()
855
856 - def _runTests(self):
857 if hasattr(self.module, 'setup_module'): 858 try: 859 self.module.setup_module(self.options) 860 except Exception, exc: 861 print 'setup_module error:', exc 862 sys.exit(1) 863 self.testRunner = SkipAwareTextTestRunner(verbosity=self.verbosity, 864 stream=self.outstream, 865 exitfirst=self.exitfirst, 866 capture=self.capture, 867 printonly=self.printonly, 868 pdbmode=self.pdbmode, 869 cvg=self.cvg, 870 test_pattern=self.test_pattern, 871 skipped_patterns=self.skipped_patterns, 872 colorize=self.colorize, 873 options=self.options) 874 875 def removeSucceededTests(obj, succTests): 876 """ Recurcive function that removes succTests from 877 a TestSuite or TestCase 878 """ 879 if isinstance(obj, TestSuite): 880 removeSucceededTests(obj._tests, succTests) 881 if isinstance(obj, list): 882 for el in obj[:]: 883 if isinstance(el, TestSuite): 884 removeSucceededTests(el, succTests) 885 elif isinstance(el, TestCase): 886 descr = '.'.join((el.__class__.__module__, 887 el.__class__.__name__, 888 el._testMethodName)) 889 if descr in succTests: 890 obj.remove(el)
891 # take care, self.options may be None 892 if getattr(self.options, 'restart', False): 893 # retrieve succeeded tests from FILE_RESTART 894 try: 895 restartfile = open(FILE_RESTART, 'r') 896 try: 897 try: 898 succeededtests = list(elem.rstrip('\n\r') for elem in 899 restartfile.readlines()) 900 removeSucceededTests(self.test, succeededtests) 901 except Exception, e: 902 raise e 903 finally: 904 restartfile.close() 905 except Exception ,e: 906 raise "Error while reading \ 907 succeeded tests into", osp.join(os.getcwd(),FILE_RESTART) 908 909 result = self.testRunner.run(self.test) 910 if hasattr(self.module, 'teardown_module'): 911 try: 912 self.module.teardown_module(self.options, result) 913 except Exception, exc: 914 print 'teardown_module error:', exc 915 sys.exit(1) 916 if os.environ.get('PYDEBUG'): 917 warn("PYDEBUG usage is deprecated, use -i / --pdb instead", 918 DeprecationWarning) 919 self.pdbmode = True 920 if result.debuggers and self.pdbmode: 921 start_interactive_mode(result) 922 if not self.batchmode: 923 sys.exit(not result.wasSuccessful()) 924 self.result = result
925 926 927 928
929 -class FDCapture:
930 """adapted from py lib (http://codespeak.net/py) 931 Capture IO to/from a given os-level filedescriptor. 932 """
933 - def __init__(self, fd, attr='stdout', printonly=None):
934 self.targetfd = fd 935 self.tmpfile = os.tmpfile() # self.maketempfile() 936 self.printonly = printonly 937 # save original file descriptor 938 self._savefd = os.dup(fd) 939 # override original file descriptor 940 os.dup2(self.tmpfile.fileno(), fd) 941 # also modify sys module directly 942 self.oldval = getattr(sys, attr) 943 setattr(sys, attr, self) # self.tmpfile) 944 self.attr = attr
945
946 - def write(self, msg):
947 # msg might be composed of several lines 948 for line in msg.splitlines(): 949 line += '\n' # keepdend=True is not enough 950 if self.printonly is None or self.printonly.search(line) is None: 951 self.tmpfile.write(line) 952 else: 953 os.write(self._savefd, line)
954 955 ## def maketempfile(self): 956 ## tmpf = os.tmpfile() 957 ## fd = os.dup(tmpf.fileno()) 958 ## newf = os.fdopen(fd, tmpf.mode, 0) # No buffering 959 ## tmpf.close() 960 ## return newf 961
962 - def restore(self):
963 """restore original fd and returns captured output""" 964 #XXX: hack hack hack 965 self.tmpfile.flush() 966 try: 967 ref_file = getattr(sys, '__%s__' % self.attr) 968 ref_file.flush() 969 except AttributeError: 970 pass 971 if hasattr(self.oldval, 'flush'): 972 self.oldval.flush() 973 # restore original file descriptor 974 os.dup2(self._savefd, self.targetfd) 975 # restore sys module 976 setattr(sys, self.attr, self.oldval) 977 # close backup descriptor 978 os.close(self._savefd) 979 # go to beginning of file and read it 980 self.tmpfile.seek(0) 981 return self.tmpfile.read()
982 983
984 -def _capture(which='stdout', printonly=None):
985 """private method, should not be called directly 986 (cf. capture_stdout() and capture_stderr()) 987 """ 988 assert which in ('stdout', 'stderr' 989 ), "Can only capture stdout or stderr, not %s" % which 990 if which == 'stdout': 991 fd = 1 992 else: 993 fd = 2 994 return FDCapture(fd, which, printonly)
995
996 -def capture_stdout(printonly=None):
997 """captures the standard output 998 999 returns a handle object which has a `restore()` method. 1000 The restore() method returns the captured stdout and restores it 1001 """ 1002 return _capture('stdout', printonly)
1003
1004 -def capture_stderr(printonly=None):
1005 """captures the standard error output 1006 1007 returns a handle object which has a `restore()` method. 1008 The restore() method returns the captured stderr and restores it 1009 """ 1010 return _capture('stderr', printonly)
1011 1012
1013 -def unittest_main(module='__main__', defaultTest=None, 1014 batchmode=False, cvg=None, options=None, 1015 outstream=sys.stderr):
1016 """use this functon if you want to have the same functionality 1017 as unittest.main""" 1018 return SkipAwareTestProgram(module, defaultTest, batchmode, 1019 cvg, options, outstream)
1020
1021 -class TestSkipped(Exception):
1022 """raised when a test is skipped"""
1023
1024 -def is_generator(function):
1025 flags = function.func_code.co_flags 1026 return flags & CO_GENERATOR
1027 1028
1029 -def parse_generative_args(params):
1030 args = [] 1031 varargs = () 1032 kwargs = {} 1033 flags = 0 # 2 <=> starargs, 4 <=> kwargs 1034 for param in params: 1035 if isinstance(param, starargs): 1036 varargs = param 1037 if flags: 1038 raise TypeError('found starargs after keywords !') 1039 flags |= 2 1040 args += list(varargs) 1041 elif isinstance(param, keywords): 1042 kwargs = param 1043 if flags & 4: 1044 raise TypeError('got multiple keywords parameters') 1045 flags |= 4 1046 elif flags & 2 or flags & 4: 1047 raise TypeError('found parameters after kwargs or args') 1048 else: 1049 args.append(param) 1050 1051 return args, kwargs
1052
1053 -class InnerTest(tuple):
1054 - def __new__(cls, name, *data):
1055 instance = tuple.__new__(cls, data) 1056 instance.name = name 1057 return instance
1058
1059 -class ClassGetProperty(object):
1060 """this is a simple property-like class but for 1061 class attributes. 1062 """ 1063
1064 - def __init__(self, getter):
1065 self.getter = getter
1066
1067 - def __get__(self, obj, objtype): # pylint: disable-msg=W0613
1068 "__get__(objn objtype) -> objtype" 1069 return self.getter(objtype)
1070 1071
1072 -class TestCase(unittest.TestCase):
1073 """unittest.TestCase with some additional methods""" 1074 1075 capture = False 1076 pdbclass = Debugger 1077
1078 - def __init__(self, methodName='runTest'):
1079 super(TestCase, self).__init__(methodName) 1080 # internal API changed in python2.5 1081 if sys.version_info >= (2, 5): 1082 self.__exc_info = self._exc_info 1083 self.__testMethodName = self._testMethodName 1084 else: 1085 # let's give easier access to _testMethodName to every subclasses 1086 self._testMethodName = self.__testMethodName 1087 self._captured_stdout = "" 1088 self._captured_stderr = "" 1089 self._out = [] 1090 self._err = [] 1091 self._current_test_descr = None 1092 self._options_ = None
1093
1094 - def datadir(cls): # pylint: disable-msg=E0213
1095 """helper attribute holding the standard test's data directory 1096 1097 NOTE: this is a logilab's standard 1098 """ 1099 mod = __import__(cls.__module__) 1100 return osp.join(osp.dirname(osp.abspath(mod.__file__)), 'data')
1101 # cache it (use a class method to cache on class since TestCase is 1102 # instantiated for each test run) 1103 datadir = ClassGetProperty(cached(datadir)) 1104
1105 - def datapath(self, fname):
1106 """joins the object's datadir and `fname`""" 1107 return osp.join(self.datadir, fname)
1108
1109 - def set_description(self, descr):
1110 """sets the current test's description. 1111 This can be useful for generative tests because it allows to specify 1112 a description per yield 1113 """ 1114 self._current_test_descr = descr
1115 1116 # override default's unittest.py feature
1117 - def shortDescription(self):
1118 """override default unitest shortDescription to handle correctly 1119 generative tests 1120 """ 1121 if self._current_test_descr is not None: 1122 return self._current_test_descr 1123 return super(TestCase, self).shortDescription()
1124 1125
1126 - def captured_output(self):
1127 """return a two tuple with standard output and error stripped""" 1128 return self._captured_stdout.strip(), self._captured_stderr.strip()
1129
1130 - def _start_capture(self):
1131 """start_capture if enable""" 1132 if self.capture: 1133 self.start_capture()
1134
1135 - def _stop_capture(self):
1136 """stop_capture and restore previous output""" 1137 self._force_output_restore()
1138
1139 - def start_capture(self, printonly=None):
1140 """start_capture""" 1141 self._out.append(capture_stdout(printonly or self._printonly)) 1142 self._err.append(capture_stderr(printonly or self._printonly))
1143
1144 - def printonly(self, pattern, flags=0):
1145 """set the pattern of line to print""" 1146 rgx = re.compile(pattern, flags) 1147 if self._out: 1148 self._out[-1].printonly = rgx 1149 self._err[-1].printonly = rgx 1150 else: 1151 self.start_capture(printonly=rgx)
1152
1153 - def stop_capture(self):
1154 """stop output and error capture""" 1155 if self._out: 1156 _out = self._out.pop() 1157 _err = self._err.pop() 1158 return _out.restore(), _err.restore() 1159 return '', ''
1160
1161 - def _force_output_restore(self):
1162 """remove all capture set""" 1163 while self._out: 1164 self._captured_stdout += self._out.pop().restore() 1165 self._captured_stderr += self._err.pop().restore()
1166
1167 - def quiet_run(self, result, func, *args, **kwargs):
1168 self._start_capture() 1169 try: 1170 func(*args, **kwargs) 1171 except (KeyboardInterrupt, SystemExit): 1172 self._stop_capture() 1173 raise 1174 except: 1175 self._stop_capture() 1176 result.addError(self, self.__exc_info()) 1177 return False 1178 self._stop_capture() 1179 return True
1180
1181 - def _get_test_method(self):
1182 """return the test method""" 1183 return getattr(self, self.__testMethodName)
1184 1185
1186 - def optval(self, option, default=None):
1187 """return the option value or default if the option is not define""" 1188 return getattr(self._options_, option, default)
1189
1190 - def __call__(self, result=None, runcondition=None, options=None):
1191 """rewrite TestCase.__call__ to support generative tests 1192 This is mostly a copy/paste from unittest.py (i.e same 1193 variable names, same logic, except for the generative tests part) 1194 """ 1195 if result is None: 1196 result = self.defaultTestResult() 1197 result.pdbclass = self.pdbclass 1198 # if self.capture is True here, it means it was explicitly specified 1199 # in the user's TestCase class. If not, do what was asked on cmd line 1200 self.capture = self.capture or getattr(result, 'capture', False) 1201 self._options_ = options 1202 self._printonly = getattr(result, 'printonly', None) 1203 # if result.cvg: 1204 # result.cvg.start() 1205 testMethod = self._get_test_method() 1206 if runcondition and not runcondition(testMethod): 1207 return # test is skipped 1208 result.startTest(self) 1209 try: 1210 if not self.quiet_run(result, self.setUp): 1211 return 1212 # generative tests 1213 if is_generator(testMethod.im_func): 1214 success = self._proceed_generative(result, testMethod, 1215 runcondition) 1216 else: 1217 status = self._proceed(result, testMethod) 1218 success = (status == 0) 1219 if not self.quiet_run(result, self.tearDown): 1220 return 1221 if success: 1222 if hasattr(options, "exitfirst") and options.exitfirst: 1223 # add this test to restart file 1224 try: 1225 restartfile = open(FILE_RESTART, 'a') 1226 try: 1227 try: 1228 descr = '.'.join((self.__class__.__module__, 1229 self.__class__.__name__, 1230 self._testMethodName)) 1231 restartfile.write(descr+os.linesep) 1232 except Exception, e: 1233 raise e 1234 finally: 1235 restartfile.close() 1236 except Exception, e: 1237 print >> sys.__stderr__, "Error while saving \ 1238 succeeded test into", osp.join(os.getcwd(),FILE_RESTART) 1239 raise e 1240 result.addSuccess(self) 1241 finally: 1242 # if result.cvg: 1243 # result.cvg.stop() 1244 result.stopTest(self)
1245 1246 1247
1248 - def _proceed_generative(self, result, testfunc, runcondition=None):
1249 # cancel startTest()'s increment 1250 result.testsRun -= 1 1251 self._start_capture() 1252 success = True 1253 try: 1254 for params in testfunc(): 1255 if runcondition and not runcondition(testfunc, 1256 skipgenerator=False): 1257 if not (isinstance(params, InnerTest) 1258 and runcondition(params)): 1259 continue 1260 if not isinstance(params, (tuple, list)): 1261 params = (params, ) 1262 func = params[0] 1263 args, kwargs = parse_generative_args(params[1:]) 1264 # increment test counter manually 1265 result.testsRun += 1 1266 status = self._proceed(result, func, args, kwargs) 1267 if status == 0: 1268 result.addSuccess(self) 1269 success = True 1270 else: 1271 success = False 1272 if status == 2: 1273 result.shouldStop = True 1274 if result.shouldStop: # either on error or on exitfirst + error 1275 break 1276 except: 1277 # if an error occurs between two yield 1278 result.addError(self, self.__exc_info()) 1279 success = False 1280 self._stop_capture() 1281 return success
1282
1283 - def _proceed(self, result, testfunc, args=(), kwargs=None):
1284 """proceed the actual test 1285 returns 0 on success, 1 on failure, 2 on error 1286 1287 Note: addSuccess can't be called here because we have to wait 1288 for tearDown to be successfully executed to declare the test as 1289 successful 1290 """ 1291 self._start_capture() 1292 kwargs = kwargs or {} 1293 try: 1294 testfunc(*args, **kwargs) 1295 self._stop_capture() 1296 except self.failureException: 1297 self._stop_capture() 1298 result.addFailure(self, self.__exc_info()) 1299 return 1 1300 except KeyboardInterrupt: 1301 self._stop_capture() 1302 raise 1303 except: 1304 self._stop_capture() 1305 result.addError(self, self.__exc_info()) 1306 return 2 1307 return 0
1308
1309 - def defaultTestResult(self):
1310 """return a new instance of the defaultTestResult""" 1311 return SkipAwareTestResult()
1312
1313 - def skip(self, msg=None):
1314 """mark a test as skipped for the <msg> reason""" 1315 msg = msg or 'test was skipped' 1316 raise TestSkipped(msg)
1317 skipped_test = deprecated_function(skip) 1318
1319 - def assertIn(self, object, set):
1320 """assert <object> are in <set>""" 1321 self.assert_(object in set, "%s not in %s" % (object, set))
1322
1323 - def assertNotIn(self, object, set):
1324 """assert <object> are not in <set>""" 1325 self.assert_(object not in set, "%s in %s" % (object, set))
1326
1327 - def assertDictEquals(self, dict1, dict2):
1328 """compares two dicts 1329 1330 If the two dict differ, the first difference is shown in the error 1331 message 1332 """ 1333 dict1 = dict(dict1) 1334 msgs = [] 1335 for key, value in dict2.items(): 1336 try: 1337 if dict1[key] != value: 1338 msgs.append('%r != %r for key %r' % (dict1[key], value, 1339 key)) 1340 del dict1[key] 1341 except KeyError: 1342 msgs.append('missing %r key' % key) 1343 if dict1: 1344 msgs.append('dict2 is lacking %r' % dict1) 1345 if msgs: 1346 self.fail(''.join(msgs))
1347 assertDictEqual = assertDictEquals 1348 1349 1350
1351 - def assertUnorderedIterableEquals(self, got, expected, msg=None):
1352 """compares two iterable and shows difference between both""" 1353 got, expected = list(got), list(expected) 1354 self.assertSetEqual(set(got), set(expected), msg) 1355 if len(got) != len(expected): 1356 if msg is None: 1357 msg = ['Iterable have the same elements but not the same number', 1358 '\t<element>\t<expected>i\t<got>'] 1359 got_count = {} 1360 expected_count = {} 1361 for element in got: 1362 got_count[element] = got_count.get(element,0) + 1 1363 for element in expected: 1364 expected_count[element] = expected_count.get(element,0) + 1 1365 # we know that got_count.key() == expected_count.key() 1366 # because of assertSetEquals 1367 for element, count in got_count.iteritems(): 1368 other_count = expected_count[element] 1369 if other_count != count: 1370 msg.append('\t%s\t%s\t%s' % (element, other_count, count)) 1371 1372 self.fail(msg)
1373 1374 assertUnorderedIterableEqual = assertUnorderedIterableEquals 1375 assertUnordIterEquals = assertUnordIterEqual = assertUnorderedIterableEqual 1376
1377 - def assertSetEquals(self,got,expected, msg=None):
1378 if not(isinstance(got, set) and isinstance(expected, set)): 1379 warn("the assertSetEquals function if now intended for set only."\ 1380 "use assertUnorderedIterableEquals instead.", 1381 DeprecationWarning, 2) 1382 return self.assertUnorderedIterableEquals(got,expected, msg) 1383 1384 items={} 1385 items['missing'] = expected - got 1386 items['unexpected'] = got - expected 1387 if any(items.itervalues()): 1388 if msg is None: 1389 msg = '\n'.join('%s:\n\t%s' % (key,"\n\t".join(str(value) for value in values)) 1390 for key, values in items.iteritems() if values) 1391 self.fail(msg)
1392 1393 1394 assertSetEqual = assertSetEquals 1395
1396 - def assertListEquals(self, list_1, list_2, msg=None):
1397 """compares two lists 1398 1399 If the two list differ, the first difference is shown in the error 1400 message 1401 """ 1402 _l1 = list_1[:] 1403 for i, value in enumerate(list_2): 1404 try: 1405 if _l1[0] != value: 1406 from pprint import pprint 1407 pprint(list_1) 1408 pprint(list_2) 1409 self.fail('%r != %r for index %d' % (_l1[0], value, i)) 1410 del _l1[0] 1411 except IndexError: 1412 if msg is None: 1413 msg = 'list_1 has only %d elements, not %s '\ 1414 '(at least %r missing)'% (i, len(list_2), value) 1415 self.fail(msg) 1416 if _l1: 1417 if msg is None: 1418 msg = 'list_2 is lacking %r' % _l1 1419 self.fail(msg)
1420 assertListEqual = assertListEquals 1421
1422 - def assertLinesEquals(self, list_1, list_2, msg=None):
1423 """assert list of lines are equal""" 1424 self.assertListEquals(list_1.splitlines(), list_2.splitlines(), msg)
1425 assertLineEqual = assertLinesEquals 1426
1427 - def assertXMLWellFormed(self, stream, msg=None):
1428 """asserts the XML stream is well-formed (no DTD conformance check)""" 1429 from xml.sax import make_parser, SAXParseException 1430 parser = make_parser() 1431 try: 1432 parser.parse(stream) 1433 except SAXParseException: 1434 if msg is None: 1435 msg = 'XML stream not well formed' 1436 self.fail(msg)
1437 assertXMLValid = deprecated_function(assertXMLWellFormed, 1438 'assertXMLValid renamed to more precise assertXMLWellFormed') 1439
1440 - def assertXMLStringWellFormed(self, xml_string, msg=None):
1441 """asserts the XML string is well-formed (no DTD conformance check)""" 1442 stream = StringIO(xml_string) 1443 self.assertXMLWellFormed(stream, msg)
1444 1445 assertXMLStringValid = deprecated_function( 1446 assertXMLStringWellFormed, 1447 'assertXMLStringValid renamed to more precise assertXMLStringWellFormed' 1448 ) 1449
1450 - def assertXMLEqualsTuple(self, element, tup):
1451 """compare an ElementTree Element to a tuple formatted as follow: 1452 (tagname, [attrib[, children[, text[, tail]]]])""" 1453 # check tag 1454 self.assertTextEquals(element.tag, tup[0]) 1455 # check attrib 1456 if len(element.attrib) or len(tup)>1: 1457 if len(tup)<=1: 1458 self.fail( "tuple %s has no attributes (%s expected)"%(tup, 1459 dict(element.attrib))) 1460 self.assertDictEquals(element.attrib, tup[1]) 1461 # check childrend 1462 if len(element) or len(tup)>2: 1463 if len(tup)<=2: 1464 self.fail( "tuple %s has no children (%i expected)"%(tup, 1465 len(element))) 1466 if len(element) != len(tup[2]): 1467 self.fail( "tuple %s has %i children%s (%i expected)"%(tup, 1468 len(tup[2]), 1469 ('', 's')[len(tup[2])>1], len(element))) 1470 for index in xrange(len(tup[2])): 1471 self.assertXMLEqualsTuple(element[index], tup[2][index]) 1472 #check text 1473 if element.text or len(tup)>3: 1474 if len(tup)<=3: 1475 self.fail( "tuple %s has no text value (%r expected)"%(tup, 1476 element.text)) 1477 self.assertTextEquals(element.text, tup[3]) 1478 #check tail 1479 if element.tail or len(tup)>4: 1480 if len(tup)<=4: 1481 self.fail( "tuple %s has no tail value (%r expected)"%(tup, 1482 element.tail)) 1483 self.assertTextEquals(element.tail, tup[4])
1484
1485 - def _difftext(self, lines1, lines2, junk=None, msg_prefix='Texts differ'):
1486 junk = junk or (' ', '\t') 1487 # result is a generator 1488 result = difflib.ndiff(lines1, lines2, charjunk=lambda x: x in junk) 1489 read = [] 1490 for line in result: 1491 read.append(line) 1492 # lines that don't start with a ' ' are diff ones 1493 if not line.startswith(' '): 1494 self.fail('\n'.join(['%s\n'%msg_prefix]+read + list(result)))
1495
1496 - def assertTextEquals(self, text1, text2, junk=None, 1497 msg_prefix='Text differ'):
1498 """compare two multiline strings (using difflib and splitlines())""" 1499 msg = [] 1500 if not isinstance(text1, basestring): 1501 msg.append('text1 is not a string (%s)'%(type(text1))) 1502 if not isinstance(text2, basestring): 1503 msg.append('text2 is not a string (%s)'%(type(text2))) 1504 if msg: 1505 self.fail('\n'.join(msg)) 1506 self._difftext(text1.strip().splitlines(True), text2.strip().splitlines(True), 1507 junk, msg_prefix)
1508 assertTextEqual = assertTextEquals 1509
1510 - def assertStreamEquals(self, stream1, stream2, junk=None, 1511 msg_prefix='Stream differ'):
1512 """compare two streams (using difflib and readlines())""" 1513 # if stream2 is stream2, readlines() on stream1 will also read lines 1514 # in stream2, so they'll appear different, although they're not 1515 if stream1 is stream2: 1516 return 1517 # make sure we compare from the beginning of the stream 1518 stream1.seek(0) 1519 stream2.seek(0) 1520 # ocmpare 1521 self._difftext(stream1.readlines(), stream2.readlines(), junk, 1522 msg_prefix)
1523 1524 assertStreamEqual = assertStreamEquals
1525 - def assertFileEquals(self, fname1, fname2, junk=(' ', '\t')):
1526 """compares two files using difflib""" 1527 self.assertStreamEqual(file(fname1), file(fname2), junk, 1528 msg_prefix='Files differs\n-:%s\n+:%s\n'%(fname1, fname2))
1529 assertFileEqual = assertFileEquals 1530 1531
1532 - def assertDirEquals(self, path_a, path_b):
1533 """compares two files using difflib""" 1534 assert osp.exists(path_a), "%s doesn't exists" % path_a 1535 assert osp.exists(path_b), "%s doesn't exists" % path_b 1536 1537 all_a = [ (ipath[len(path_a):].lstrip('/'), idirs, ifiles) 1538 for ipath, idirs, ifiles in os.walk(path_a)] 1539 all_a.sort(key=itemgetter(0)) 1540 1541 all_b = [ (ipath[len(path_b):].lstrip('/'), idirs, ifiles) 1542 for ipath, idirs, ifiles in os.walk(path_b)] 1543 all_b.sort(key=itemgetter(0)) 1544 1545 iter_a, iter_b = iter(all_a), iter(all_b) 1546 partial_iter = True 1547 ipath_a, idirs_a, ifiles_a = data_a = None, None, None 1548 while True: 1549 try: 1550 ipath_a, idirs_a, ifiles_a = datas_a = iter_a.next() 1551 partial_iter = False 1552 ipath_b, idirs_b, ifiles_b = datas_b = iter_b.next() 1553 partial_iter = True 1554 1555 1556 self.assert_(ipath_a == ipath_b, 1557 "unexpected %s in %s while looking %s from %s" % 1558 (ipath_a, path_a, ipath_b, path_b)) 1559 1560 1561 errors = {} 1562 sdirs_a = set(idirs_a) 1563 sdirs_b = set(idirs_b) 1564 errors["unexpected directories"] = sdirs_a - sdirs_b 1565 errors["missing directories"] = sdirs_b - sdirs_a 1566 1567 sfiles_a = set(ifiles_a) 1568 sfiles_b = set(ifiles_b) 1569 errors["unexpected files"] = sfiles_a - sfiles_b 1570 errors["missing files"] = sfiles_b - sfiles_a 1571 1572 1573 msgs = [ "%s: %s"% (name, items) 1574 for name, items in errors.iteritems() if items] 1575 1576 if msgs: 1577 msgs.insert(0,"%s and %s differ :" % ( 1578 osp.join(path_a, ipath_a), 1579 osp.join(path_b, ipath_b), 1580 )) 1581 self.fail("\n".join(msgs)) 1582 1583 for files in (ifiles_a, ifiles_b): 1584 files.sort() 1585 1586 for index, path in enumerate(ifiles_a): 1587 self.assertFileEquals(osp.join(path_a, ipath_a, path), 1588 osp.join(path_b, ipath_b, ifiles_b[index])) 1589 1590 except StopIteration: 1591 break
1592 1593 1594 assertDirEqual = assertDirEquals 1595 1596
1597 - def assertIsInstance(self, obj, klass, msg=None, strict=False):
1598 """compares two files using difflib""" 1599 if msg is None: 1600 if strict: 1601 msg = '%r is not of class %s but of %s' 1602 else: 1603 msg = '%r is not an instance of %s but of %s' 1604 msg = msg % (obj, klass, type(obj)) 1605 if strict: 1606 self.assert_(obj.__class__ is klass, msg) 1607 else: 1608 self.assert_(isinstance(obj, klass), msg)
1609
1610 - def assertIs(self, obj, other, msg=None):
1611 """compares identity of two reference""" 1612 if msg is None: 1613 msg = "%r is not %r"%(obj, other) 1614 self.assert_(obj is other, msg)
1615 1616
1617 - def assertIsNot(self, obj, other, msg=None):
1618 """compares identity of two reference""" 1619 if msg is None: 1620 msg = "%r is %r"%(obj, other) 1621 self.assert_(obj is not other, msg )
1622
1623 - def assertNone(self, obj, msg=None):
1624 """assert obj is None""" 1625 if msg is None: 1626 msg = "reference to %r when None expected"%(obj,) 1627 self.assert_( obj is None, msg )
1628
1629 - def assertNotNone(self, obj, msg=None):
1630 """assert obj is not None""" 1631 if msg is None: 1632 msg = "unexpected reference to None" 1633 self.assert_( obj is not None, msg )
1634
1635 - def assertFloatAlmostEquals(self, obj, other, prec=1e-5, msg=None):
1636 """compares two floats""" 1637 if msg is None: 1638 msg = "%r != %r" % (obj, other) 1639 self.assert_(math.fabs(obj - other) < prec, msg)
1640
1641 - def failUnlessRaises(self, excClass, callableObj, *args, **kwargs):
1642 """override default failUnlessRaise method to return the raised 1643 exception instance. 1644 1645 Fail unless an exception of class excClass is thrown 1646 by callableObj when invoked with arguments args and keyword 1647 arguments kwargs. If a different type of exception is 1648 thrown, it will not be caught, and the test case will be 1649 deemed to have suffered an error, exactly as for an 1650 unexpected exception. 1651 """ 1652 try: 1653 callableObj(*args, **kwargs) 1654 except excClass, exc: 1655 return exc 1656 else: 1657 if hasattr(excClass, '__name__'): 1658 excName = excClass.__name__ 1659 else: 1660 excName = str(excClass) 1661 raise self.failureException, "%s not raised" % excName
1662 1663 assertRaises = failUnlessRaises 1664 1665 import doctest 1666
1667 -class SkippedSuite(unittest.TestSuite):
1668 - def test(self):
1669 """just there to trigger test execution""" 1670 self.skipped_test('doctest module has no DocTestSuite class')
1671 1672 1673 # DocTestFinder was introduced in python2.4 1674 if sys.version_info >= (2, 4):
1675 - class DocTestFinder(doctest.DocTestFinder):
1676
1677 - def __init__(self, *args, **kwargs):
1678 self.skipped = kwargs.pop('skipped', ()) 1679 doctest.DocTestFinder.__init__(self, *args, **kwargs)
1680
1681 - def _get_test(self, obj, name, module, globs, source_lines):
1682 """override default _get_test method to be able to skip tests 1683 according to skipped attribute's value 1684 1685 Note: Python (<=2.4) use a _name_filter which could be used for that 1686 purpose but it's no longer available in 2.5 1687 Python 2.5 seems to have a [SKIP] flag 1688 """ 1689 if getattr(obj, '__name__', '') in self.skipped: 1690 return None 1691 return doctest.DocTestFinder._get_test(self, obj, name, module, 1692 globs, source_lines)
1693 else: 1694 # this is a hack to make skipped work with python <= 2.3
1695 - class DocTestFinder(object):
1696 - def __init__(self, skipped):
1697 self.skipped = skipped 1698 self.original_find_tests = doctest._find_tests 1699 doctest._find_tests = self._find_tests
1700
1701 - def _find_tests(self, module, prefix=None):
1702 tests = [] 1703 for testinfo in self.original_find_tests(module, prefix): 1704 testname, _, _, _ = testinfo 1705 # testname looks like A.B.C.function_name 1706 testname = testname.split('.')[-1] 1707 if testname not in self.skipped: 1708 tests.append(testinfo) 1709 return tests
1710 1711
1712 -class DocTest(TestCase):
1713 """trigger module doctest 1714 I don't know how to make unittest.main consider the DocTestSuite instance 1715 without this hack 1716 """ 1717 skipped = ()
1718 - def __call__(self, result=None, runcondition=None, options=None):\ 1719 # pylint: disable-msg=W0613
1720 try: 1721 finder = DocTestFinder(skipped=self.skipped) 1722 if sys.version_info >= (2, 4): 1723 suite = doctest.DocTestSuite(self.module, test_finder=finder) 1724 else: 1725 suite = doctest.DocTestSuite(self.module) 1726 except AttributeError: 1727 suite = SkippedSuite() 1728 return suite.run(result)
1729 run = __call__ 1730
1731 - def test(self):
1732 """just there to trigger test execution"""
1733 1734 MAILBOX = None 1735
1736 -class MockSMTP:
1737 """fake smtplib.SMTP""" 1738
1739 - def __init__(self, host, port):
1740 self.host = host 1741 self.port = port 1742 global MAILBOX 1743 self.reveived = MAILBOX = []
1744
1745 - def set_debuglevel(self, debuglevel):
1746 """ignore debug level"""
1747
1748 - def sendmail(self, fromaddr, toaddres, body):
1749 """push sent mail in the mailbox""" 1750 self.reveived.append((fromaddr, toaddres, body))
1751
1752 - def quit(self):
1753 """ignore quit"""
1754 1755
1756 -class MockConfigParser(ConfigParser):
1757 """fake ConfigParser.ConfigParser""" 1758
1759 - def __init__(self, options):
1760 ConfigParser.__init__(self) 1761 for section, pairs in options.iteritems(): 1762 self.add_section(section) 1763 for key, value in pairs.iteritems(): 1764 self.set(section,key,value)
1765 - def write(self, _):
1766 raise NotImplementedError()
1767 1768
1769 -class MockConnection:
1770 """fake DB-API 2.0 connexion AND cursor (i.e. cursor() return self)""" 1771
1772 - def __init__(self, results):
1773 self.received = [] 1774 self.states = [] 1775 self.results = results
1776
1777 - def cursor(self):
1778 """Mock cursor method""" 1779 return self
1780 - def execute(self, query, args=None):
1781 """Mock execute method""" 1782 self.received.append( (query, args) )
1783 - def fetchone(self):
1784 """Mock fetchone method""" 1785 return self.results[0]
1786 - def fetchall(self):
1787 """Mock fetchall method""" 1788 return self.results
1789 - def commit(self):
1790 """Mock commiy method""" 1791 self.states.append( ('commit', len(self.received)) )
1792 - def rollback(self):
1793 """Mock rollback method""" 1794 self.states.append( ('rollback', len(self.received)) )
1795 - def close(self):
1796 """Mock close method""" 1797 pass
1798 1799 MockConnexion = class_renamed('MockConnexion', MockConnection) 1800
1801 -def mock_object(**params):
1802 """creates an object using params to set attributes 1803 >>> option = mock_object(verbose=False, index=range(5)) 1804 >>> option.verbose 1805 False 1806 >>> option.index 1807 [0, 1, 2, 3, 4] 1808 """ 1809 return type('Mock', (), params)()
1810 1811
1812 -def create_files(paths, chroot):
1813 """Creates directories and files found in <path>. 1814 1815 :param paths: list of relative paths to files or directories 1816 :param chroot: the root directory in which paths will be created 1817 1818 >>> from os.path import isdir, isfile 1819 >>> isdir('/tmp/a') 1820 False 1821 >>> create_files(['a/b/foo.py', 'a/b/c/', 'a/b/c/d/e.py'], '/tmp') 1822 >>> isdir('/tmp/a') 1823 True 1824 >>> isdir('/tmp/a/b/c') 1825 True 1826 >>> isfile('/tmp/a/b/c/d/e.py') 1827 True 1828 >>> isfile('/tmp/a/b/foo.py') 1829 True 1830 """ 1831 dirs, files = set(), set() 1832 for path in paths: 1833 path = osp.join(chroot, path) 1834 filename = osp.basename(path) 1835 # path is a directory path 1836 if filename == '': 1837 dirs.add(path) 1838 # path is a filename path 1839 else: 1840 dirs.add(osp.dirname(path)) 1841 files.add(path) 1842 for dirpath in dirs: 1843 if not osp.isdir(dirpath): 1844 os.makedirs(dirpath) 1845 for filepath in files: 1846 file(filepath, 'w').close()
1847
1848 -def enable_dbc(*args):
1849 """ 1850 Without arguments, return True if contracts can be enabled and should be 1851 enabled (see option -d), return False otherwise. 1852 1853 With arguments, return False if contracts can't or shouldn't be enabled, 1854 otherwise weave ContractAspect with items passed as arguments. 1855 """ 1856 if not ENABLE_DBC: 1857 return False 1858 try: 1859 from logilab.aspects.weaver import weaver 1860 from logilab.aspects.lib.contracts import ContractAspect 1861 except ImportError: 1862 sys.stderr.write( 1863 'Warning: logilab.aspects is not available. Contracts disabled.') 1864 return False 1865 for arg in args: 1866 weaver.weave_module(arg, ContractAspect) 1867 return True
1868 1869
1870 -class AttrObject: # XXX cf mock_object
1871 - def __init__(self, **kwargs):
1872 self.__dict__.update(kwargs)
1873
1874 -def tag(*args):
1875 """descriptor adding tag to a function""" 1876 def desc(func): 1877 assert not hasattr(func, 'tags') 1878 func.tags = Tags(args) 1879 return func
1880 return desc 1881
1882 -class Tags(set):
1883 """A set of tag able validate an expression"""
1884 - def __getitem__(self, key):
1885 return key in self
1886
1887 - def match(self, exp):
1888 return eval(exp, {}, self)
1889
1890 -def require_version(version):
1891 """ Compare version of python interpretor to the given one. Skip the test 1892 if older. 1893 """ 1894 def check_require_version(f): 1895 version_elements = version.split('.') 1896 try: 1897 compare = tuple([int(v) for v in version_elements]) 1898 except ValueError: 1899 raise ValueError('%s is not a correct version : should be X.Y[.Z].' % version) 1900 current = sys.version_info[:3] 1901 #print 'comp', current, compare 1902 if current < compare: 1903 #print 'version too old' 1904 def new_f(self, *args, **kwargs): 1905 self.skip('Need at least %s version of python. Current version is %s.' % (version, '.'.join([str(element) for element in current])))
1906 new_f.__name__ = f.__name__ 1907 return new_f 1908 else: 1909 #print 'version young enough' 1910 return f 1911 return check_require_version 1912
1913 -def require_module(module):
1914 """ Check if the given module is loaded. Skip the test if not. 1915 """ 1916 def check_require_module(f): 1917 try: 1918 __import__(module) 1919 #print module, 'imported' 1920 return f 1921 except ImportError: 1922 #print module, 'can not be imported' 1923 def new_f(self, *args, **kwargs): 1924 self.skip('%s can not be imported.' % module)
1925 new_f.__name__ = f.__name__ 1926 return new_f 1927 return check_require_module 1928