Package logilab-common-0 :: Package 36 :: Package 1 :: Module testlib
[frames] | no frames]

Source Code for Module logilab-common-0.36.1.testlib

   1  # -*- coding: utf-8 -*- 
   2  """Run tests. 
   3   
   4  This will find all modules whose name match a given prefix in the test 
   5  directory, and run them. Various command line options provide 
   6  additional facilities. 
   7   
   8  Command line options: 
   9   
  10   -v: verbose -- run tests in verbose mode with output to stdout 
  11   -q: quiet   -- don't print anything except if a test fails 
  12   -t: testdir -- directory where the tests will be found 
  13   -x: exclude -- add a test to exclude 
  14   -p: profile -- profiled execution 
  15   -c: capture -- capture standard out/err during tests 
  16   -d: dbc     -- enable design-by-contract 
  17   -m: match   -- only run test matching the tag pattern which follow 
  18   
  19  If no non-option arguments are present, prefixes used are 'test', 
  20  'regrtest', 'smoketest' and 'unittest'. 
  21   
  22  :copyright: 2003-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  23  :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  24  :license: General Public License version 2 - http://www.gnu.org/licenses 
  25  """ 
  26  __docformat__ = "restructuredtext en" 
  27  # modified copy of some functions from test/regrtest.py from PyXml 
  28  # disable camel case warning 
  29  # pylint: disable-msg=C0103 
  30   
  31  import sys 
  32  import os, os.path as osp 
  33  import re 
  34  import time 
  35  import getopt 
  36  import traceback 
  37  import unittest 
  38  import difflib 
  39  import types 
  40  import tempfile 
  41  from shutil import rmtree 
  42  from operator import itemgetter 
  43  from warnings import warn 
  44  from compiler.consts import CO_GENERATOR 
  45  from ConfigParser import ConfigParser 
  46   
  47   
  48  # PRINT_ = file('stdout.txt', 'w').write 
  49   
  50  try: 
  51      from test import test_support 
  52  except ImportError: 
  53      # not always available 
class TestSupport:
    """Minimal stand-in used when ``test.test_support`` cannot be imported.

    Only the ``unload`` hook is provided; it accepts a test name and
    deliberately does nothing.
    """

    def unload(self, test):
        """Ignore `test`: nothing is unloaded by this fallback."""
        return None
57 test_support = TestSupport() 58 59 try: 60 from pygments import highlight, lexers, formatters 61 # only print in color if executed from a terminal 62 PYGMENTS_FOUND = True 63 except ImportError: 64 PYGMENTS_FOUND = False 65 66 from logilab.common.deprecation import class_renamed, deprecated_function, \ 67 obsolete 68 # pylint: disable-msg=W0622 69 from logilab.common.compat import set, enumerate, any 70 # pylint: enable-msg=W0622 71 from logilab.common.modutils import load_module_from_name 72 from logilab.common.debugger import Debugger 73 from logilab.common.decorators import cached 74 from logilab.common import textutils 75 76 __all__ = ['main', 'unittest_main', 'find_tests', 'run_test', 'spawn'] 77 78 DEFAULT_PREFIXES = ('test', 'regrtest', 'smoketest', 'unittest', 79 'func', 'validation') 80 81 ENABLE_DBC = False 82 83 FILE_RESTART = ".pytest.restart" 84 85
def with_tempdir(callable):
    """Decorator running `callable` with a private temporary directory.

    A fresh directory is created and installed as ``tempfile.tempdir``
    for the duration of the call; afterwards the directory is removed
    and the previous temporary directory is restored, even if the call
    raises.  Only files created through the ``tempfile`` module are
    affected.
    """
    def proxy(*args, **kargs):
        previous_dir = tempfile.gettempdir()
        scratch_dir = tempfile.mkdtemp(suffix="-logilab-common-testlib",
                                       prefix="temp-dir-")
        tempfile.tempdir = scratch_dir
        try:
            return callable(*args, **kargs)
        finally:
            # clean up first, then put the old directory back no matter
            # what happened during the clean-up
            try:
                rmtree(scratch_dir, ignore_errors=True)
            finally:
                tempfile.tempdir = previous_dir
    return proxy
103 -def main(testdir=None, exitafter=True):
104 """Execute a test suite. 105 106 This also parses command-line options and modifies its behaviour 107 accordingly. 108 109 tests -- a list of strings containing test names (optional) 110 testdir -- the directory in which to look for tests (optional) 111 112 Users other than the Python test suite will certainly want to 113 specify testdir; if it's omitted, the directory containing the 114 Python test suite is searched for. 115 116 If the tests argument is omitted, the tests listed on the 117 command-line will be used. If that's empty, too, then all *.py 118 files beginning with test_ will be used. 119 120 """ 121 122 try: 123 opts, args = getopt.getopt(sys.argv[1:], 'hvqxr:t:pcd', ['help']) 124 except getopt.error, msg: 125 print msg 126 print __doc__ 127 return 2 128 verbose = 0 129 quiet = False 130 profile = False 131 exclude = [] 132 capture = 0 133 for o, a in opts: 134 if o == '-v': 135 verbose += 1 136 elif o == '-q': 137 quiet = True 138 verbose = 0 139 elif o == '-x': 140 exclude.append(a) 141 elif o == '-t': 142 testdir = a 143 elif o == '-p': 144 profile = True 145 elif o == '-c': 146 capture += 1 147 elif o == '-d': 148 global ENABLE_DBC 149 ENABLE_DBC = True 150 elif o in ('-h', '--help'): 151 print __doc__ 152 sys.exit(0) 153 154 args = [item.rstrip('.py') for item in args] 155 exclude = [item.rstrip('.py') for item in exclude] 156 157 if testdir is not None: 158 os.chdir(testdir) 159 sys.path.insert(0, '') 160 tests = find_tests('.', args or DEFAULT_PREFIXES, excludes=exclude) 161 # Tell tests to be moderately quiet 162 test_support.verbose = verbose 163 if profile: 164 print >> sys.stderr, '** profiled run' 165 from hotshot import Profile 166 prof = Profile('stones.prof') 167 start_time, start_ctime = time.time(), time.clock() 168 good, bad, skipped, all_result = prof.runcall(run_tests, tests, quiet, 169 verbose, None, capture) 170 end_time, end_ctime = time.time(), time.clock() 171 prof.close() 172 else: 173 start_time, start_ctime = 
time.time(), time.clock() 174 good, bad, skipped, all_result = run_tests(tests, quiet, verbose, None, 175 capture) 176 end_time, end_ctime = time.time(), time.clock() 177 if not quiet: 178 print '*'*80 179 if all_result: 180 print 'Ran %s test cases in %0.2fs (%0.2fs CPU)' % ( 181 all_result.testsRun, end_time - start_time, 182 end_ctime - start_ctime), 183 if all_result.errors: 184 print ', %s errors' % len(all_result.errors), 185 if all_result.failures: 186 print ', %s failed' % len(all_result.failures), 187 if all_result.skipped: 188 print ', %s skipped' % len(all_result.skipped), 189 print 190 if good: 191 if not bad and not skipped and len(good) > 1: 192 print "All", 193 print _count(len(good), "test"), "OK." 194 if bad: 195 print _count(len(bad), "test"), "failed:", 196 print ', '.join(bad) 197 if skipped: 198 print _count(len(skipped), "test"), "skipped:", 199 print ', '.join(['%s (%s)' % (test, msg) for test, msg in skipped]) 200 if profile: 201 from hotshot import stats 202 stats = stats.load('stones.prof') 203 stats.sort_stats('time', 'calls') 204 stats.print_stats(30) 205 if exitafter: 206 sys.exit(len(bad) + len(skipped)) 207 else: 208 sys.path.pop(0) 209 return len(bad)
210 main = obsolete("testlib.main() is obsolete, use the pytest tool instead")(main) 211 212
213 -def run_tests(tests, quiet, verbose, runner=None, capture=0):
214 """Execute a list of tests. 215 216 :rtype: tuple 217 :return: tuple (list of passed tests, list of failed tests, list of skipped tests) 218 """ 219 good = [] 220 bad = [] 221 skipped = [] 222 all_result = None 223 for test in tests: 224 if not quiet: 225 print 226 print '-'*80 227 print "Executing", test 228 result = run_test(test, verbose, runner, capture) 229 if type(result) is type(''): 230 # an unexpected error occured 231 skipped.append( (test, result)) 232 else: 233 if all_result is None: 234 all_result = result 235 else: 236 all_result.testsRun += result.testsRun 237 all_result.failures += result.failures 238 all_result.errors += result.errors 239 all_result.skipped += result.skipped 240 if result.errors or result.failures: 241 bad.append(test) 242 if verbose: 243 print "test", test, \ 244 "failed -- %s errors, %s failures" % ( 245 len(result.errors), len(result.failures)) 246 else: 247 good.append(test) 248 249 return good, bad, skipped, all_result
250
def find_tests(testdir,
               prefixes=DEFAULT_PREFIXES, suffix=".py",
               excludes=(),
               remove_suffix=True):
    """Return a sorted list of all applicable test modules.

    :param testdir: directory whose entries are examined
    :param prefixes: a file is kept when its name starts with one of these
    :param suffix: required file-name ending; a false value (``""`` or
      None) disables the check
    :param excludes: names (after suffix removal) to leave out
    :param remove_suffix: when true, `suffix` is cut off the returned names
    :return: the matching names, each listed once, sorted
    """
    tests = []
    for name in os.listdir(testdir):
        if suffix and not name.endswith(suffix):
            continue
        # stop at the first matching prefix so that a file matching
        # several prefixes is not reported several times
        for prefix in prefixes:
            if name.startswith(prefix):
                break
        else:
            continue
        # only strip a non-empty suffix: name[:-0] would be ''
        if remove_suffix and suffix:
            name = name[:-len(suffix)]
        if name not in excludes:
            tests.append(name)
    tests.sort()
    return tests
