Package logilab-common-0.39.0 :: Module testlib
1
2 """Run tests.
3
4 This will find all modules whose name match a given prefix in the test
5 directory, and run them. Various command line options provide
6 additional facilities.
7
8 Command line options:
9
10 -v: verbose -- run tests in verbose mode with output to stdout
11 -q: quiet -- don't print anything except if a test fails
12 -t: testdir -- directory where the tests will be found
13 -x: exclude -- add a test to exclude
14 -p: profile -- profiled execution
15 -c: capture -- capture standard out/err during tests
16 -d: dbc -- enable design-by-contract
17 -m: match -- only run test matching the tag pattern which follow
18
19 If no non-option arguments are present, prefixes used are 'test',
20 'regrtest', 'smoketest' and 'unittest'.
21
22 :copyright: 2003-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
23 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
24 :license: General Public License version 2 - http://www.gnu.org/licenses
25 """
26 __docformat__ = "restructuredtext en"
27
28
29
30
31 import sys
32 import os, os.path as osp
33 import re
34 import time
35 import getopt
36 import traceback
37 import inspect
38 import unittest
39 import difflib
40 import types
41 import tempfile
42 import math
43 from shutil import rmtree
44 from operator import itemgetter
45 from warnings import warn
46 from compiler.consts import CO_GENERATOR
47 from ConfigParser import ConfigParser
48
49 try:
50 from test import test_support
51 except ImportError:
52 class TestSupport(object): # reconstructed minimal stand-in, only unload() is used below
53 def unload(self, test):
54 pass
55
56 test_support = TestSupport()
57
58 from logilab.common.deprecation import class_renamed, deprecated_function, \
59 obsolete
60
61 from logilab.common.compat import set, enumerate, any, sorted
62
63 from logilab.common.modutils import load_module_from_name
64 from logilab.common.debugger import Debugger, colorize_source
65 from logilab.common.decorators import cached
66 from logilab.common import textutils
67
68
69 __all__ = ['main', 'unittest_main', 'find_tests', 'run_test', 'spawn']
70
71 DEFAULT_PREFIXES = ('test', 'regrtest', 'smoketest', 'unittest',
72 'func', 'validation')
73
74 ENABLE_DBC = False
75
76 FILE_RESTART = ".pytest.restart"
77
78
79 __unittest = 1
80
81
82 def with_tempdir(callable): # signature reconstructed
83 """A decorator ensuring that no temporary file is left behind when the decorated
84 function returns. It only covers temporary files created through the tempfile module."""
85 def proxy(*args, **kargs):
86
87 old_tmpdir = tempfile.gettempdir()
88 new_tmpdir = tempfile.mkdtemp("-logilab-common-testlib","temp-dir-")
89 tempfile.tempdir = new_tmpdir
90 try:
91 return callable(*args, **kargs)
92 finally:
93 try:
94 rmtree(new_tmpdir, ignore_errors=True)
95 finally:
96 tempfile.tempdir = old_tmpdir
97 return proxy
98
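A brief usage sketch of the decorator above (the wrapped function and its body are illustrative, not part of the module):

import os
import tempfile

@with_tempdir
def summarize(data):
    # any temporary file created here lands in a private directory that
    # with_tempdir removes once the function returns
    fd, path = tempfile.mkstemp(suffix='.txt')
    try:
        os.write(fd, data)
    finally:
        os.close(fd)
    return len(data)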
99
100 def main(testdir=None, exitafter=True):
101 """Execute a test suite.
102
103 This also parses command-line options and modifies its behaviour
104 accordingly.
105
106 tests -- a list of strings containing test names (optional)
107 testdir -- the directory in which to look for tests (optional)
108
109 Users other than the Python test suite will certainly want to
110 specify testdir; if it's omitted, the directory containing the
111 Python test suite is searched for.
112
113 If the tests argument is omitted, the tests listed on the
114 command-line will be used. If that's empty, too, then all *.py
115 files beginning with test_ will be used.
116
117 """
118
119 try:
120 opts, args = getopt.getopt(sys.argv[1:], 'hvqxr:t:pcd', ['help'])
121 except getopt.error, msg:
122 print msg
123 print __doc__
124 return 2
125 verbose = 0
126 quiet = False
127 profile = False
128 exclude = []
129 capture = 0
130 for o, a in opts:
131 if o == '-v':
132 verbose += 1
133 elif o == '-q':
134 quiet = True
135 verbose = 0
136 elif o == '-x':
137 exclude.append(a)
138 elif o == '-t':
139 testdir = a
140 elif o == '-p':
141 profile = True
142 elif o == '-c':
143 capture += 1
144 elif o == '-d':
145 global ENABLE_DBC
146 ENABLE_DBC = True
147 elif o in ('-h', '--help'):
148 print __doc__
149 sys.exit(0)
150
151 args = [item.rstrip('.py') for item in args]
152 exclude = [item.rstrip('.py') for item in exclude]
153
154 if testdir is not None:
155 os.chdir(testdir)
156 sys.path.insert(0, '')
157 tests = find_tests('.', args or DEFAULT_PREFIXES, excludes=exclude)
158
159 test_support.verbose = verbose
160 if profile:
161 print >> sys.stderr, '** profiled run'
162 from hotshot import Profile
163 prof = Profile('stones.prof')
164 start_time, start_ctime = time.time(), time.clock()
165 good, bad, skipped, all_result = prof.runcall(run_tests, tests, quiet,
166 verbose, None, capture)
167 end_time, end_ctime = time.time(), time.clock()
168 prof.close()
169 else:
170 start_time, start_ctime = time.time(), time.clock()
171 good, bad, skipped, all_result = run_tests(tests, quiet, verbose, None,
172 capture)
173 end_time, end_ctime = time.time(), time.clock()
174 if not quiet:
175 print '*'*80
176 if all_result:
177 print 'Ran %s test cases in %0.2fs (%0.2fs CPU)' % (
178 all_result.testsRun, end_time - start_time,
179 end_ctime - start_ctime),
180 if all_result.errors:
181 print ', %s errors' % len(all_result.errors),
182 if all_result.failures:
183 print ', %s failed' % len(all_result.failures),
184 if all_result.skipped:
185 print ', %s skipped' % len(all_result.skipped),
186 print
187 if good:
188 if not bad and not skipped and len(good) > 1:
189 print "All",
190 print _count(len(good), "test"), "OK."
191 if bad:
192 print _count(len(bad), "test"), "failed:",
193 print ', '.join(bad)
194 if skipped:
195 print _count(len(skipped), "test"), "skipped:",
196 print ', '.join(['%s (%s)' % (test, msg) for test, msg in skipped])
197 if profile:
198 from hotshot import stats
199 stats = stats.load('stones.prof')
200 stats.sort_stats('time', 'calls')
201 stats.print_stats(30)
202 if exitafter:
203 sys.exit(len(bad) + len(skipped))
204 else:
205 sys.path.pop(0)
206 return len(bad)
207 main = obsolete("testlib.main() is obsolete, use the pytest tool instead")(main)
208
209
210 def run_tests(tests, quiet, verbose, runner=None, capture=0):
211 """Execute a list of tests.
212
213 :rtype: tuple
214 :return: tuple (list of passed tests, list of failed tests, list of skipped tests, combined result object)
215 """
216 good = []
217 bad = []
218 skipped = []
219 all_result = None
220 for test in tests:
221 if not quiet:
222 print
223 print '-'*80
224 print "Executing", test
225 result = run_test(test, verbose, runner, capture)
226 if type(result) is type(''):
227
228 skipped.append( (test, result))
229 else:
230 if all_result is None:
231 all_result = result
232 else:
233 all_result.testsRun += result.testsRun
234 all_result.failures += result.failures
235 all_result.errors += result.errors
236 all_result.skipped += result.skipped
237 if result.errors or result.failures:
238 bad.append(test)
239 if verbose:
240 print "test", test, \
241 "failed -- %s errors, %s failures" % (
242 len(result.errors), len(result.failures))
243 else:
244 good.append(test)
245
246 return good, bad, skipped, all_result
247
248 def find_tests(testdir,
249 prefixes=DEFAULT_PREFIXES, suffix=".py",
250 excludes=(),
251 remove_suffix=True): # signature reconstructed
252 """
253 Return a list of all applicable test modules.
254 """
255 tests = []
256 for name in os.listdir(testdir):
257 if not suffix or name.endswith(suffix):
258 for prefix in prefixes:
259 if name.startswith(prefix):
260 if remove_suffix and name.endswith(suffix):
261 name = name[:-len(suffix)]
262 if name not in excludes:
263 tests.append(name)
264 tests.sort()
265 return tests
266
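For illustration, with a test directory containing test_foo.py, test_bar.py and smoketest_db.py (hypothetical names), the function above is called the same way main() calls it:

names = find_tests('.', ('test', 'smoketest'), excludes=['test_bar'])
# -> ['smoketest_db', 'test_foo']   (the ".py" suffix is stripped, the result is sorted)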
267
268 def run_test(test, verbose, runner=None, capture=0):
269 """
270 Run a single test.
271
272 test -- the name of the test
273 verbose -- if true, print more messages
274 """
275 test_support.unload(test)
276 try:
277 m = load_module_from_name(test, path=sys.path)
278
279 try:
280 suite = m.suite
281 if callable(suite):
282 suite = suite()
283 except AttributeError:
284 loader = unittest.TestLoader()
285 suite = loader.loadTestsFromModule(m)
286 if runner is None:
287 runner = SkipAwareTextTestRunner(capture=capture)
288 return runner.run(suite)
289 except KeyboardInterrupt, v:
290 raise KeyboardInterrupt, v, sys.exc_info()[2]
291 except:
292
293 type, value = sys.exc_info()[:2]
294 msg = "test %s crashed -- %s : %s" % (test, type, value)
295 if verbose:
296 traceback.print_exc()
297 return msg
298
299 def _count(n, word): # signature reconstructed
300 """format word according to n"""
301 if n == 1:
302 return "%d %s" % (n, word)
303 else:
304 return "%d %ss" % (n, word)
305
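The helper above merely pluralizes the counted word:

_count(1, "test")   # -> '1 test'
_count(3, "test")   # -> '3 tests'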
306
307
308
309
310 def start_interactive_mode(result): # signature reconstructed
311 """starts an interactive shell so that the user can inspect errors
312 """
313 debuggers = result.debuggers
314 descrs = result.error_descrs + result.fail_descrs
315 if len(debuggers) == 1:
316
317 debuggers[0].start()
318 else:
319 while True:
320 testindex = 0
321 print "Choose a test to debug:"
322
323 print "\n".join(['\t%s : %s' % (i, descr) for i, (_, descr)
324 in enumerate(descrs)])
325 print "Type 'exit' (or ^D) to quit"
326 print
327 try:
328 todebug = raw_input('Enter a test name: ')
329 if todebug.strip().lower() == 'exit':
330 print
331 break
332 else:
333 try:
334 testindex = int(todebug)
335 debugger = debuggers[descrs[testindex][0]]
336 except (ValueError, IndexError):
337 print "ERROR: invalid test number %r" % (todebug, )
338 else:
339 debugger.start()
340 except (EOFError, KeyboardInterrupt):
341 print
342 break
343
344
345
346 from cStringIO import StringIO
347
348 class SkipAwareTestResult(unittest._TextTestResult): # class statement reconstructed
349
350 def __init__(self, stream, descriptions, verbosity,
351 exitfirst=False, capture=0, printonly=None,
352 pdbmode=False, cvg=None, colorize=False):
353 super(SkipAwareTestResult, self).__init__(stream,
354 descriptions, verbosity)
355 self.skipped = []
356 self.debuggers = []
357 self.fail_descrs = []
358 self.error_descrs = []
359 self.exitfirst = exitfirst
360 self.capture = capture
361 self.printonly = printonly
362 self.pdbmode = pdbmode
363 self.cvg = cvg
364 self.colorize = colorize
365 self.pdbclass = Debugger
366 self.verbose = verbosity > 1
367
368 def descrs_for(self, flavour): # signature reconstructed
369 return getattr(self, '%s_descrs' % flavour.lower())
370
371 def _create_pdb(self, test_descr, flavour): # signature reconstructed
372 self.descrs_for(flavour).append( (len(self.debuggers), test_descr) )
373 if self.pdbmode:
374 self.debuggers.append(self.pdbclass(sys.exc_info()[2]))
375
376 def _exc_info_to_string(self, err, test): # signature reconstructed
377 """Converts a sys.exc_info()-style tuple of values into a string.
378
379 This method is overridden here because we want to colorize
380 lines if --color is passed, and display local variables if
381 --verbose is passed
382 """
383 exctype, exc, tb = err
384 output = ['Traceback (most recent call last)']
385 frames = inspect.getinnerframes(tb)
386 colorize = self.colorize
387
388
389
390 nb_frames_skipped = self._count_relevant_tb_levels(tb.tb_next)
391 for index, (frame, filename, lineno, funcname, ctx, ctxindex) in enumerate(frames):
392 if not (0 < index <= nb_frames_skipped):
393 continue
394 filename = osp.abspath(filename)
395 if ctx is None:
396 source = '<no source available>'
397 else:
398 source = ''.join(ctx)
399 if colorize:
400 filename = textutils.colorize_ansi(filename, 'magenta')
401 source = colorize_source(source)
402 output.append(' File "%s", line %s, in %s' % (filename, lineno, funcname))
403 output.append(' %s' % source.strip())
404 if self.verbose:
405 output.append('%r == %r' % (dir(frame), test.__module__))
406 output.append('')
407 output.append(' ' + ' local variables '.center(66, '-'))
408 for varname, value in sorted(frame.f_locals.items()):
409 output.append(' %s: %r' % (varname, value))
410 if varname == 'self':
411 for varname, value in sorted(vars(value).items()):
412 output.append(' self.%s: %r' % (varname, value))
413 output.append(' ' + '-' * 66)
414 output.append('')
415 output.append(''.join(traceback.format_exception_only(exctype, exc)))
416 return '\n'.join(output)
417
418 def addError(self, test, err): # signature reconstructed
419 """err == (exc_type, exc, tcbk)"""
420 exc_type, exc, _ = err
421 if exc_type == TestSkipped:
422 self.addSkipped(test, exc)
423 else:
424 if self.exitfirst:
425 self.shouldStop = True
426 descr = self.getDescription(test)
427 super(SkipAwareTestResult, self).addError(test, err)
428 self._create_pdb(descr, 'error')
429
430 def addFailure(self, test, err): # signature reconstructed
431 if self.exitfirst:
432 self.shouldStop = True
433 descr = self.getDescription(test)
434 super(SkipAwareTestResult, self).addFailure(test, err)
435 self._create_pdb(descr, 'fail')
436
437 def addSkipped(self, test, reason): # signature reconstructed
438 self.skipped.append((test, self.getDescription(test), reason))
439 if self.showAll:
440 self.stream.writeln("SKIPPED")
441 elif self.dots:
442 self.stream.write('S')
443
444 def printErrors(self): # signature reconstructed
445 super(SkipAwareTestResult, self).printErrors()
446 self.printSkippedList()
447
448 def printSkippedList(self): # signature reconstructed
449 for _, descr, err in self.skipped:
450 self.stream.writeln(self.separator1)
451 self.stream.writeln("%s: %s" % ('SKIPPED', descr))
452 self.stream.writeln("\t%s" % err)
453
454 def printErrorList(self, flavour, errors): # signature reconstructed
455 for (_, descr), (test, err) in zip(self.descrs_for(flavour), errors):
456 self.stream.writeln(self.separator1)
457 if self.colorize:
458 self.stream.writeln("%s: %s" % (
459 textutils.colorize_ansi(flavour, color='red'), descr))
460 else:
461 self.stream.writeln("%s: %s" % (flavour, descr))
462
463 self.stream.writeln(self.separator2)
464 self.stream.writeln(err)
465
466 try:
467 output, errput = test.captured_output()
468 except AttributeError:
469 pass
470 else:
471 if output:
472 self.stream.writeln(self.separator2)
473 self.stream.writeln("captured stdout".center(
474 len(self.separator2)))
475 self.stream.writeln(self.separator2)
476 self.stream.writeln(output)
477 else:
478 self.stream.writeln('no stdout'.center(
479 len(self.separator2)))
480 if errput:
481 self.stream.writeln(self.separator2)
482 self.stream.writeln("captured stderr".center(
483 len(self.separator2)))
484 self.stream.writeln(self.separator2)
485 self.stream.writeln(errput)
486 else:
487 self.stream.writeln('no stderr'.center(
488 len(self.separator2)))
489
490
491 def run(self, result, runcondition=None, options=None):
492 for test in self._tests:
493 if result.shouldStop:
494 break
495 try:
496 test(result, runcondition, options)
497 except TypeError:
498
499
500 warn("%s should extend lgc.testlib.TestCase instead of unittest.TestCase"
501 % test)
502 test(result)
503 return result
504 unittest.TestSuite.run = run
505
506
507 TestSuite = unittest.TestSuite
508
509
510 def __call__(self, *args, **kwds): # signature reconstructed
511 return self.run(*args, **kwds)
512 unittest.TestSuite.__call__ = __call__
513
514
515 class SkipAwareTextTestRunner(unittest.TextTestRunner):
516
517 def __init__(self, stream=sys.stderr, verbosity=1,
518 exitfirst=False, capture=False, printonly=None,
519 pdbmode=False, cvg=None, test_pattern=None,
520 skipped_patterns=(), colorize=False, options=None):
521 super(SkipAwareTextTestRunner, self).__init__(stream=stream,
522 verbosity=verbosity)
523 self.exitfirst = exitfirst
524 self.capture = capture
525 self.printonly = printonly
526 self.pdbmode = pdbmode
527 self.cvg = cvg
528 self.test_pattern = test_pattern
529 self.skipped_patterns = skipped_patterns
530 self.colorize = colorize
531 self.options = options
532
533 def _this_is_skipped(self, testedname):
534 return any([(pat in testedname) for pat in self.skipped_patterns])
535
536 def _runcondition(self, test, skipgenerator=True):
537 if isinstance(test, InnerTest):
538 testname = test.name
539 else:
540 if isinstance(test, TestCase):
541 meth = test._get_test_method()
542 func = meth.im_func
543 testname = '%s.%s' % (meth.im_class.__name__, func.__name__)
544 elif isinstance(test, types.FunctionType):
545 func = test
546 testname = func.__name__
547 elif isinstance(test, types.MethodType):
548 func = test.im_func
549 testname = '%s.%s' % (test.im_class.__name__, func.__name__)
550 else:
551 return True
552
553 if is_generator(func) and skipgenerator:
554 return self.does_match_tags(func)
555
556
557 if self._this_is_skipped(testname):
558 return False
559 if self.test_pattern is not None:
560 try:
561 classpattern, testpattern = self.test_pattern.split('.')
562 klass, name = testname.split('.')
563 if classpattern not in klass or testpattern not in name:
564 return False
565 except ValueError:
566 if self.test_pattern not in testname:
567 return False
568
569 return self.does_match_tags(test)
570
571 def does_match_tags(self, test): # signature reconstructed
572 if self.options is not None:
573 tags_pattern = getattr(self.options, 'tags_pattern', None)
574 if tags_pattern is not None:
575 tags = getattr(test, 'tags', Tags())
576 return tags.match(tags_pattern)
577 return True
578
579 def _makeResult(self):
580 return SkipAwareTestResult(self.stream, self.descriptions,
581 self.verbosity, self.exitfirst, self.capture,
582 self.printonly, self.pdbmode, self.cvg,
583 self.colorize)
584
585 def run(self, test):
586 "Run the given test case or test suite."
587 result = self._makeResult()
588 startTime = time.time()
589 test(result, self._runcondition, self.options)
590 stopTime = time.time()
591 timeTaken = stopTime - startTime
592 result.printErrors()
593 self.stream.writeln(result.separator2)
594 run = result.testsRun
595 self.stream.writeln("Ran %d test%s in %.3fs" %
596 (run, run != 1 and "s" or "", timeTaken))
597 self.stream.writeln()
598 if not result.wasSuccessful():
599 if self.colorize:
600 self.stream.write(textutils.colorize_ansi("FAILED", color='red'))
601 else:
602 self.stream.write("FAILED")
603 else:
604 if self.colorize:
605 self.stream.write(textutils.colorize_ansi("OK", color='green'))
606 else:
607 self.stream.write("OK")
608 failed, errored, skipped = map(len, (result.failures, result.errors,
609 result.skipped))
610
611 det_results = []
612 for name, value in (("failures", result.failures),
613 ("errors",result.errors),
614 ("skipped", result.skipped)):
615 if value:
616 det_results.append("%s=%i" % (name, len(value)))
617 if det_results:
618 self.stream.write(" (")
619 self.stream.write(', '.join(det_results))
620 self.stream.write(")")
621 self.stream.writeln("")
622 return result
623
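A minimal sketch of driving the runner directly (test_foo is a made-up test module); SkipAwareTestProgram below wires the same pieces together from the command line:

import unittest
import test_foo   # hypothetical test module

suite = unittest.TestLoader().loadTestsFromModule(test_foo)
runner = SkipAwareTextTestRunner(verbosity=2,
                                 skipped_patterns=('slow',))  # skip tests whose name contains 'slow'
result = runner.run(suite)
print result.testsRun, len(result.skipped)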
624
625 class keywords(dict): # class statement reconstructed
626 """Keyword args (**kwargs) support for generative tests."""
627
628 class starargs(tuple): # class statement reconstructed
629 """Variable arguments (*args) for generative tests."""
630 def __new__(cls, *args): # signature reconstructed
631 return tuple.__new__(cls, args)
632
633
634
635 class NonStrictTestLoader(unittest.TestLoader): # class statement reconstructed
636 """
637 Overrides default testloader to be able to omit classname when
638 specifying tests to run on command line.
639
640 For example, if the file test_foo.py contains ::
641
642 class FooTC(TestCase):
643 def test_foo1(self): # ...
644 def test_foo2(self): # ...
645 def test_bar1(self): # ...
646
647 class BarTC(TestCase):
648 def test_bar2(self): # ...
649
650 'python test_foo.py' will run the 3 tests in FooTC
651 'python test_foo.py FooTC' will run the 3 tests in FooTC
652 'python test_foo.py test_foo' will run test_foo1 and test_foo2
653 'python test_foo.py test_foo1' will run test_foo1
654 'python test_foo.py test_bar' will run FooTC.test_bar1 and BarTC.test_bar2
655 """
656
658 self.skipped_patterns = []
659
661 suites = []
662 for name in names:
663 suites.extend(self.loadTestsFromName(name, module))
664 return self.suiteClass(suites)
665
666 def _collect_tests(self, module): # signature reconstructed
667 tests = {}
668 for obj in vars(module).values():
669 if (issubclass(type(obj), (types.ClassType, type)) and
670 issubclass(obj, unittest.TestCase)):
671 classname = obj.__name__
672 if classname[0] == '_' or self._this_is_skipped(classname):
673 continue
674 methodnames = []
675
676 for attrname in dir(obj):
677 if attrname.startswith(self.testMethodPrefix):
678 attr = getattr(obj, attrname)
679 if callable(attr):
680 methodnames.append(attrname)
681
682 tests[classname] = (obj, methodnames)
683 return tests
684
686 try:
687 suite = getattr(module, suitename)()
688 except AttributeError:
689 return []
690 assert hasattr(suite, '_tests'), \
691 "%s.%s is not a valid TestSuite" % (module.__name__, suitename)
692
693
694 return suite._tests
695
696 def loadTestsFromName(self, name, module=None): # signature reconstructed
697 parts = name.split('.')
698 if module is None or len(parts) > 2:
699
700 return [super(NonStrictTestLoader, self).loadTestsFromName(name)]
701 tests = self._collect_tests(module)
702
703
704 collected = []
705 if len(parts) == 1:
706 pattern = parts[0]
707 if callable(getattr(module, pattern, None)
708 ) and pattern not in tests:
709
710 return self.loadTestsFromSuite(module, pattern)
711 if pattern in tests:
712
713 klass, methodnames = tests[pattern]
714 for methodname in methodnames:
715 collected = [klass(methodname)
716 for methodname in methodnames]
717 else:
718
719 for klass, methodnames in tests.values():
720 collected += [klass(methodname)
721 for methodname in methodnames]
722 elif len(parts) == 2:
723
724 classname, pattern = parts
725 klass, methodnames = tests.get(classname, (None, []))
726 for methodname in methodnames:
727 collected = [klass(methodname) for methodname in methodnames]
728 return collected
729
731 return any([(pat in testedname) for pat in self.skipped_patterns])
732
733 def getTestCaseNames(self, testCaseClass): # signature reconstructed
734 """Return a sorted sequence of method names found within testCaseClass
735 """
736 is_skipped = self._this_is_skipped
737 classname = testCaseClass.__name__
738 if classname[0] == '_' or is_skipped(classname):
739 return []
740 testnames = super(NonStrictTestLoader, self).getTestCaseNames(
741 testCaseClass)
742 return [testname for testname in testnames if not is_skipped(testname)]
743
744
745 class SkipAwareTestProgram(unittest.TestProgram): # class statement reconstructed
746
747 USAGE = """\
748 Usage: %(progName)s [options] [test] [...]
749
750 Options:
751 -h, --help Show this message
752 -v, --verbose Verbose output
753 -i, --pdb Enable test failure inspection
754 -x, --exitfirst Exit on first failure
755 -c, --capture Captures and prints standard out/err only on errors
756 -p, --printonly Only prints lines matching specified pattern
757 (implies capture)
758 -s, --skip skip test matching this pattern (no regexp for now)
759 -q, --quiet Minimal output
760 --color colorize tracebacks
761
762 -m, --match Run only test whose tag match this pattern
763
764 -P, --profile FILE: Run the tests using cProfile and saving results
765 in FILE
766
767 Examples:
768 %(progName)s - run default set of tests
769 %(progName)s MyTestSuite - run suite 'MyTestSuite'
770 %(progName)s MyTestCase.testSomething - run MyTestCase.testSomething
771 %(progName)s MyTestCase - run all 'test*' test methods
772 in MyTestCase
773 """
774 def __init__(self, module='__main__', defaultTest=None, batchmode=False,
775 cvg=None, options=None, outstream=sys.stderr):
776 self.batchmode = batchmode
777 self.cvg = cvg
778 self.options = options
779 self.outstream = outstream
780 super(SkipAwareTestProgram, self).__init__(
781 module=module, defaultTest=defaultTest,
782 testLoader=NonStrictTestLoader())
783
784 def parseArgs(self, argv): # signature reconstructed
785 self.pdbmode = False
786 self.exitfirst = False
787 self.capture = 0
788 self.printonly = None
789 self.skipped_patterns = []
790 self.test_pattern = None
791 self.tags_pattern = None
792 self.colorize = False
793 self.profile_name = None
794 import getopt
795 try:
796 options, args = getopt.getopt(argv[1:], 'hHvixrqcp:s:m:P:',
797 ['help', 'verbose', 'quiet', 'pdb',
798 'exitfirst', 'restart', 'capture', 'printonly=',
799 'skip=', 'color', 'match=', 'profile='])
800 for opt, value in options:
801 if opt in ('-h', '-H', '--help'):
802 self.usageExit()
803 if opt in ('-i', '--pdb'):
804 self.pdbmode = True
805 if opt in ('-x', '--exitfirst'):
806 self.exitfirst = True
807 if opt in ('-r', '--restart'):
808 self.restart = True
809 self.exitfirst = True
810 if opt in ('-q', '--quiet'):
811 self.verbosity = 0
812 if opt in ('-v', '--verbose'):
813 self.verbosity = 2
814 if opt in ('-c', '--capture'):
815 self.capture += 1
816 if opt in ('-p', '--printonly'):
817 self.printonly = re.compile(value)
818 if opt in ('-s', '--skip'):
819 self.skipped_patterns = [pat.strip() for pat in
820 value.split(', ')]
821 if opt == '--color':
822 self.colorize = True
823 if opt in ('-m', '--match'):
824
825 self.options["tag_pattern"] = value
826 if opt in ('-P', '--profile'):
827 self.profile_name = value
828 self.testLoader.skipped_patterns = self.skipped_patterns
829 if self.printonly is not None:
830 self.capture += 1
831 if len(args) == 0 and self.defaultTest is None:
832 suitefunc = getattr(self.module, 'suite', None)
833 if isinstance(suitefunc, (types.FunctionType,
834 types.MethodType)):
835 self.test = self.module.suite()
836 else:
837 self.test = self.testLoader.loadTestsFromModule(self.module)
838 return
839 if len(args) > 0:
840 self.test_pattern = args[0]
841 self.testNames = args
842 else:
843 self.testNames = (self.defaultTest, )
844 self.createTests()
845 except getopt.error, msg:
846 self.usageExit(msg)
847
848
849 def runTests(self): # signature reconstructed
850 if self.profile_name:
851 import cProfile
852 cProfile.runctx('self._runTests()', globals(), locals(), self.profile_name )
853 else:
854 return self._runTests()
855
856 def _runTests(self): # signature reconstructed
857 if hasattr(self.module, 'setup_module'):
858 try:
859 self.module.setup_module(self.options)
860 except Exception, exc:
861 print 'setup_module error:', exc
862 sys.exit(1)
863 self.testRunner = SkipAwareTextTestRunner(verbosity=self.verbosity,
864 stream=self.outstream,
865 exitfirst=self.exitfirst,
866 capture=self.capture,
867 printonly=self.printonly,
868 pdbmode=self.pdbmode,
869 cvg=self.cvg,
870 test_pattern=self.test_pattern,
871 skipped_patterns=self.skipped_patterns,
872 colorize=self.colorize,
873 options=self.options)
874
875 def removeSucceededTests(obj, succTests):
876 """ Recurcive function that removes succTests from
877 a TestSuite or TestCase
878 """
879 if isinstance(obj, TestSuite):
880 removeSucceededTests(obj._tests, succTests)
881 if isinstance(obj, list):
882 for el in obj[:]:
883 if isinstance(el, TestSuite):
884 removeSucceededTests(el, succTests)
885 elif isinstance(el, TestCase):
886 descr = '.'.join((el.__class__.__module__,
887 el.__class__.__name__,
888 el._testMethodName))
889 if descr in succTests:
890 obj.remove(el)
891
892 if getattr(self.options, 'restart', False):
893
894 try:
895 restartfile = open(FILE_RESTART, 'r')
896 try:
897 try:
898 succeededtests = list(elem.rstrip('\n\r') for elem in
899 restartfile.readlines())
900 removeSucceededTests(self.test, succeededtests)
901 except Exception, e:
902 raise e
903 finally:
904 restartfile.close()
905 except Exception, e:
906 raise Exception("Error while reading succeeded tests from %s"
907 % osp.join(os.getcwd(), FILE_RESTART))
908
909 result = self.testRunner.run(self.test)
910 if hasattr(self.module, 'teardown_module'):
911 try:
912 self.module.teardown_module(self.options, result)
913 except Exception, exc:
914 print 'teardown_module error:', exc
915 sys.exit(1)
916 if os.environ.get('PYDEBUG'):
917 warn("PYDEBUG usage is deprecated, use -i / --pdb instead",
918 DeprecationWarning)
919 self.pdbmode = True
920 if result.debuggers and self.pdbmode:
921 start_interactive_mode(result)
922 if not self.batchmode:
923 sys.exit(not result.wasSuccessful())
924 self.result = result
925
926
927
928
929 class FDCapture: # class statement reconstructed
930 """adapted from py lib (http://codespeak.net/py)
931 Capture IO to/from a given os-level filedescriptor.
932 """
933 def __init__(self, fd, attr='stdout', printonly=None):
934 self.targetfd = fd
935 self.tmpfile = os.tmpfile()
936 self.printonly = printonly
937
938 self._savefd = os.dup(fd)
939
940 os.dup2(self.tmpfile.fileno(), fd)
941
942 self.oldval = getattr(sys, attr)
943 setattr(sys, attr, self)
944 self.attr = attr
945
946 def write(self, msg): # signature reconstructed
947
948 for line in msg.splitlines():
949 line += '\n'
950 if self.printonly is None or self.printonly.search(line) is None:
951 self.tmpfile.write(line)
952 else:
953 os.write(self._savefd, line)
954
955
956
957
958
959
960
961
962 def restore(self): # signature reconstructed
963 """restore original fd and returns captured output"""
964
965 self.tmpfile.flush()
966 try:
967 ref_file = getattr(sys, '__%s__' % self.attr)
968 ref_file.flush()
969 except AttributeError:
970 pass
971 if hasattr(self.oldval, 'flush'):
972 self.oldval.flush()
973
974 os.dup2(self._savefd, self.targetfd)
975
976 setattr(sys, self.attr, self.oldval)
977
978 os.close(self._savefd)
979
980 self.tmpfile.seek(0)
981 return self.tmpfile.read()
982
983
984 def _capture(which='stdout', printonly=None):
985 """private method, should not be called directly
986 (cf. capture_stdout() and capture_stderr())
987 """
988 assert which in ('stdout', 'stderr'
989 ), "Can only capture stdout or stderr, not %s" % which
990 if which == 'stdout':
991 fd = 1
992 else:
993 fd = 2
994 return FDCapture(fd, which, printonly)
995
996 def capture_stdout(printonly=None): # signature reconstructed
997 """captures the standard output
998
999 returns a handle object which has a `restore()` method.
1000 The restore() method returns the captured stdout and restores it
1001 """
1002 return _capture('stdout', printonly)
1003
1004 def capture_stderr(printonly=None): # signature reconstructed
1005 """captures the standard error output
1006
1007 returns a handle object which has a `restore()` method.
1008 The restore() method returns the captured stderr and restores it
1009 """
1010 return _capture('stderr', printonly)
1011
1012
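A short sketch of the capture helpers above:

handle = capture_stdout()
print 'hello from the test'        # goes to the temporary file, not the terminal
output = handle.restore()          # restores sys.stdout and returns the captured text
assert 'hello from the test' in output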
1013 def unittest_main(module='__main__', defaultTest=None,
1014 batchmode=False, cvg=None, options=None,
1015 outstream=sys.stderr):
1016 """use this function if you want to have the same functionality
1017 as unittest.main"""
1018 return SkipAwareTestProgram(module, defaultTest, batchmode,
1019 cvg, options, outstream)
1020
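Typical use, mirroring unittest.main(), is to call it from a test module's __main__ guard (the test class is illustrative):

from logilab.common.testlib import TestCase, unittest_main

class MyTC(TestCase):
    def test_addition(self):
        self.assertEquals(1 + 1, 2)

if __name__ == '__main__':
    unittest_main()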
1021 class TestSkipped(Exception): # class statement reconstructed
1022 """raised when a test is skipped"""
1023
1024 def is_generator(function): # signature reconstructed
1025 flags = function.func_code.co_flags
1026 return flags & CO_GENERATOR
1027
1028
1029 def parse_generative_args(params): # signature reconstructed
1030 args = []
1031 varargs = ()
1032 kwargs = {}
1033 flags = 0
1034 for param in params:
1035 if isinstance(param, starargs):
1036 varargs = param
1037 if flags:
1038 raise TypeError('found starargs after keywords !')
1039 flags |= 2
1040 args += list(varargs)
1041 elif isinstance(param, keywords):
1042 kwargs = param
1043 if flags & 4:
1044 raise TypeError('got multiple keywords parameters')
1045 flags |= 4
1046 elif flags & 2 or flags & 4:
1047 raise TypeError('found parameters after kwargs or args')
1048 else:
1049 args.append(param)
1050
1051 return args, kwargs
1052
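For illustration, trailing starargs/keywords instances are split out again by the function above:

args, kwargs = parse_generative_args((1, 2, starargs(3, 4), keywords(verbose=True)))
# args == [1, 2, 3, 4]    kwargs == {'verbose': True}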
1053 class InnerTest(tuple): # class statement reconstructed
1054 def __new__(cls, name, *data): # signature reconstructed
1055 instance = tuple.__new__(cls, data)
1056 instance.name = name
1057 return instance
1058
1059 class ClassGetProperty(object): # class statement reconstructed
1060 """this is a simple property-like class but for
1061 class attributes.
1062 """
1063
1064 def __init__(self, getter): # signature reconstructed
1065 self.getter = getter
1066
1067 def __get__(self, obj, objtype): # signature reconstructed
1068 "__get__(obj, objtype) -> objtype"
1069 return self.getter(objtype)
1070
1071
1072 class TestCase(unittest.TestCase): # class statement reconstructed
1073 """unittest.TestCase with some additional methods"""
1074
1075 capture = False
1076 pdbclass = Debugger
1077
1078 def __init__(self, methodName='runTest'):
1079 super(TestCase, self).__init__(methodName)
1080
1081 if sys.version_info >= (2, 5):
1082 self.__exc_info = self._exc_info
1083 self.__testMethodName = self._testMethodName
1084 else:
1085
1086 self._testMethodName = self.__testMethodName
1087 self._captured_stdout = ""
1088 self._captured_stderr = ""
1089 self._out = []
1090 self._err = []
1091 self._current_test_descr = None
1092 self._options_ = None
1093
1094 def datadir(cls): # signature reconstructed
1095 """helper attribute holding the standard test's data directory
1096
1097 NOTE: this is a logilab's standard
1098 """
1099 mod = __import__(cls.__module__)
1100 return osp.join(osp.dirname(osp.abspath(mod.__file__)), 'data')
1101
1102
1103 datadir = ClassGetProperty(cached(datadir))
1104
1105 def datapath(self, fname): # signature reconstructed
1106 """joins the object's datadir and `fname`"""
1107 return osp.join(self.datadir, fname)
1108
1109 def set_description(self, descr): # signature reconstructed, name uncertain
1110 """sets the current test's description.
1111 This can be useful for generative tests because it allows to specify
1112 a description per yield
1113 """
1114 self._current_test_descr = descr
1115
1116
1117 def shortDescription(self): # signature reconstructed
1118 """override default unittest shortDescription to handle correctly
1119 generative tests
1120 """
1121 if self._current_test_descr is not None:
1122 return self._current_test_descr
1123 return super(TestCase, self).shortDescription()
1124
1125
1126 def captured_output(self): # signature reconstructed
1127 """return a two tuple with standard output and error stripped"""
1128 return self._captured_stdout.strip(), self._captured_stderr.strip()
1129
1130 def _start_capture(self): # signature reconstructed
1131 """start capture if enabled"""
1132 if self.capture:
1133 self.start_capture()
1134
1135 def _stop_capture(self): # signature reconstructed
1136 """stop capture and restore previous output"""
1137 self._force_output_restore()
1138
1139 def start_capture(self, printonly=None): # signature reconstructed
1140 """start output capture"""
1141 self._out.append(capture_stdout(printonly or self._printonly))
1142 self._err.append(capture_stderr(printonly or self._printonly))
1143
1145 """set the pattern of line to print"""
1146 rgx = re.compile(pattern, flags)
1147 if self._out:
1148 self._out[-1].printonly = rgx
1149 self._err[-1].printonly = rgx
1150 else:
1151 self.start_capture(printonly=rgx)
1152
1153 def stop_capture(self): # signature reconstructed
1154 """stop output and error capture"""
1155 if self._out:
1156 _out = self._out.pop()
1157 _err = self._err.pop()
1158 return _out.restore(), _err.restore()
1159 return '', ''
1160
1161 def _force_output_restore(self): # signature reconstructed
1162 """remove all captures set"""
1163 while self._out:
1164 self._captured_stdout += self._out.pop().restore()
1165 self._captured_stderr += self._err.pop().restore()
1166
1167 def quiet_run(self, result, func, *args, **kwargs):
1168 self._start_capture()
1169 try:
1170 func(*args, **kwargs)
1171 except (KeyboardInterrupt, SystemExit):
1172 self._stop_capture()
1173 raise
1174 except:
1175 self._stop_capture()
1176 result.addError(self, self.__exc_info())
1177 return False
1178 self._stop_capture()
1179 return True
1180
1181 def _get_test_method(self): # signature reconstructed
1182 """return the test method"""
1183 return getattr(self, self.__testMethodName)
1184
1185
1186 def optval(self, option, default=None):
1187 """return the option value or default if the option is not defined"""
1188 return getattr(self._options_, option, default)
1189
1190 def __call__(self, result=None, runcondition=None, options=None):
1191 """rewrite TestCase.__call__ to support generative tests
1192 This is mostly a copy/paste from unittest.py (i.e same
1193 variable names, same logic, except for the generative tests part)
1194 """
1195 if result is None:
1196 result = self.defaultTestResult()
1197 result.pdbclass = self.pdbclass
1198
1199
1200 self.capture = self.capture or getattr(result, 'capture', False)
1201 self._options_ = options
1202 self._printonly = getattr(result, 'printonly', None)
1203
1204
1205 testMethod = self._get_test_method()
1206 if runcondition and not runcondition(testMethod):
1207 return
1208 result.startTest(self)
1209 try:
1210 if not self.quiet_run(result, self.setUp):
1211 return
1212
1213 if is_generator(testMethod.im_func):
1214 success = self._proceed_generative(result, testMethod,
1215 runcondition)
1216 else:
1217 status = self._proceed(result, testMethod)
1218 success = (status == 0)
1219 if not self.quiet_run(result, self.tearDown):
1220 return
1221 if success:
1222 if hasattr(options, "exitfirst") and options.exitfirst:
1223
1224 try:
1225 restartfile = open(FILE_RESTART, 'a')
1226 try:
1227 try:
1228 descr = '.'.join((self.__class__.__module__,
1229 self.__class__.__name__,
1230 self._testMethodName))
1231 restartfile.write(descr+os.linesep)
1232 except Exception, e:
1233 raise e
1234 finally:
1235 restartfile.close()
1236 except Exception, e:
1237 print >> sys.__stderr__, "Error while saving \
1238 succeeded test into", osp.join(os.getcwd(),FILE_RESTART)
1239 raise e
1240 result.addSuccess(self)
1241 finally:
1242
1243
1244 result.stopTest(self)
1245
1246
1247
1248 def _proceed_generative(self, result, testfunc, runcondition=None): # signature reconstructed
1249
1250 result.testsRun -= 1
1251 self._start_capture()
1252 success = True
1253 try:
1254 for params in testfunc():
1255 if runcondition and not runcondition(testfunc,
1256 skipgenerator=False):
1257 if not (isinstance(params, InnerTest)
1258 and runcondition(params)):
1259 continue
1260 if not isinstance(params, (tuple, list)):
1261 params = (params, )
1262 func = params[0]
1263 args, kwargs = parse_generative_args(params[1:])
1264
1265 result.testsRun += 1
1266 status = self._proceed(result, func, args, kwargs)
1267 if status == 0:
1268 result.addSuccess(self)
1269 success = True
1270 else:
1271 success = False
1272 if status == 2:
1273 result.shouldStop = True
1274 if result.shouldStop:
1275 break
1276 except:
1277
1278 result.addError(self, self.__exc_info())
1279 success = False
1280 self._stop_capture()
1281 return success
1282
1283 def _proceed(self, result, testfunc, args=(), kwargs=None):
1284 """proceed the actual test
1285 returns 0 on success, 1 on failure, 2 on error
1286
1287 Note: addSuccess can't be called here because we have to wait
1288 for tearDown to be successfully executed to declare the test as
1289 successful
1290 """
1291 self._start_capture()
1292 kwargs = kwargs or {}
1293 try:
1294 testfunc(*args, **kwargs)
1295 self._stop_capture()
1296 except self.failureException:
1297 self._stop_capture()
1298 result.addFailure(self, self.__exc_info())
1299 return 1
1300 except KeyboardInterrupt:
1301 self._stop_capture()
1302 raise
1303 except:
1304 self._stop_capture()
1305 result.addError(self, self.__exc_info())
1306 return 2
1307 return 0
1308
1309 def defaultTestResult(self): # signature reconstructed
1310 """return a new instance of the defaultTestResult"""
1311 return SkipAwareTestResult()
1312
1313 def skip(self, msg=None):
1314 """mark a test as skipped for the <msg> reason"""
1315 msg = msg or 'test was skipped'
1316 raise TestSkipped(msg)
1317 skipped_test = deprecated_function(skip)
1318
1319 def assertIn(self, object, set): # signature reconstructed
1320 """assert <object> is in <set>"""
1321 self.assert_(object in set, "%s not in %s" % (object, set))
1322
1323 def assertNotIn(self, object, set): # signature reconstructed
1324 """assert <object> is not in <set>"""
1325 self.assert_(object not in set, "%s in %s" % (object, set))
1326
1327 def assertDictEquals(self, dict1, dict2): # signature reconstructed
1328 """compares two dicts
1329
1330 If the two dict differ, the first difference is shown in the error
1331 message
1332 """
1333 dict1 = dict(dict1)
1334 msgs = []
1335 for key, value in dict2.items():
1336 try:
1337 if dict1[key] != value:
1338 msgs.append('%r != %r for key %r' % (dict1[key], value,
1339 key))
1340 del dict1[key]
1341 except KeyError:
1342 msgs.append('missing %r key' % key)
1343 if dict1:
1344 msgs.append('dict2 is lacking %r' % dict1)
1345 if msgs:
1346 self.fail(''.join(msgs))
1347 assertDictEqual = assertDictEquals
1348
1349
1350
1351 def assertUnorderedIterableEquals(self, got, expected, msg=None): # signature reconstructed
1352 """compares two iterables and shows the difference between both"""
1353 got, expected = list(got), list(expected)
1354 self.assertSetEqual(set(got), set(expected), msg)
1355 if len(got) != len(expected):
1356 if msg is None:
1357 msg = ['Iterables have the same elements but not the same number',
1358 '\t<element>\t<expected>\t<got>']
1359 got_count = {}
1360 expected_count = {}
1361 for element in got:
1362 got_count[element] = got_count.get(element,0) + 1
1363 for element in expected:
1364 expected_count[element] = expected_count.get(element,0) + 1
1365
1366
1367 for element, count in got_count.iteritems():
1368 other_count = expected_count[element]
1369 if other_count != count:
1370 msg.append('\t%s\t%s\t%s' % (element, other_count, count))
1371
1372 self.fail(msg)
1373
1374 assertUnorderedIterableEqual = assertUnorderedIterableEquals
1375 assertUnordIterEquals = assertUnordIterEqual = assertUnorderedIterableEqual
1376
1377 def assertSetEquals(self, got, expected, msg=None): # signature reconstructed
1378 if not(isinstance(got, set) and isinstance(expected, set)):
1379 warn("the assertSetEquals function is now intended for sets only. "\
1380 "use assertUnorderedIterableEquals instead.",
1381 DeprecationWarning, 2)
1382 return self.assertUnorderedIterableEquals(got,expected, msg)
1383
1384 items={}
1385 items['missing'] = expected - got
1386 items['unexpected'] = got - expected
1387 if any(items.itervalues()):
1388 if msg is None:
1389 msg = '\n'.join('%s:\n\t%s' % (key,"\n\t".join(str(value) for value in values))
1390 for key, values in items.iteritems() if values)
1391 self.fail(msg)
1392
1393
1394 assertSetEqual = assertSetEquals
1395
1396 def assertListEquals(self, list_1, list_2, msg=None): # signature reconstructed
1397 """compares two lists
1398
1399 If the two list differ, the first difference is shown in the error
1400 message
1401 """
1402 _l1 = list_1[:]
1403 for i, value in enumerate(list_2):
1404 try:
1405 if _l1[0] != value:
1406 from pprint import pprint
1407 pprint(list_1)
1408 pprint(list_2)
1409 self.fail('%r != %r for index %d' % (_l1[0], value, i))
1410 del _l1[0]
1411 except IndexError:
1412 if msg is None:
1413 msg = 'list_1 has only %d elements, not %s '\
1414 '(at least %r missing)'% (i, len(list_2), value)
1415 self.fail(msg)
1416 if _l1:
1417 if msg is None:
1418 msg = 'list_2 is lacking %r' % _l1
1419 self.fail(msg)
1420 assertListEqual = assertListEquals
1421
1422 def assertLinesEquals(self, list_1, list_2, msg=None): # signature reconstructed
1423 """assert lists of lines are equal"""
1424 self.assertListEquals(list_1.splitlines(), list_2.splitlines(), msg)
1425 assertLineEqual = assertLinesEquals
1426
1437 assertXMLValid = deprecated_function(assertXMLWellFormed,
1438 'assertXMLValid renamed to more precise assertXMLWellFormed')
1439
1444
1445 assertXMLStringValid = deprecated_function(
1446 assertXMLStringWellFormed,
1447 'assertXMLStringValid renamed to more precise assertXMLStringWellFormed'
1448 )
1449
1450 def assertXMLEqualsTuple(self, element, tup): # signature reconstructed
1451 """compare an ElementTree Element to a tuple formatted as follows:
1452 (tagname, [attrib[, children[, text[, tail]]]])"""
1453
1454 self.assertTextEquals(element.tag, tup[0])
1455
1456 if len(element.attrib) or len(tup)>1:
1457 if len(tup)<=1:
1458 self.fail( "tuple %s has no attributes (%s expected)"%(tup,
1459 dict(element.attrib)))
1460 self.assertDictEquals(element.attrib, tup[1])
1461
1462 if len(element) or len(tup)>2:
1463 if len(tup)<=2:
1464 self.fail( "tuple %s has no children (%i expected)"%(tup,
1465 len(element)))
1466 if len(element) != len(tup[2]):
1467 self.fail( "tuple %s has %i children%s (%i expected)"%(tup,
1468 len(tup[2]),
1469 ('', 's')[len(tup[2])>1], len(element)))
1470 for index in xrange(len(tup[2])):
1471 self.assertXMLEqualsTuple(element[index], tup[2][index])
1472
1473 if element.text or len(tup)>3:
1474 if len(tup)<=3:
1475 self.fail( "tuple %s has no text value (%r expected)"%(tup,
1476 element.text))
1477 self.assertTextEquals(element.text, tup[3])
1478
1479 if element.tail or len(tup)>4:
1480 if len(tup)<=4:
1481 self.fail( "tuple %s has no tail value (%r expected)"%(tup,
1482 element.tail))
1483 self.assertTextEquals(element.tail, tup[4])
1484
1485 def _difftext(self, lines1, lines2, junk=None, msg_prefix='Texts differ'):
1486 junk = junk or (' ', '\t')
1487
1488 result = difflib.ndiff(lines1, lines2, charjunk=lambda x: x in junk)
1489 read = []
1490 for line in result:
1491 read.append(line)
1492
1493 if not line.startswith(' '):
1494 self.fail('\n'.join(['%s\n'%msg_prefix]+read + list(result)))
1495
1496 def assertTextEquals(self, text1, text2, junk=None,
1497 msg_prefix='Text differ'):
1498 """compare two multiline strings (using difflib and splitlines())"""
1499 msg = []
1500 if not isinstance(text1, basestring):
1501 msg.append('text1 is not a string (%s)'%(type(text1)))
1502 if not isinstance(text2, basestring):
1503 msg.append('text2 is not a string (%s)'%(type(text2)))
1504 if msg:
1505 self.fail('\n'.join(msg))
1506 self._difftext(text1.strip().splitlines(True), text2.strip().splitlines(True),
1507 junk, msg_prefix)
1508 assertTextEqual = assertTextEquals
1509
1510 def assertStreamEquals(self, stream1, stream2, junk=None,
1511 msg_prefix='Stream differ'):
1512 """compare two streams (using difflib and readlines())"""
1513
1514
1515 if stream1 is stream2:
1516 return
1517
1518 stream1.seek(0)
1519 stream2.seek(0)
1520
1521 self._difftext(stream1.readlines(), stream2.readlines(), junk,
1522 msg_prefix)
1523
1524 assertStreamEqual = assertStreamEquals
1525 def assertFileEquals(self, fname1, fname2, junk=(' ', '\t')): # signature reconstructed
1526 """compares two files using difflib"""
1527 self.assertStreamEqual(file(fname1), file(fname2), junk,
1528 msg_prefix='Files differs\n-:%s\n+:%s\n'%(fname1, fname2))
1529 assertFileEqual = assertFileEquals
1530
1531
1532 def assertDirEquals(self, path_a, path_b): # signature reconstructed
1533 """compares the content of two directory trees using difflib"""
1534 assert osp.exists(path_a), "%s doesn't exist" % path_a
1535 assert osp.exists(path_b), "%s doesn't exist" % path_b
1536
1537 all_a = [ (ipath[len(path_a):].lstrip('/'), idirs, ifiles)
1538 for ipath, idirs, ifiles in os.walk(path_a)]
1539 all_a.sort(key=itemgetter(0))
1540
1541 all_b = [ (ipath[len(path_b):].lstrip('/'), idirs, ifiles)
1542 for ipath, idirs, ifiles in os.walk(path_b)]
1543 all_b.sort(key=itemgetter(0))
1544
1545 iter_a, iter_b = iter(all_a), iter(all_b)
1546 partial_iter = True
1547 ipath_a, idirs_a, ifiles_a = data_a = None, None, None
1548 while True:
1549 try:
1550 ipath_a, idirs_a, ifiles_a = datas_a = iter_a.next()
1551 partial_iter = False
1552 ipath_b, idirs_b, ifiles_b = datas_b = iter_b.next()
1553 partial_iter = True
1554
1555
1556 self.assert_(ipath_a == ipath_b,
1557 "unexpected %s in %s while looking %s from %s" %
1558 (ipath_a, path_a, ipath_b, path_b))
1559
1560
1561 errors = {}
1562 sdirs_a = set(idirs_a)
1563 sdirs_b = set(idirs_b)
1564 errors["unexpected directories"] = sdirs_a - sdirs_b
1565 errors["missing directories"] = sdirs_b - sdirs_a
1566
1567 sfiles_a = set(ifiles_a)
1568 sfiles_b = set(ifiles_b)
1569 errors["unexpected files"] = sfiles_a - sfiles_b
1570 errors["missing files"] = sfiles_b - sfiles_a
1571
1572
1573 msgs = [ "%s: %s"% (name, items)
1574 for name, items in errors.iteritems() if items]
1575
1576 if msgs:
1577 msgs.insert(0,"%s and %s differ :" % (
1578 osp.join(path_a, ipath_a),
1579 osp.join(path_b, ipath_b),
1580 ))
1581 self.fail("\n".join(msgs))
1582
1583 for files in (ifiles_a, ifiles_b):
1584 files.sort()
1585
1586 for index, path in enumerate(ifiles_a):
1587 self.assertFileEquals(osp.join(path_a, ipath_a, path),
1588 osp.join(path_b, ipath_b, ifiles_b[index]))
1589
1590 except StopIteration:
1591 break
1592
1593
1594 assertDirEqual = assertDirEquals
1595
1596
1597 def assertIsInstance(self, obj, klass, msg=None, strict=False): # signature reconstructed
1598 """check that obj is an instance of klass (or exactly of class klass if strict)"""
1599 if msg is None:
1600 if strict:
1601 msg = '%r is not of class %s but of %s'
1602 else:
1603 msg = '%r is not an instance of %s but of %s'
1604 msg = msg % (obj, klass, type(obj))
1605 if strict:
1606 self.assert_(obj.__class__ is klass, msg)
1607 else:
1608 self.assert_(isinstance(obj, klass), msg)
1609
1610 def assertIs(self, obj, other, msg=None):
1611 """compares identity of two references"""
1612 if msg is None:
1613 msg = "%r is not %r"%(obj, other)
1614 self.assert_(obj is other, msg)
1615
1616
1617 def assertIsNot(self, obj, other, msg=None): # signature reconstructed
1618 """compares identity of two references"""
1619 if msg is None:
1620 msg = "%r is %r"%(obj, other)
1621 self.assert_(obj is not other, msg )
1622
1623 def assertNone(self, obj, msg=None): # signature reconstructed
1624 """assert obj is None"""
1625 if msg is None:
1626 msg = "reference to %r when None expected"%(obj,)
1627 self.assert_( obj is None, msg )
1628
1629 def assertNotNone(self, obj, msg=None): # signature reconstructed
1630 """assert obj is not None"""
1631 if msg is None:
1632 msg = "unexpected reference to None"
1633 self.assert_( obj is not None, msg )
1634
1635 def assertFloatAlmostEquals(self, obj, other, prec=1e-5, msg=None): # signature reconstructed, name uncertain
1636 """compares two floats up to a given precision"""
1637 if msg is None:
1638 msg = "%r != %r" % (obj, other)
1639 self.assert_(math.fabs(obj - other) < prec, msg)
1640
1641 def failUnlessRaises(self, excClass, callableObj, *args, **kwargs): # signature reconstructed
1642 """override default failUnlessRaises method to return the raised
1643 exception instance.
1644
1645 Fail unless an exception of class excClass is thrown
1646 by callableObj when invoked with arguments args and keyword
1647 arguments kwargs. If a different type of exception is
1648 thrown, it will not be caught, and the test case will be
1649 deemed to have suffered an error, exactly as for an
1650 unexpected exception.
1651 """
1652 try:
1653 callableObj(*args, **kwargs)
1654 except excClass, exc:
1655 return exc
1656 else:
1657 if hasattr(excClass, '__name__'):
1658 excName = excClass.__name__
1659 else:
1660 excName = str(excClass)
1661 raise self.failureException, "%s not raised" % excName
1662
1663 assertRaises = failUnlessRaises
1664
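A sketch of a generative test relying on the machinery above (class and method names are made up): the test method is a generator, each yielded tuple is (callable, *parameters), and starargs/keywords wrappers mark extra positional and keyword parameters.

class MathTC(TestCase):

    def check_add(self, a, b, expected):
        self.assertEquals(a + b, expected)

    def test_additions(self):
        # plain positional parameters after the callable
        yield self.check_add, 1, 2, 3
        # the same call spelled with the starargs wrapper
        yield self.check_add, starargs(4, 5, 9)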
1665 import doctest
1666
1667 class SkippedSuite(TestCase): # class statement reconstructed
1668 def test(self): # signature reconstructed
1669 """just there to trigger test execution"""
1670 self.skipped_test('doctest module has no DocTestSuite class')
1671
1672
1673
1674 if sys.version_info >= (2, 4):
1675 class DocTestFinder(doctest.DocTestFinder): # class statement reconstructed
1676
1677 def __init__(self, *args, **kwargs): # signature reconstructed
1678 self.skipped = kwargs.pop('skipped', ())
1679 doctest.DocTestFinder.__init__(self, *args, **kwargs)
1680
1681 def _get_test(self, obj, name, module, globs, source_lines):
1682 """override default _get_test method to be able to skip tests
1683 according to skipped attribute's value
1684
1685 Note: Python (<=2.4) use a _name_filter which could be used for that
1686 purpose but it's no longer available in 2.5
1687 Python 2.5 seems to have a [SKIP] flag
1688 """
1689 if getattr(obj, '__name__', '') in self.skipped:
1690 return None
1691 return doctest.DocTestFinder._get_test(self, obj, name, module,
1692 globs, source_lines)
1693 else:
1695 class DocTestFinder(object): # class statement reconstructed
1696 def __init__(self, skipped=()): # signature reconstructed
1697 self.skipped = skipped
1698 self.original_find_tests = doctest._find_tests
1699 doctest._find_tests = self._find_tests
1700
1701 def _find_tests(self, module, prefix=None): # signature reconstructed
1702 tests = []
1703 for testinfo in self.original_find_tests(module, prefix):
1704 testname, _, _, _ = testinfo
1705
1706 testname = testname.split('.')[-1]
1707 if testname not in self.skipped:
1708 tests.append(testinfo)
1709 return tests
1710
1711
1712 class DocTest(TestCase): # class statement reconstructed
1713 """trigger module doctest
1714 I don't know how to make unittest.main consider the DocTestSuite instance
1715 without this hack
1716 """
1717 skipped = ()
1718 def __call__(self, result=None, runcondition=None, options=None):
1719
1720 try:
1721 finder = DocTestFinder(skipped=self.skipped)
1722 if sys.version_info >= (2, 4):
1723 suite = doctest.DocTestSuite(self.module, test_finder=finder)
1724 else:
1725 suite = doctest.DocTestSuite(self.module)
1726 except AttributeError:
1727 suite = SkippedSuite()
1728 return suite.run(result)
1729 run = __call__
1730
1731 def test(self): # signature reconstructed
1732 """just there to trigger test execution"""
1733
1734 MAILBOX = None
1735
1736 class MockSMTP: # class statement reconstructed
1737 """fake smtplib.SMTP"""
1738
1739 def __init__(self, host, port): # signature reconstructed
1740 self.host = host
1741 self.port = port
1742 global MAILBOX
1743 self.reveived = MAILBOX = []
1744
1746 """ignore debug level"""
1747
1748 def sendmail(self, fromaddr, toaddres, body):
1749 """push sent mail in the mailbox"""
1750 self.reveived.append((fromaddr, toaddres, body))
1751
1754
1755
1756 class MockConfigParser(ConfigParser): # class statement reconstructed
1757 """fake ConfigParser.ConfigParser"""
1758
1759 def __init__(self, options): # signature reconstructed
1760 ConfigParser.__init__(self)
1761 for section, pairs in options.iteritems():
1762 self.add_section(section)
1763 for key, value in pairs.iteritems():
1764 self.set(section,key,value)
1766 raise NotImplementedError()
1767
1768
1769 class MockConnection: # class statement reconstructed
1770 """fake DB-API 2.0 connection AND cursor (i.e. cursor() returns self)"""
1771
1772 def __init__(self, results): # signature reconstructed
1773 self.received = []
1774 self.states = []
1775 self.results = results
1776
1778 """Mock cursor method"""
1779 return self
1780 def execute(self, query, args=None):
1781 """Mock execute method"""
1782 self.received.append( (query, args) )
1784 """Mock fetchone method"""
1785 return self.results[0]
1787 """Mock fetchall method"""
1788 return self.results
1789 def commit(self): # signature reconstructed
1790 """Mock commit method"""
1791 self.states.append( ('commit', len(self.received)) )
1793 """Mock rollback method"""
1794 self.states.append( ('rollback', len(self.received)) )
1796 """Mock close method"""
1797 pass
1798
1799 MockConnexion = class_renamed('MockConnexion', MockConnection)
1800
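A short sketch of the mock connection in use (the query and expected rows are made up):

cnx = MockConnection(results=[(1, 'ok')])
cursor = cnx.cursor()              # returns the connection itself
cursor.execute('SELECT id, status FROM jobs WHERE id=%s', (1,))
assert cursor.fetchone() == (1, 'ok')
assert cnx.received == [('SELECT id, status FROM jobs WHERE id=%s', (1,))]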
1801 def mock_object(**params): # signature reconstructed
1802 """creates an object using params to set attributes
1803 >>> option = mock_object(verbose=False, index=range(5))
1804 >>> option.verbose
1805 False
1806 >>> option.index
1807 [0, 1, 2, 3, 4]
1808 """
1809 return type('Mock', (), params)()
1810
1811
1812 def create_files(paths, chroot): # signature reconstructed
1813 """Creates directories and files found in <path>.
1814
1815 :param paths: list of relative paths to files or directories
1816 :param chroot: the root directory in which paths will be created
1817
1818 >>> from os.path import isdir, isfile
1819 >>> isdir('/tmp/a')
1820 False
1821 >>> create_files(['a/b/foo.py', 'a/b/c/', 'a/b/c/d/e.py'], '/tmp')
1822 >>> isdir('/tmp/a')
1823 True
1824 >>> isdir('/tmp/a/b/c')
1825 True
1826 >>> isfile('/tmp/a/b/c/d/e.py')
1827 True
1828 >>> isfile('/tmp/a/b/foo.py')
1829 True
1830 """
1831 dirs, files = set(), set()
1832 for path in paths:
1833 path = osp.join(chroot, path)
1834 filename = osp.basename(path)
1835
1836 if filename == '':
1837 dirs.add(path)
1838
1839 else:
1840 dirs.add(osp.dirname(path))
1841 files.add(path)
1842 for dirpath in dirs:
1843 if not osp.isdir(dirpath):
1844 os.makedirs(dirpath)
1845 for filepath in files:
1846 file(filepath, 'w').close()
1847
1848 def enable_dbc(*args): # signature reconstructed
1849 """
1850 Without arguments, return True if contracts can be enabled and should be
1851 enabled (see option -d), return False otherwise.
1852
1853 With arguments, return False if contracts can't or shouldn't be enabled,
1854 otherwise weave ContractAspect with items passed as arguments.
1855 """
1856 if not ENABLE_DBC:
1857 return False
1858 try:
1859 from logilab.aspects.weaver import weaver
1860 from logilab.aspects.lib.contracts import ContractAspect
1861 except ImportError:
1862 sys.stderr.write(
1863 'Warning: logilab.aspects is not available. Contracts disabled.')
1864 return False
1865 for arg in args:
1866 weaver.weave_module(arg, ContractAspect)
1867 return True
1868
1869
1872 self.__dict__.update(kwargs)
1873
1874 def tag(*args): # signature reconstructed
1875 """decorator adding tags to a function"""
1876 def desc(func):
1877 assert not hasattr(func, 'tags')
1878 func.tags = Tags(args)
1879 return func
1880 return desc
1881
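For illustration, tagging a test method lets the -m / --match option select it (tag and test names are made up):

class QueryTC(TestCase):

    @tag('slow', 'database')
    def test_heavy_query(self):
        pass   # real assertions would go here

# something like "python test_module.py -m slow" would then run only the
# tests whose tags match the given pattern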
1889
1890 def require_version(version): # signature reconstructed
1891 """ Compare the version of the Python interpreter to the given one. Skip the test
1892 if older.
1893 """
1894 def check_require_version(f):
1895 version_elements = version.split('.')
1896 try:
1897 compare = tuple([int(v) for v in version_elements])
1898 except ValueError:
1899 raise ValueError('%s is not a correct version : should be X.Y[.Z].' % version)
1900 current = sys.version_info[:3]
1901
1902 if current < compare:
1903
1904 def new_f(self, *args, **kwargs):
1905 self.skip('Need at least %s version of python. Current version is %s.' % (version, '.'.join([str(element) for element in current])))
1906 new_f.__name__ = f.__name__
1907 return new_f
1908 else:
1909
1910 return f
1911 return check_require_version
1912
1913 def require_module(module): # signature reconstructed
1914 """ Check if the given module can be imported. Skip the test if not.
1915 """
1916 def check_require_module(f):
1917 try:
1918 __import__(module)
1919
1920 return f
1921 except ImportError:
1922
1923 def new_f(self, *args, **kwargs):
1924 self.skip('%s can not be imported.' % module)
1925 new_f.__name__ = f.__name__
1926 return new_f
1927 return check_require_module
1928
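A usage sketch combining the two decorators above (module name and required version are illustrative):

class RequirementsTC(TestCase):

    @require_version('2.4')
    def test_needs_recent_python(self):
        self.assert_(sys.version_info[:2] >= (2, 4))

    @require_module('lxml')
    def test_needs_lxml(self):
        import lxml.etree
        self.assertEquals(lxml.etree.fromstring('<a/>').tag, 'a')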