269 270
def run_test(test, verbose, runner=None, capture=0):
    """Import and run a single test module.

    :param test: name of the test module
    :param verbose: when true, print the traceback if the module crashes
    :param runner: object whose ``run(suite)`` executes the suite; a
      `SkipAwareTextTestRunner` is created when None
    :param capture: given to the default runner
    :return: whatever ``runner.run`` returns, or a message string when
      loading or running the module raised
    """
    test_support.unload(test)
    try:
        module = load_module_from_name(test, path=sys.path)
        try:
            # prefer a module-level `suite` (object or factory)
            suite = module.suite
            if callable(suite):
                suite = suite()
        except AttributeError:
            suite = unittest.TestLoader().loadTestsFromModule(module)
        if runner is None:
            runner = SkipAwareTextTestRunner(capture=capture)
        return runner.run(suite)
    except KeyboardInterrupt:
        # let the user interrupt the whole run
        raise
    except:
        exc_type, exc_value = sys.exc_info()[:2]
        if verbose:
            traceback.print_exc()
        return "test %s crashed -- %s : %s" % (test, exc_type, exc_value)
301
302 -def _count(n, word):
303 """format word according to n""" 304 if n == 1: 305 return "%d %s" % (n, word) 306 else: 307 return "%d %ss" % (n, word)
308 309 310 311 312 ## PostMortem Debug facilities #####
313 -def start_interactive_mode(result):
314 """starts an interactive shell so that the user can inspect errors 315 """ 316 debuggers = result.debuggers 317 descrs = result.error_descrs + result.fail_descrs 318 if len(debuggers) == 1: 319 # don't ask for test name if there's only one failure 320 debuggers[0].start() 321 else: 322 while True: 323 testindex = 0 324 print "Choose a test to debug:" 325 # order debuggers in the same way than errors were printed 326 print "\n".join(['\t%s : %s' % (i, descr) for i, (_, descr) 327 in enumerate(descrs)]) 328 print "Type 'exit' (or ^D) to quit" 329 print 330 try: 331 todebug = raw_input('Enter a test name: ') 332 if todebug.strip().lower() == 'exit': 333 print 334 break 335 else: 336 try: 337 testindex = int(todebug) 338 debugger = debuggers[descrs[testindex][0]] 339 except (ValueError, IndexError): 340 print "ERROR: invalid test number %r" % (todebug, ) 341 else: 342 debugger.start() 343 except (EOFError, KeyboardInterrupt): 344 print 345 break
346 347 348 # test utils ################################################################## 349 from cStringIO import StringIO 350
class SkipAwareTestResult(unittest._TextTestResult):
    """Text test result that also records skipped tests.

    Besides the standard failures and errors it keeps the skipped tests,
    the descriptions of failing and erroring tests and, in pdb mode, one
    post-mortem debugger per failure or error.
    """

    def __init__(self, stream, descriptions, verbosity,
                 exitfirst=False, capture=0, printonly=None,
                 pdbmode=False, cvg=None):
        super(SkipAwareTestResult, self).__init__(stream,
                                                  descriptions, verbosity)
        # run options
        self.exitfirst = exitfirst
        self.capture = capture
        self.printonly = printonly
        self.pdbmode = pdbmode
        self.cvg = cvg
        self.pdbclass = Debugger
        # bookkeeping
        self.skipped = []
        self.debuggers = []
        self.fail_descrs = []
        self.error_descrs = []

    def descrs_for(self, flavour):
        """Return the description list matching `flavour` (case ignored)."""
        return getattr(self, flavour.lower() + '_descrs')

    def _create_pdb(self, test_descr, flavour):
        """Record `test_descr` and, in pdb mode, a post-mortem debugger."""
        entry = (len(self.debuggers), test_descr)
        self.descrs_for(flavour).append(entry)
        if self.pdbmode:
            self.debuggers.append(self.pdbclass(sys.exc_info()[2]))

    def addError(self, test, err):
        """Register an error, or a skip when it is a `TestSkipped`.

        err == (exc_type, exc, tcbk)
        """
        exc_type, exc_value, _ = err
        if exc_type == TestSkipped:
            self.addSkipped(test, exc_value)
            return
        if self.exitfirst:
            self.shouldStop = True
        descr = self.getDescription(test)
        super(SkipAwareTestResult, self).addError(test, err)
        self._create_pdb(descr, 'error')

    def addFailure(self, test, err):
        """Register a failure, asking the run to stop in exitfirst mode."""
        if self.exitfirst:
            self.shouldStop = True
        descr = self.getDescription(test)
        super(SkipAwareTestResult, self).addFailure(test, err)
        self._create_pdb(descr, 'fail')

    def addSkipped(self, test, reason):
        """Record `test` as skipped and echo it on the stream."""
        self.skipped.append((test, self.getDescription(test), reason))
        if self.showAll:
            self.stream.writeln("SKIPPED")
        elif self.dots:
            self.stream.write('S')

    def printErrors(self):
        """Print errors and failures, then the skipped tests."""
        super(SkipAwareTestResult, self).printErrors()
        self.printSkippedList()

    def printSkippedList(self):
        """Print one entry per skipped test, with its reason."""
        for _, descr, reason in self.skipped:
            self.stream.writeln(self.separator1)
            self.stream.writeln("%s: %s" % ('SKIPPED', descr))
            self.stream.writeln("\t%s" % reason)

    def _colorized(self, err):
        """Return `err` highlighted by pygments, encoded for the stream."""
        # ensure `err` is a unicode string before passing it to highlight
        if isinstance(err, str):
            try:
                # encoded str, no encoding information, try to decode
                err = err.decode('utf-8')
            except UnicodeDecodeError:
                err = err.decode('iso-8859-1')
        colored = highlight(err, lexers.PythonLexer(),
                            formatters.terminal.TerminalFormatter())
        # `colored` is a unicode string, encode it before writing
        if hasattr(self.stream, 'encoding'):
            return colored.encode(self.stream.encoding, 'replace')
        # rare cases where test output has been hijacked: fall back on utf-8
        return colored.encode('utf-8', 'replace')

    def _print_captured(self, name, content):
        """Print the captured `content` of stream `name`, or its absence."""
        width = len(self.separator2)
        if content:
            self.stream.writeln(self.separator2)
            self.stream.writeln(("captured %s" % name).center(width))
            self.stream.writeln(self.separator2)
            self.stream.writeln(content)
        else:
            self.stream.writeln(("no %s" % name).center(width))

    def printErrorList(self, flavour, errors):
        """Print every entry of `errors` with its recorded description.

        Colors are used only when the stream is a terminal; output
        captured by the test (if it supports capture) is appended.
        """
        pairs = zip(self.descrs_for(flavour), errors)
        for (_, descr), (test, err) in pairs:
            on_terminal = isatty(self.stream)
            self.stream.writeln(self.separator1)
            if on_terminal:
                title = textutils.colorize_ansi(flavour, color='red')
            else:
                title = flavour
            self.stream.writeln("%s: %s" % (title, descr))

            self.stream.writeln(self.separator2)
            if PYGMENTS_FOUND and isatty(self.stream):
                self.stream.writeln(self._colorized(err))
            else:
                self.stream.writeln(err)

            try:
                output, errput = test.captured_output()
            except AttributeError:
                continue # original unittest
            self._print_captured('stdout', output)
            self._print_captured('stderr', errput)
468 469
def isatty(stream):
    """Tell whether `stream` has an ``isatty`` method that returns true.

    Objects without an ``isatty`` attribute yield False.
    """
    if not hasattr(stream, 'isatty'):
        return False
    return stream.isatty()
472
def run(self, result, runcondition=None, options=None):
    """Replacement for ``unittest.TestSuite.run``.

    Each contained test is called with `result`, `runcondition` and
    `options`, until `result` asks to stop.  A test rejecting the extra
    arguments with a TypeError is reported with a warning and called
    again with `result` only.
    """
    for member in self._tests:
        if result.shouldStop:
            break
        try:
            member(result, runcondition, options)
        except TypeError:
            # this might happen if a raw unittest.TestCase is defined
            # and used with python (and not pytest)
            message = ("%s should extend lgc.testlib.TestCase instead of "
                       "unittest.TestCase" % member)
            warn(message)
            member(result)
    return result
unittest.TestSuite.run = run

# backward compatibility: TestSuite might be imported from lgc.testlib
TestSuite = unittest.TestSuite

# python2.3 compat
def __call__(self, *args, **kwds):
    """Make a test suite callable by delegating to its ``run`` method."""
    runner = self.run
    return runner(*args, **kwds)
unittest.TestSuite.__call__ = __call__
class SkipAwareTextTestRunner(unittest.TextTestRunner):
    """Text test runner that selects tests and reports skipped ones.

    Tests can be filtered by name pattern, by explicit skip patterns and
    by tags; results are gathered in a `SkipAwareTestResult`.
    """

    def __init__(self, stream=sys.stderr, verbosity=1,
                 exitfirst=False, capture=False, printonly=None,
                 pdbmode=False, cvg=None, test_pattern=None,
                 skipped_patterns=(), options=None):
        super(SkipAwareTextTestRunner, self).__init__(stream=stream,
                                                      verbosity=verbosity)
        # options handed over to the result object
        self.exitfirst = exitfirst
        self.capture = capture
        self.printonly = printonly
        self.pdbmode = pdbmode
        self.cvg = cvg
        # test selection
        self.test_pattern = test_pattern
        self.skipped_patterns = skipped_patterns
        self.options = options

    def _this_is_skipped(self, testedname):
        """Tell whether one of the skip patterns occurs in `testedname`."""
        hits = [pattern in testedname for pattern in self.skipped_patterns]
        return any(hits)

    def _runcondition(self, test, skipgenerator=True):
        """Tell whether `test` should be run.

        `test` may be an `InnerTest`, a `TestCase` instance, a function
        or a method; anything else is always accepted.
        """
        if not isinstance(test, InnerTest):
            if isinstance(test, TestCase):
                meth = test._get_test_method()
                func = meth.im_func
                testname = '%s.%s' % (meth.im_class.__name__, func.__name__)
            elif isinstance(test, types.FunctionType):
                func = test
                testname = func.__name__
            elif isinstance(test, types.MethodType):
                func = test.im_func
                testname = '%s.%s' % (test.im_class.__name__, func.__name__)
            else:
                return True # Not sure when this happens
            if is_generator(func) and skipgenerator:
                # Let inner tests decide at run time
                return self.does_match_tags(func)
        else:
            testname = test.name

        if self._this_is_skipped(testname):
            return False # this was explicitly skipped
        pattern = self.test_pattern
        if pattern is not None:
            try:
                classpattern, testpattern = pattern.split('.')
                klass, name = testname.split('.')
            except ValueError:
                # pattern or test name is not of the "Class.method" form
                if pattern not in testname:
                    return False
            else:
                if classpattern not in klass or testpattern not in name:
                    return False

        return self.does_match_tags(test)

    def does_match_tags(self, test):
        """Tell whether the tags of `test` match the requested pattern.

        True when there are no options or no ``tags_pattern`` in them.
        """
        if self.options is None:
            return True # no pattern
        tags_pattern = getattr(self.options, 'tags_pattern', None)
        if tags_pattern is None:
            return True # no pattern
        tags = getattr(test, 'tags', Tags())
        return tags.match(tags_pattern)

    def _makeResult(self):
        """Build the result object, forwarding this runner's options."""
        return SkipAwareTestResult(self.stream, self.descriptions,
                                   self.verbosity, self.exitfirst,
                                   self.capture, self.printonly,
                                   self.pdbmode, self.cvg)

    def run(self, test):
        "Run the given test case or test suite."
        result = self._makeResult()
        started = time.time()
        test(result, self._runcondition, self.options)
        elapsed = time.time() - started
        result.printErrors()
        self.stream.writeln(result.separator2)
        ran = result.testsRun
        if ran != 1:
            plural = "s"
        else:
            plural = ""
        self.stream.writeln("Ran %d test%s in %.3fs" % (ran, plural, elapsed))
        self.stream.writeln()
        if result.wasSuccessful():
            verdict, color = "OK", 'green'
        else:
            verdict, color = "FAILED", 'red'
        if isatty(self.stream):
            verdict = textutils.colorize_ansi(verdict, color=color)
        self.stream.write(verdict)

        # only non-empty categories are listed after the verdict
        counters = (("failures", result.failures),
                    ("errors", result.errors),
                    ("skipped", result.skipped))
        details = ["%s=%i" % (label, len(entries))
                   for label, entries in counters if entries]
        if details:
            self.stream.write(" (")
            self.stream.write(', '.join(details))
            self.stream.write(")")
        self.stream.writeln("")
        return result
603 604
class keywords(dict):
    """Dictionary marking keyword arguments (**kwargs) in generative tests."""
607
class starargs(tuple):
    """Tuple marking variable arguments (*args) in generative tests."""

    def __new__(cls, *args):
        """Build the tuple from the positional arguments themselves."""
        values = args
        return tuple.__new__(cls, values)
612 613 614
class NonStrictTestLoader(unittest.TestLoader):
    """
    Overrides default testloader to be able to omit classname when
    specifying tests to run on command line.

    For example, if the file test_foo.py contains ::

        class FooTC(TestCase):
            def test_foo1(self): # ...
            def test_foo2(self): # ...
            def test_bar1(self): # ...

        class BarTC(TestCase):
            def test_bar2(self): # ...

    'python test_foo.py' will run the 3 tests in FooTC
    'python test_foo.py FooTC' will run the 3 tests in FooTC
    'python test_foo.py test_foo' will run test_foo1 and test_foo2
    'python test_foo.py test_foo1' will run test_foo1
    'python test_foo.py test_bar' will run FooTC.test_bar1 and BarTC.test_bar2

    NOTE: the loader itself does not filter on the method pattern; it
    returns every test of the selected class(es).
    """

    def __init__(self):
        # patterns (plain substrings) of class / method names to leave out
        self.skipped_patterns = []

    def loadTestsFromNames(self, names, module=None):
        """Return a suite with the tests found for every name in `names`."""
        suites = []
        for name in names:
            suites.extend(self.loadTestsFromName(name, module))
        return self.suiteClass(suites)

    def _collect_tests(self, module):
        """Map each non-skipped TestCase class name found in `module` to
        a ``(class, test method names)`` pair.
        """
        tests = {}
        for obj in vars(module).values():
            if not (issubclass(type(obj), (types.ClassType, type)) and
                    issubclass(obj, unittest.TestCase)):
                continue
            # obj is a TestCase class
            classname = obj.__name__
            if self._this_is_skipped(classname):
                continue
            methodnames = [attrname for attrname in dir(obj)
                           if attrname.startswith(self.testMethodPrefix)
                           and callable(getattr(obj, attrname))]
            # keep track of class (obj) for convenience
            tests[classname] = (obj, methodnames)
        return tests

    def loadTestsFromSuite(self, module, suitename):
        """Return the tests of the suite built by ``module.<suitename>()``,
        or an empty list when an AttributeError is raised.
        """
        try:
            suite = getattr(module, suitename)()
        except AttributeError:
            return []
        assert hasattr(suite, '_tests'), \
               "%s.%s is not a valid TestSuite" % (module.__name__, suitename)
        # python2.3 does not implement __iter__ on suites, we need to return
        # _tests explicitly
        return suite._tests

    def loadTestsFromName(self, name, module=None):
        """Return the list of tests designated by `name`.

        Without a module, or for names with more than two dotted parts,
        the base class is used and its result wrapped in a list.
        """
        parts = name.split('.')
        if module is None or len(parts) > 2:
            # let the base class do its job here
            return [super(NonStrictTestLoader, self).loadTestsFromName(name)]
        tests = self._collect_tests(module)
        collected = []
        if len(parts) == 1:
            pattern = parts[0]
            if callable(getattr(module, pattern, None)
                        ) and pattern not in tests:
                # consider it as a suite
                return self.loadTestsFromSuite(module, pattern)
            if pattern in tests:
                # case python unittest_foo.py MyTestTC
                klass, methodnames = tests[pattern]
                # instantiate each test once (the list used to be rebuilt
                # once per method name)
                collected = [klass(methodname) for methodname in methodnames]
            else:
                # case python unittest_foo.py something
                for klass, methodnames in tests.values():
                    collected += [klass(methodname)
                                  for methodname in methodnames]
        elif len(parts) == 2:
            # case "MyClass.test_1"
            classname = parts[0]
            klass, methodnames = tests.get(classname, (None, []))
            collected = [klass(methodname) for methodname in methodnames]
        return collected

    def _this_is_skipped(self, testedname):
        """Tell whether one of the skip patterns occurs in `testedname`."""
        return any([(pat in testedname) for pat in self.skipped_patterns])

    def getTestCaseNames(self, testCaseClass):
        """Return a sorted sequence of method names found within
        testCaseClass, without the skipped ones (nothing at all when the
        class itself is skipped)
        """
        is_skipped = self._this_is_skipped
        if is_skipped(testCaseClass.__name__):
            return []
        testnames = super(NonStrictTestLoader, self).getTestCaseNames(
            testCaseClass)
        return [testname for testname in testnames if not is_skipped(testname)]
722 723
724 -class SkipAwareTestProgram(unittest.TestProgram):
725 # XXX: don't try to stay close to unittest.py, use optparse 726 USAGE = """\ 727 Usage: %(progName)s [options] [test] [...] 728 729 Options: 730 -h, --help Show this message 731 -v, --verbose Verbose output 732 -i, --pdb Enable test failure inspection 733 -x, --exitfirst Exit on first failure 734 -c, --capture Captures and prints standard out/err only on errors 735 -p, --printonly Only prints lines matching specified pattern 736 (implies capture) 737 -s, --skip skip test matching this pattern (no regexp for now) 738 -q, --quiet Minimal output 739 740 -m, --match Run only test whose tag match this pattern 741 742 Examples: 743 %(progName)s - run default set of tests 744 %(progName)s MyTestSuite - run suite 'MyTestSuite' 745 %(progName)s MyTestCase.testSomething - run MyTestCase.testSomething 746 %(progName)s MyTestCase - run all 'test*' test methods 747 in MyTestCase 748 """
749 - def __init__(self, module='__main__', defaultTest=None, batchmode=False, 750 cvg=None, options=None, outstream=sys.stderr):
751 self.batchmode = batchmode 752 self.cvg = cvg 753 self.options = options 754 self.outstream = outstream 755 super(SkipAwareTestProgram, self).__init__( 756 module=module, defaultTest=defaultTest, 757 testLoader=NonStrictTestLoader())
758
759 - def parseArgs(self, argv):
760 self.pdbmode = False 761 self.exitfirst = False 762 self.capture = 0 763 self.printonly = None 764 self.skipped_patterns = [] 765 self.test_pattern = None 766 self.tags_pattern = None 767 import getopt 768 try: 769 options, args = getopt.getopt(argv[1:], 'hHvixrqcp:s:m:', 770 ['help', 'verbose', 'quiet', 'pdb', 771 'exitfirst', 'restart', 'capture', 'printonly=', 772 'skip=', 'match=']) 773 for opt, value in options: 774 if opt in ('-h', '-H', '--help'): 775 self.usageExit() 776 if opt in ('-i', '--pdb'): 777 self.pdbmode = True 778 if opt in ('-x', '--exitfirst'): 779 self.exitfirst = True 780 if opt in ('-r', '--restart'): 781 self.restart = True 782 self.exitfirst = True 783 if opt in ('-q', '--quiet'): 784 self.verbosity = 0 785 if opt in ('-v', '--verbose'): 786 self.verbosity = 2 787 if opt in ('-c', '--capture'): 788 self.capture += 1 789 if opt in ('-p', '--printonly'): 790 self.printonly = re.compile(value) 791 if opt in ('-s', '--skip'): 792 self.skipped_patterns = [pat.strip() for pat in 793 value.split(', ')] 794 if opt in ('-m', '--match'): 795 #self.tags_pattern = value 796 self.options["tag_pattern"] = value 797 self.testLoader.skipped_patterns = self.skipped_patterns 798 if self.printonly is not None: 799 self.capture += 1 800 if len(args) == 0 and self.defaultTest is None: 801 suitefunc = getattr(self.module, 'suite', None) 802 if isinstance(suitefunc, (types.FunctionType, 803 types.MethodType)): 804 self.test = self.module.suite() 805 else: 806 self.test = self.testLoader.loadTestsFromModule(self.module) 807 return 808 if len(args) > 0: 809 self.test_pattern = args[0] 810 self.testNames = args 811 else: 812 self.testNames = (self.defaultTest, ) 813 self.createTests() 814 except getopt.error, msg: 815 self.usageExit(msg)
816 817
818 - def runTests(self):
819 if hasattr(self.module, 'setup_module'): 820 try: 821 self.module.setup_module(self.options) 822 except Exception, exc: 823 print 'setup_module error:', exc 824 sys.exit(1) 825 self.testRunner = SkipAwareTextTestRunner(verbosity=self.verbosity, 826 stream=self.outstream, 827 exitfirst=self.exitfirst, 828 capture=self.capture, 829 printonly=self.printonly, 830 pdbmode=self.pdbmode, 831 cvg=self.cvg, 832 test_pattern=self.test_pattern, 833 skipped_patterns=self.skipped_patterns, 834 options=self.options) 835 836 def removeSucceededTests(obj, succTests): 837 """ Recurcive function that removes succTests from 838 a TestSuite or TestCase 839 """ 840 if isinstance(obj, TestSuite): 841 removeSucceededTests(obj._tests, succTests) 842 if isinstance(obj, list): 843 for el in obj[:]: 844 if isinstance(el, TestSuite): 845 removeSucceededTests(el, succTests) 846 elif isinstance(el, TestCase): 847 descr = '.'.join((el.__class__.__module__, 848 el.__class__.__name__, 849 el._testMethodName)) 850 if descr in succTests: 851 obj.remove(el)
852 853 if self.options.restart: 854 # retrieve succeeded tests from FILE_RESTART 855 try: 856 restartfile = open(FILE_RESTART, 'r') 857 try: 858 try: 859 succeededtests = list(elem.rstrip('\n\r') for elem in 860 restartfile.readlines()) 861 removeSucceededTests(self.test, succeededtests) 862 except Exception, e: 863 raise e 864 finally: 865 restartfile.close() 866 except Exception ,e: 867 raise "Error while reading \ 868 succeeded tests into", osp.join(os.getcwd(),FILE_RESTART) 869 870 result = self.testRunner.run(self.test) 871 if hasattr(self.module, 'teardown_module'): 872 try: 873 self.module.teardown_module(self.options) 874 except Exception, exc: 875 print 'teardown_module error:', exc 876 sys.exit(1) 877 if os.environ.get('PYDEBUG'): 878 warn("PYDEBUG usage is deprecated, use -i / --pdb instead", 879 DeprecationWarning) 880 self.pdbmode = True 881 if result.debuggers and self.pdbmode: 882 start_interactive_mode(result) 883 if not self.batchmode: 884 sys.exit(not result.wasSuccessful()) 885 self.result = result
886 887 888 889
class FDCapture:
    """adapted from py lib (http://codespeak.net/py)
    Capture IO to/from a given os-level filedescriptor.

    While capturing, the descriptor points to a temporary file and the
    matching ``sys`` attribute is replaced by this object.
    """

    def __init__(self, fd, attr='stdout', printonly=None):
        self.targetfd = fd
        self.attr = attr
        self.printonly = printonly
        self.tmpfile = os.tmpfile()
        # keep a duplicate of the original descriptor for restore()
        self._savefd = os.dup(fd)
        # from now on the descriptor points to the temporary file
        os.dup2(self.tmpfile.fileno(), fd)
        # python-level writes go through this object as well
        self.oldval = getattr(sys, attr)
        setattr(sys, attr, self)

    def write(self, msg):
        """Store `msg` line by line in the temporary file.

        Lines matching `printonly` are written to the original
        descriptor instead.  Every line gets a trailing newline.
        """
        # msg might be composed of several lines
        for line in msg.splitlines():
            line += '\n' # keepend=True is not enough
            wanted = (self.printonly is not None
                      and self.printonly.search(line) is not None)
            if wanted:
                os.write(self._savefd, line)
            else:
                self.tmpfile.write(line)

    def restore(self):
        """restore original fd and returns captured output"""
        #XXX: hack hack hack
        self.tmpfile.flush()
        try:
            reference = getattr(sys, '__%s__' % self.attr)
            reference.flush()
        except AttributeError:
            pass
        if hasattr(self.oldval, 'flush'):
            self.oldval.flush()
        # put the original descriptor and sys attribute back in place
        os.dup2(self._savefd, self.targetfd)
        setattr(sys, self.attr, self.oldval)
        # the backup descriptor is no longer needed
        os.close(self._savefd)
        # read back everything that was captured
        self.tmpfile.seek(0)
        captured = self.tmpfile.read()
        return captured
943 944
945 -def _capture(which='stdout', printonly=None):
946 """private method, should not be called directly 947 (cf. capture_stdout() and capture_stderr()) 948 """ 949 assert which in ('stdout', 'stderr' 950 ), "Can only capture stdout or stderr, not %s" % which 951 if which == 'stdout': 952 fd = 1 953 else: 954 fd = 2 955 return FDCapture(fd, which, printonly)
956
def capture_stdout(printonly=None):
    """Start capturing the standard output.

    Return a handle object whose `restore()` method puts the standard
    output back and returns what was captured.
    """
    handle = _capture('stdout', printonly)
    return handle
964
def capture_stderr(printonly=None):
    """Start capturing the standard error output.

    Return a handle object whose `restore()` method puts the standard
    error back and returns what was captured.
    """
    handle = _capture('stderr', printonly)
    return handle
972 973
def unittest_main(module='__main__', defaultTest=None,
                  batchmode=False, cvg=None, options=None,
                  outstream=sys.stderr):
    """Drop-in replacement for ``unittest.main``.

    Build (and thereby run) a `SkipAwareTestProgram` with the given
    arguments and return it.
    """
    return SkipAwareTestProgram(module=module, defaultTest=defaultTest,
                                batchmode=batchmode, cvg=cvg,
                                options=options, outstream=outstream)
981
class TestSkipped(Exception):
    """Exception signalling that the current test is skipped."""
984
def is_generator(function):
    """Return a true value when `function` is a generator function.

    The result is the ``CO_GENERATOR`` bit of the code flags (an
    integer), not a boolean.
    """
    return function.func_code.co_flags & CO_GENERATOR
988 989
def parse_generative_args(params):
    """Split the parameters yielded by a generative test.

    Plain parameters come first, optionally followed by one `starargs`
    and then one `keywords` instance.

    :return: ``(args, kwargs)``: the list of positional arguments (plain
      ones plus the content of the `starargs`) and the `keywords` instance
      (an empty dict when there is none)
    :raise TypeError: when the parameters do not follow that order
    """
    positional = []
    named = {}
    seen_starargs = False
    seen_keywords = False
    for param in params:
        if isinstance(param, starargs):
            if seen_starargs or seen_keywords:
                raise TypeError('found starargs after keywords !')
            seen_starargs = True
            positional.extend(param)
        elif isinstance(param, keywords):
            if seen_keywords:
                raise TypeError('got multiple keywords parameters')
            seen_keywords = True
            named = param
        elif seen_starargs or seen_keywords:
            raise TypeError('found parameters after kwargs or args')
        else:
            positional.append(param)

    return positional, named
1013
class InnerTest(tuple):
    """Tuple of data carrying an additional `name` attribute."""

    def __new__(cls, name, *data):
        # the name is kept out of the tuple content: only `data` is stored
        self = super(InnerTest, cls).__new__(cls, data)
        self.name = name
        return self
1019
class ClassGetProperty(object):
    """Read-only, property-like descriptor for class attributes.

    The getter is called with the owner class rather than with the
    instance, whether the attribute is reached through the class or
    through one of its instances.
    """

    def __init__(self, getter):
        self.getter = getter

    def __get__(self, obj, objtype): # pylint: disable-msg=W0613
        "__get__(objn objtype) -> objtype"
        compute = self.getter
        return compute(objtype)
1031 1032
class TestCase(unittest.TestCase):
    """unittest.TestCase with some additional methods

    It adds optional capture of the standard output / error while a test
    runs, generative (generator based) tests, test skipping and a set of
    extra assertion methods.
    """

    # when true, standard output and error are captured during the test
    capture = False
    # debugger class handed over to the test result in __call__
    pdbclass = Debugger

    def __init__(self, methodName='runTest'):
        super(TestCase, self).__init__(methodName)
        # internal API changed in python2.5; since this class is also named
        # `TestCase`, the mangled private names below are the very ones used
        # by older versions of unittest.TestCase
        if sys.version_info >= (2, 5):
            self.__exc_info = self._exc_info
            self.__testMethodName = self._testMethodName
        else:
            # let's give easier access to _testMethodName to every subclasses
            self._testMethodName = self.__testMethodName
        self._captured_stdout = ""
        self._captured_stderr = ""
        # stacks of active stdout / stderr capture handles
        self._out = []
        self._err = []
        self._current_test_descr = None
        self._options_ = None
        # default `printonly` pattern; replaced in __call__ by the result's
        # one, but start_capture() may be called outside of a test run
        self._printonly = None

    def datadir(cls): # pylint: disable-msg=E0213
        """helper attribute holding the standard test's data directory

        NOTE: this is a logilab's standard
        """
        # __import__ returns the top-level package for dotted module names,
        # so fetch the module actually defining the class from sys.modules
        __import__(cls.__module__)
        mod = sys.modules[cls.__module__]
        return osp.join(osp.dirname(osp.abspath(mod.__file__)), 'data')
    # cache it (use a class method to cache on class since TestCase is
    # instantiated for each test run)
    datadir = ClassGetProperty(cached(datadir))

    def datapath(self, fname):
        """joins the object's datadir and `fname`"""
        return osp.join(self.datadir, fname)

    def set_description(self, descr):
        """sets the current test's description.

        This can be useful for generative tests because it allows to
        specify a description per yield
        """
        self._current_test_descr = descr

    # override default's unittest.py feature
    def shortDescription(self):
        """override default unittest shortDescription to handle correctly
        generative tests
        """
        if self._current_test_descr is not None:
            return self._current_test_descr
        return super(TestCase, self).shortDescription()

    def captured_output(self):
        """return a two tuple with standard output and error stripped"""
        return self._captured_stdout.strip(), self._captured_stderr.strip()

    def _start_capture(self):
        """start the capture if it is enabled"""
        if self.capture:
            self.start_capture()

    def _stop_capture(self):
        """stop every capture and restore previous output"""
        self._force_output_restore()

    def start_capture(self, printonly=None):
        """push a new capture of both standard output and error

        :param printonly: optional pattern handed over to the capture
          functions; defaults to the one set for the current run
        """
        self._out.append(capture_stdout(printonly or self._printonly))
        self._err.append(capture_stderr(printonly or self._printonly))

    def printonly(self, pattern, flags=0):
        """set the pattern of line to print

        The compiled pattern is set on the innermost active capture; when
        there is none, a new capture is started with it.
        """
        rgx = re.compile(pattern, flags)
        if self._out:
            self._out[-1].printonly = rgx
            self._err[-1].printonly = rgx
        else:
            self.start_capture(printonly=rgx)

    def stop_capture(self):
        """stop the innermost output and error capture

        :return: the `(stdout, stderr)` values given back by the capture
          handles, or two empty strings when nothing was being captured
        """
        if self._out:
            _out = self._out.pop()
            _err = self._err.pop()
            return _out.restore(), _err.restore()
        return '', ''

    def _force_output_restore(self):
        """remove all capture set, accumulating what they captured"""
        while self._out:
            self._captured_stdout += self._out.pop().restore()
            self._captured_stderr += self._err.pop().restore()

    def quiet_run(self, result, func, *args, **kwargs):
        """call `func` (typically setUp / tearDown) under capture

        :return: True when the call succeeded; False when it raised, in
          which case the error has been reported to `result`.
          KeyboardInterrupt and SystemExit are propagated.
        """
        self._start_capture()
        try:
            func(*args, **kwargs)
        except (KeyboardInterrupt, SystemExit):
            self._stop_capture()
            raise
        except:
            self._stop_capture()
            result.addError(self, self.__exc_info())
            return False
        self._stop_capture()
        return True

    def _get_test_method(self):
        """return the test method"""
        return getattr(self, self.__testMethodName)

    def optval(self, option, default=None):
        """return the option value or default if the option is not define"""
        return getattr(self._options_, option, default)

    def __call__(self, result=None, runcondition=None, options=None):
        """rewrite TestCase.__call__ to support generative tests

        This is mostly a copy/paste from unittest.py (i.e same
        variable names, same logic, except for the generative tests part)

        :param result: test result object; a default one is created when
          None
        :param runcondition: optional callable taking the test method; the
          test is silently skipped when it returns a false value
        :param options: optional options object; its `exitfirst` attribute
          triggers the recording of succeeded tests in the restart file
        """
        if result is None:
            result = self.defaultTestResult()
        result.pdbclass = self.pdbclass
        # if self.capture is True here, it means it was explicitly specified
        # in the user's TestCase class. If not, do what was asked on cmd line
        self.capture = self.capture or getattr(result, 'capture', False)
        self._options_ = options
        self._printonly = getattr(result, 'printonly', None)
        testMethod = self._get_test_method()
        if runcondition and not runcondition(testMethod):
            return # test is skipped
        result.startTest(self)
        try:
            if not self.quiet_run(result, self.setUp):
                return
            # generative tests
            if is_generator(testMethod.im_func):
                success = self._proceed_generative(result, testMethod,
                                                   runcondition)
            else:
                status = self._proceed(result, testMethod)
                success = (status == 0)
            if not self.quiet_run(result, self.tearDown):
                return
            if success:
                if getattr(options, 'exitfirst', False):
                    self._save_in_restart_file()
                result.addSuccess(self)
        finally:
            result.stopTest(self)

    def _save_in_restart_file(self):
        """append this test's dotted name to the restart file

        Any error is reported on the original standard error, then
        re-raised with its traceback untouched.
        """
        try:
            descr = '.'.join((self.__class__.__module__,
                              self.__class__.__name__,
                              self._testMethodName))
            restartfile = open(FILE_RESTART, 'a')
            try:
                restartfile.write(descr + os.linesep)
            finally:
                restartfile.close()
        except Exception:
            sys.__stderr__.write(
                "Error while saving succeeded test into %s\n"
                % osp.join(os.getcwd(), FILE_RESTART))
            raise

    def _proceed_generative(self, result, testfunc, runcondition=None):
        """run every test yielded by the generative test `testfunc`

        Each yielded value is either a callable or a sequence made of a
        callable followed by its parameters.

        :return: True when the last executed inner test succeeded and no
          error occurred between two yields
        """
        # cancel startTest()'s increment
        result.testsRun -= 1
        self._start_capture()
        success = True
        try:
            for params in testfunc():
                if runcondition and not runcondition(testfunc,
                                                     skipgenerator=False):
                    if not (isinstance(params, InnerTest)
                            and runcondition(params)):
                        continue
                if not isinstance(params, (tuple, list)):
                    params = (params, )
                func = params[0]
                args, kwargs = parse_generative_args(params[1:])
                # increment test counter manually
                result.testsRun += 1
                status = self._proceed(result, func, args, kwargs)
                if status == 0:
                    result.addSuccess(self)
                    success = True
                else:
                    success = False
                    if status == 2:
                        result.shouldStop = True
                if result.shouldStop: # either on error or on exitfirst + error
                    break
        except KeyboardInterrupt:
            # as in _proceed() and quiet_run(), an interruption must stop
            # the run instead of being reported as a test error
            self._stop_capture()
            raise
        except:
            # if an error occurs between two yield
            result.addError(self, self.__exc_info())
            success = False
        self._stop_capture()
        return success

    def _proceed(self, result, testfunc, args=(), kwargs=None):
        """proceed the actual test

        returns 0 on success, 1 on failure, 2 on error

        Note: addSuccess can't be called here because we have to wait
        for tearDown to be successfully executed to declare the test as
        successful
        """
        self._start_capture()
        kwargs = kwargs or {}
        try:
            testfunc(*args, **kwargs)
            self._stop_capture()
        except self.failureException:
            self._stop_capture()
            result.addFailure(self, self.__exc_info())
            return 1
        except KeyboardInterrupt:
            self._stop_capture()
            raise
        except:
            self._stop_capture()
            result.addError(self, self.__exc_info())
            return 2
        return 0

    def defaultTestResult(self):
        """return a new instance of the defaultTestResult"""
        return SkipAwareTestResult()

    def skip(self, msg=None):
        """mark a test as skipped for the <msg> reason"""
        msg = msg or 'test was skipped'
        raise TestSkipped(msg)
    skipped_test = deprecated_function(skip)

    def assertIn(self, object, set):
        """assert <object> is in <set>"""
        self.assert_(object in set, "%s not in %s" % (object, set))

    def assertNotIn(self, object, set):
        """assert <object> is not in <set>"""
        self.assert_(object not in set, "%s in %s" % (object, set))

    def assertDictEquals(self, dict1, dict2):
        """compares two dicts

        If the two dicts differ, every difference is listed in the error
        message, one per line
        """
        dict1 = dict(dict1)
        msgs = []
        for key, value in dict2.items():
            try:
                if dict1[key] != value:
                    msgs.append('%r != %r for key %r' % (dict1[key], value,
                                                         key))
                del dict1[key]
            except KeyError:
                msgs.append('missing %r key' % key)
        # whatever remains in the copy of dict1 has no counterpart in dict2
        if dict1:
            msgs.append('dict2 is lacking %r' % dict1)
        if msgs:
            self.fail('\n'.join(msgs))
    assertDictEqual = assertDictEquals

    def assertUnorderedIterableEquals(self, got, expected, msg=None):
        """compares two iterables regardless of their order and shows
        the differences between both

        Elements must be the same and must appear the same number of
        times in both iterables.
        """
        got, expected = list(got), list(expected)
        self.assertSetEqual(set(got), set(expected), msg)
        if len(got) != len(expected):
            if msg is None:
                lines = ['Iterable have the same elements but not the same '
                         'number',
                         '\t<element>\t<expected>\t<got>']
                got_count = {}
                expected_count = {}
                for element in got:
                    got_count[element] = got_count.get(element, 0) + 1
                for element in expected:
                    expected_count[element] = \
                        expected_count.get(element, 0) + 1
                # both dicts have the same keys thanks to the set
                # comparison done above
                for element, count in got_count.items():
                    other_count = expected_count[element]
                    if other_count != count:
                        lines.append('\t%s\t%s\t%s' % (element, other_count,
                                                      count))
                msg = '\n'.join(lines)
            self.fail(msg)

    assertUnorderedIterableEqual = assertUnorderedIterableEquals
    assertUnordIterEquals = assertUnordIterEqual = assertUnorderedIterableEqual

    def assertSetEquals(self, got, expected, msg=None):
        """compares two sets and shows missing / unexpected elements

        Calling it with anything but two sets is deprecated: the call is
        then delegated to assertUnorderedIterableEquals.
        """
        if not (isinstance(got, set) and isinstance(expected, set)):
            warn("the assertSetEquals function is now intended for set only. "
                 "use assertUnorderedIterableEquals instead.",
                 DeprecationWarning, 2)
            return self.assertUnorderedIterableEquals(got, expected, msg)

        differences = (('missing', expected - got),
                       ('unexpected', got - expected))
        problems = [(key, values) for key, values in differences if values]
        if problems:
            if msg is None:
                msg = '\n'.join(['%s:\n\t%s' % (
                    key, "\n\t".join([str(value) for value in values]))
                                 for key, values in problems])
            self.fail(msg)

    assertSetEqual = assertSetEquals

    def assertListEquals(self, list_1, list_2, msg=None):
        """compares two lists

        If the two list differ, the first difference is shown in the error
        message
        """
        compared = 0
        for i, value in enumerate(list_2):
            if i >= len(list_1):
                if msg is None:
                    msg = 'list_1 has only %d elements, not %s '\
                          '(at least %r missing)'% (i, len(list_2), value)
                self.fail(msg)
            if list_1[i] != value:
                from pprint import pprint
                pprint(list_1)
                pprint(list_2)
                self.fail('%r != %r for index %d' % (list_1[i], value, i))
            compared = i + 1
        # elements of list_1 which have no counterpart in list_2
        remaining = list_1[compared:]
        if remaining:
            if msg is None:
                msg = 'list_2 is lacking %r' % (remaining,)
            self.fail(msg)
    assertListEqual = assertListEquals

    def assertLinesEquals(self, list_1, list_2, msg=None):
        """assert two multi-line strings have the same lines"""
        self.assertListEquals(list_1.splitlines(), list_2.splitlines(), msg)
    assertLineEqual = assertLinesEquals

    def assertXMLWellFormed(self, stream, msg=None):
        """asserts the XML stream is well-formed (no DTD conformance check)"""
        from xml.sax import make_parser, SAXParseException
        parser = make_parser()
        try:
            parser.parse(stream)
        except SAXParseException:
            if msg is None:
                msg = 'XML stream not well formed'
            self.fail(msg)
    assertXMLValid = deprecated_function(assertXMLWellFormed,
        'assertXMLValid renamed to more precise assertXMLWellFormed')

    def assertXMLStringWellFormed(self, xml_string, msg=None):
        """asserts the XML string is well-formed (no DTD conformance check)"""
        stream = StringIO(xml_string)
        self.assertXMLWellFormed(stream, msg)

    assertXMLStringValid = deprecated_function(
        assertXMLStringWellFormed,
        'assertXMLStringValid renamed to more precise assertXMLStringWellFormed'
        )

    def assertXMLEqualsTuple(self, element, tup):
        """compare an ElementTree Element to a tuple formatted as follow:
        (tagname, [attrib[, children[, text[, tail]]]])"""
        # check tag
        self.assertTextEquals(element.tag, tup[0])
        # check attrib
        if len(element.attrib) or len(tup)>1:
            if len(tup)<=1:
                self.fail( "tuple %s has no attributes (%s expected)"%(tup,
                    dict(element.attrib)))
            self.assertDictEquals(element.attrib, tup[1])
        # check children
        if len(element) or len(tup)>2:
            if len(tup)<=2:
                self.fail( "tuple %s has no children (%i expected)"%(tup,
                    len(element)))
            if len(element) != len(tup[2]):
                self.fail( "tuple %s has %i children%s (%i expected)"%(tup,
                    len(tup[2]),
                    ('', 's')[len(tup[2])>1], len(element)))
            for index, child_tup in enumerate(tup[2]):
                self.assertXMLEqualsTuple(element[index], child_tup)
        # check text
        if element.text or len(tup)>3:
            if len(tup)<=3:
                self.fail( "tuple %s has no text value (%r expected)"%(tup,
                    element.text))
            self.assertTextEquals(element.text, tup[3])
        # check tail
        if element.tail or len(tup)>4:
            if len(tup)<=4:
                self.fail( "tuple %s has no tail value (%r expected)"%(tup,
                    element.tail))
            self.assertTextEquals(element.tail, tup[4])

    def _difftext(self, lines1, lines2, junk=None, msg_prefix='Texts differ'):
        """fail with the whole ndiff output as soon as a difference is
        found between the two lists of lines
        """
        junk = junk or (' ', '\t')
        # result is a generator
        result = difflib.ndiff(lines1, lines2, charjunk=lambda x: x in junk)
        read = []
        for line in result:
            read.append(line)
            # lines that don't start with a ' ' are diff ones
            if not line.startswith(' '):
                self.fail('\n'.join(['%s\n'%msg_prefix]+read + list(result)))

    def assertTextEquals(self, text1, text2, junk=None,
                         msg_prefix='Text differ'):
        """compare two multiline strings (using difflib and splitlines())"""
        msg = []
        if not isinstance(text1, basestring):
            msg.append('text1 is not a string (%s)'%(type(text1)))
        if not isinstance(text2, basestring):
            msg.append('text2 is not a string (%s)'%(type(text2)))
        if msg:
            self.fail('\n'.join(msg))
        self._difftext(text1.splitlines(True), text2.splitlines(True), junk,
                       msg_prefix)
    assertTextEqual = assertTextEquals

    def assertStreamEquals(self, stream1, stream2, junk=None,
                           msg_prefix='Stream differ'):
        """compare two streams (using difflib and readlines())"""
        # if stream1 is stream2, readlines() on stream1 will also read lines
        # in stream2, so they'll appear different, although they're not
        if stream1 is stream2:
            return
        # make sure we compare from the beginning of the stream
        stream1.seek(0)
        stream2.seek(0)
        # compare
        self._difftext(stream1.readlines(), stream2.readlines(), junk,
                       msg_prefix)

    assertStreamEqual = assertStreamEquals

    def assertFileEquals(self, fname1, fname2, junk=(' ', '\t')):
        """compares two files using difflib"""
        stream1 = open(fname1)
        try:
            stream2 = open(fname2)
            try:
                self.assertStreamEqual(
                    stream1, stream2, junk,
                    msg_prefix='Files differs\n-:%s\n+:%s\n'%(fname1, fname2))
            finally:
                stream2.close()
        finally:
            # files are closed whatever the outcome of the comparison
            stream1.close()
    assertFileEqual = assertFileEquals

    def assertDirEquals(self, path_a, path_b):
        """compares two directory trees

        Directories are walked in parallel: at each level both must hold
        the same sub-directories and the same files, and files with the
        same name must have the same content (see assertFileEquals).
        """
        assert osp.exists(path_a), "%s doesn't exists" % path_a
        assert osp.exists(path_b), "%s doesn't exists" % path_b

        all_a = [ (ipath[len(path_a):].lstrip('/'), idirs, ifiles)
                  for ipath, idirs, ifiles in os.walk(path_a)]
        all_a.sort(key=itemgetter(0))

        all_b = [ (ipath[len(path_b):].lstrip('/'), idirs, ifiles)
                  for ipath, idirs, ifiles in os.walk(path_b)]
        all_b.sort(key=itemgetter(0))

        # a directory found on one side only is reported while comparing
        # its parent, so walking the common prefix of both lists is enough
        for data_a, data_b in zip(all_a, all_b):
            ipath_a, idirs_a, ifiles_a = data_a
            ipath_b, idirs_b, ifiles_b = data_b

            self.assert_(ipath_a == ipath_b,
                         "unexpected %s in %s while looking %s from %s" %
                         (ipath_a, path_a, ipath_b, path_b))

            sdirs_a = set(idirs_a)
            sdirs_b = set(idirs_b)
            sfiles_a = set(ifiles_a)
            sfiles_b = set(ifiles_b)
            errors = (("unexpected directories", sdirs_a - sdirs_b),
                      ("missing directories", sdirs_b - sdirs_a),
                      ("unexpected files", sfiles_a - sfiles_b),
                      ("missing files", sfiles_b - sfiles_a))

            msgs = [ "%s: %s"% (name, items)
                     for name, items in errors if items]

            if msgs:
                msgs.insert(0, "%s and %s differ :" % (
                    osp.join(path_a, ipath_a),
                    osp.join(path_b, ipath_b),
                    ))
                self.fail("\n".join(msgs))

            # both lists hold the same names at this point: once sorted,
            # files can be compared pairwise
            for files in (ifiles_a, ifiles_b):
                files.sort()

            for index, path in enumerate(ifiles_a):
                self.assertFileEquals(osp.join(path_a, ipath_a, path),
                                      osp.join(path_b, ipath_b,
                                               ifiles_b[index]))

    assertDirEqual = assertDirEquals

    def assertIsInstance(self, obj, klass, msg=None, strict=False):
        """assert <obj> is an instance of <klass>

        When `strict` is true, the class of <obj> must be <klass> itself,
        not one of its subclasses.
        """
        if msg is None:
            if strict:
                msg = '%r is not of class %s but of %s'
            else:
                msg = '%r is not an instance of %s but of %s'
            msg = msg % (obj, klass, type(obj))
        if strict:
            self.assert_(obj.__class__ is klass, msg)
        else:
            self.assert_(isinstance(obj, klass), msg)

    def assertIs(self, obj, other, msg=None):
        """assert the two references point to the same object"""
        if msg is None:
            msg = "%r is not %r"%(obj, other)
        self.assert_(obj is other, msg)

    def assertIsNot(self, obj, other, msg=None):
        """assert the two references point to different objects"""
        if msg is None:
            msg = "%r is %r"%(obj, other)
        self.assert_(obj is not other, msg )

    def assertNone(self, obj, msg=None):
        """assert obj is None"""
        if msg is None:
            msg = "reference to %r when None expected"%(obj,)
        self.assert_( obj is None, msg )

    def assertNotNone(self, obj, msg=None):
        """assert obj is not None"""
        if msg is None:
            msg = "unexpected reference to None"
        self.assert_( obj is not None, msg )

    def failUnlessRaises(self, excClass, callableObj, *args, **kwargs):
        """override default failUnlessRaise method to return the raised
        exception instance.

        Fail unless an exception of class excClass is thrown
        by callableObj when invoked with arguments args and keyword
        arguments kwargs. If a different type of exception is
        thrown, it will not be caught, and the test case will be
        deemed to have suffered an error, exactly as for an
        unexpected exception.
        """
        try:
            callableObj(*args, **kwargs)
        except excClass:
            # the instance being handled, whatever the python version
            return sys.exc_info()[1]
        else:
            if hasattr(excClass, '__name__'):
                excName = excClass.__name__
            else:
                excName = str(excClass)
            raise self.failureException("%s not raised" % excName)

    assertRaises = failUnlessRaises

import doctest
class SkippedSuite(unittest.TestSuite):
    """Fallback suite: DocTest.__call__ uses it when building the doctest
    suite raises an AttributeError.
    """

    def test(self):
        """just there to trigger test execution"""
        reason = 'doctest module has no DocTestSuite class'
        self.skipped_test(reason)
# DocTestFinder was introduced in python2.4
if sys.version_info >= (2, 4):
    class DocTestFinder(doctest.DocTestFinder):
        """doctest finder ignoring the objects whose name is listed in
        its `skipped` attribute
        """

        def __init__(self, *args, **kwargs):
            # `skipped` is our own keyword: remove it before delegating
            self.skipped = kwargs.pop('skipped', ())
            doctest.DocTestFinder.__init__(self, *args, **kwargs)

        def _get_test(self, obj, name, module, globs, source_lines):
            """override default _get_test method to be able to skip tests
            according to skipped attribute's value

            Note: Python (<=2.4) use a _name_filter which could be used for
            that purpose but it's no longer available in 2.5
            Python 2.5 seems to have a [SKIP] flag
            """
            obj_name = getattr(obj, '__name__', '')
            if obj_name not in self.skipped:
                return doctest.DocTestFinder._get_test(self, obj, name,
                                                       module, globs,
                                                       source_lines)
            return None
else:
    # this is a hack to make skipped work with python <= 2.3
    class DocTestFinder(object):
        """replace doctest._find_tests by a filtering version of it"""

        def __init__(self, skipped):
            self.skipped = skipped
            self.original_find_tests = doctest._find_tests
            doctest._find_tests = self._find_tests

        def _find_tests(self, module, prefix=None):
            """return the tests found by the original function, minus
            those whose name is listed in `skipped`
            """
            kept = []
            for testinfo in self.original_find_tests(module, prefix):
                dotted_name, _, _, _ = testinfo
                # dotted_name looks like A.B.C.function_name
                if dotted_name.split('.')[-1] in self.skipped:
                    continue
                kept.append(testinfo)
            return kept
1665 1666
class DocTest(TestCase):
    """trigger module doctest
    I don't know how to make unittest.main consider the DocTestSuite instance
    without this hack

    The doctests are those of the module held by the `module` attribute;
    objects whose name is listed in `skipped` are ignored.
    """
    skipped = ()

    def __call__(self, result=None, runcondition=None, options=None):\
        # pylint: disable-msg=W0613
        """build the doctest suite and run it against `result`

        SkippedSuite is used instead when building the suite raises an
        AttributeError.
        """
        try:
            finder = DocTestFinder(skipped=self.skipped)
            suite_kwargs = {}
            # the finder is only given to DocTestSuite from python 2.4 on
            if sys.version_info >= (2, 4):
                suite_kwargs['test_finder'] = finder
            suite = doctest.DocTestSuite(self.module, **suite_kwargs)
        except AttributeError:
            suite = SkippedSuite()
        return suite.run(result)
    run = __call__

    def test(self):
        """just there to trigger test execution"""
# list of the mails "sent" through the most recently created MockSMTP
MAILBOX = None

class MockSMTP:
    """fake smtplib.SMTP

    Nothing is sent: mails are stored in the `reveived` list, which is
    also published as the module level MAILBOX.
    """

    def __init__(self, host, port):
        global MAILBOX
        self.host = host
        self.port = port
        # each new instance starts with a fresh, shared mailbox
        MAILBOX = []
        self.reveived = MAILBOX

    def set_debuglevel(self, debuglevel):
        """ignore debug level"""

    def sendmail(self, fromaddr, toaddres, body):
        """push sent mail in the mailbox"""
        mail = (fromaddr, toaddres, body)
        self.reveived.append(mail)

    def quit(self):
        """ignore quit"""
1709 1710
class MockConfigParser(ConfigParser):
    """fake ConfigParser.ConfigParser

    It is filled from a `{section: {option: value}}` dictionary and
    refuses to be written.
    """

    def __init__(self, options):
        ConfigParser.__init__(self)
        for section, pairs in options.items():
            self.add_section(section)
            for key, value in pairs.items():
                self.set(section, key, value)

    def write(self, _):
        """writing a mock configuration is not supported"""
        raise NotImplementedError()
1722 1723
class MockConnection:
    """fake DB-API 2.0 connexion AND cursor (i.e. cursor() return self)

    Executed queries are recorded in `received`; commits and rollbacks are
    recorded in `states` together with the number of queries received so
    far. Fetch methods serve the `results` given at construction time.
    """

    def __init__(self, results):
        self.results = results
        self.received = []
        self.states = []

    def cursor(self):
        """Mock cursor method"""
        return self

    def execute(self, query, args=None):
        """Mock execute method"""
        call = (query, args)
        self.received.append(call)

    def fetchone(self):
        """Mock fetchone method"""
        return self.results[0]

    def fetchall(self):
        """Mock fetchall method"""
        return self.results

    def commit(self):
        """Mock commit method"""
        executed = len(self.received)
        self.states.append(('commit', executed))

    def rollback(self):
        """Mock rollback method"""
        executed = len(self.received)
        self.states.append(('rollback', executed))

    def close(self):
        """Mock close method"""
        pass
# Former name of MockConnection, kept available under the old spelling.
# NOTE(review): class_renamed is defined outside this chunk; presumably it
# builds a deprecated alias of the class -- confirm against its definition.
MockConnexion = class_renamed('MockConnexion', MockConnection)
def mock_object(**params):
    """creates an object using params to set attributes

    A new class named 'Mock' is built with `params` as its attributes and
    an instance of it is returned.

    >>> option = mock_object(verbose=False, index=range(5))
    >>> option.verbose
    False
    >>> option.index
    [0, 1, 2, 3, 4]
    """
    mock_class = type('Mock', (), params)
    return mock_class()
1765 1766
def create_files(paths, chroot):
    """Creates directories and files found in <path>.

    A path ending with a separator designates a directory; any other path
    designates an (empty) file whose parent directories are created too.

    :param paths: list of relative paths to files or directories
    :param chroot: the root directory in which paths will be created

    >>> from os.path import isdir, isfile
    >>> isdir('/tmp/a')
    False
    >>> create_files(['a/b/foo.py', 'a/b/c/', 'a/b/c/d/e.py'], '/tmp')
    >>> isdir('/tmp/a')
    True
    >>> isdir('/tmp/a/b/c')
    True
    >>> isfile('/tmp/a/b/c/d/e.py')
    True
    >>> isfile('/tmp/a/b/foo.py')
    True
    """
    dirs, files = set(), set()
    for relpath in paths:
        fullpath = osp.join(chroot, relpath)
        if osp.basename(fullpath):
            # file path: remember the file and its parent directory
            files.add(fullpath)
            dirs.add(osp.dirname(fullpath))
        else:
            # empty base name: the path is a directory path
            dirs.add(fullpath)
    # directories first, so that every file has a place to live in
    for dirpath in dirs:
        if not osp.isdir(dirpath):
            os.makedirs(dirpath)
    for filepath in files:
        open(filepath, 'w').close()
1802
def enable_dbc(*args):
    """
    Without arguments, return True if contracts can be enabled and should be
    enabled (see option -d), return False otherwise.

    With arguments, return False if contracts can't or shouldn't be enabled,
    otherwise weave ContractAspect with items passed as arguments.
    """
    if not ENABLE_DBC:
        return False
    try:
        from logilab.aspects.weaver import weaver
        from logilab.aspects.lib.contracts import ContractAspect
    except ImportError:
        sys.stderr.write(
            'Warning: logilab.aspects is not available. Contracts disabled.')
        return False
    else:
        # contracts are wanted and available: weave each given item
        for module in args:
            weaver.weave_module(module, ContractAspect)
        return True
1823 1824
class AttrObject: # XXX cf mock_object
    """Plain object whose attributes are the keyword arguments given to
    its constructor.
    """

    def __init__(self, **kwargs):
        # fill the instance dictionary directly with the given attributes
        vars(self).update(kwargs)
1828
def tag(*args):
    """descriptor adding tag to a function

    The returned decorator stores the given tags, as a Tags set, in the
    `tags` attribute of the decorated function and returns that very
    function. A function can only be tagged once.
    """
    def desc(func):
        assert not hasattr(func, 'tags')
        tags = Tags(args)
        func.tags = tags
        return func
    return desc
class Tags(set):
    """A set of tags able to validate an expression"""

    def __getitem__(self, key):
        # mapping-like access: tell whether `key` is one of the tags
        return self.__contains__(key)

    def match(self, exp):
        """evaluate the expression `exp`, in which every name stands for
        the presence (True) or absence (False) of the tag of that name
        """
        # NOTE(review): eval() runs arbitrary Python; `exp` should only
        # come from a trusted source
        global_names = {}
        return eval(exp, global_names, self)
1844
1845 -def require_version(version):
1846 """ Compare version of python interpretor to the given one. Skip the test 1847 if older. 1848 """ 1849 def check_require_version(f): 1850 version_elements = version.split('.') 1851 try: 1852 compare = tuple([int(v) for v in version_elements]) 1853 except ValueError: 1854 raise ValueError('%s is not a correct version : should be X.Y[.Z].' % version) 1855 current = sys.version_info[:3] 1856 #print 'comp', current, compare 1857 if current < compare: 1858 #print 'version too old' 1859 def new_f(self, *args, **kwargs): 1860 self.skip('Need at least %s version of python. Current version is %s.' % (version, '.'.join([str(element) for element in current])))
1861 new_f.__name__ = f.__name__ 1862 return new_f 1863 else: 1864 #print 'version young enough' 1865 return f 1866 return check_require_version 1867
1868 -def require_module(module):
1869 """ Check if the given module is loaded. Skip the test if not. 1870 """ 1871 def check_require_module(f): 1872 try: 1873 __import__(module) 1874 #print module, 'imported' 1875 return f 1876 except ImportError: 1877 #print module, 'can not be imported' 1878 def new_f(self, *args, **kwargs): 1879 self.skip('%s can not be imported.' % module)
1880 new_f.__name__ = f.__name__ 1881 return new_f 1882 return check_require_module 1883