Package logilab-common-0 ::
Package 36 ::
Package 1 ::
Module testlib
|
|
1
2 """Run tests.
3
4 This will find all modules whose name match a given prefix in the test
5 directory, and run them. Various command line options provide
6 additional facilities.
7
8 Command line options:
9
10 -v: verbose -- run tests in verbose mode with output to stdout
11 -q: quiet -- don't print anything except if a test fails
12 -t: testdir -- directory where the tests will be found
13 -x: exclude -- add a test to exclude
14 -p: profile -- profiled execution
15 -c: capture -- capture standard out/err during tests
16 -d: dbc -- enable design-by-contract
17 -m: match -- only run test matching the tag pattern which follow
18
19 If no non-option arguments are present, prefixes used are 'test',
20 'regrtest', 'smoketest' and 'unittest'.
21
22 :copyright: 2003-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
23 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
24 :license: General Public License version 2 - http://www.gnu.org/licenses
25 """
26 __docformat__ = "restructuredtext en"
27
28
29
30
31 import sys
32 import os, os.path as osp
33 import re
34 import time
35 import getopt
36 import traceback
37 import unittest
38 import difflib
39 import types
40 import tempfile
41 from shutil import rmtree
42 from operator import itemgetter
43 from warnings import warn
44 from compiler.consts import CO_GENERATOR
45 from ConfigParser import ConfigParser
46
47
48
49
50 try:
51 from test import test_support
52 except ImportError:
53
57 test_support = TestSupport()
58
59 try:
60 from pygments import highlight, lexers, formatters
61
62 PYGMENTS_FOUND = True
63 except ImportError:
64 PYGMENTS_FOUND = False
65
66 from logilab.common.deprecation import class_renamed, deprecated_function, \
67 obsolete
68
69 from logilab.common.compat import set, enumerate, any
70
71 from logilab.common.modutils import load_module_from_name
72 from logilab.common.debugger import Debugger
73 from logilab.common.decorators import cached
74 from logilab.common import textutils
75
76 __all__ = ['main', 'unittest_main', 'find_tests', 'run_test', 'spawn']
77
78 DEFAULT_PREFIXES = ('test', 'regrtest', 'smoketest', 'unittest',
79 'func', 'validation')
80
81 ENABLE_DBC = False
82
83 FILE_RESTART = ".pytest.restart"
84
85
87 """A decorator ensuring no temporary file left when the function return
88 Work only for temporary file create with the tempfile module"""
89 def proxy(*args, **kargs):
90
91 old_tmpdir = tempfile.gettempdir()
92 new_tmpdir = tempfile.mkdtemp("-logilab-common-testlib","temp-dir-")
93 tempfile.tempdir = new_tmpdir
94 try:
95 return callable(*args, **kargs)
96 finally:
97 try:
98 rmtree(new_tmpdir, ignore_errors=True)
99 finally:
100 tempfile.tempdir = old_tmpdir
101 return proxy
102
103 -def main(testdir=None, exitafter=True):
104 """Execute a test suite.
105
106 This also parses command-line options and modifies its behaviour
107 accordingly.
108
109 tests -- a list of strings containing test names (optional)
110 testdir -- the directory in which to look for tests (optional)
111
112 Users other than the Python test suite will certainly want to
113 specify testdir; if it's omitted, the directory containing the
114 Python test suite is searched for.
115
116 If the tests argument is omitted, the tests listed on the
117 command-line will be used. If that's empty, too, then all *.py
118 files beginning with test_ will be used.
119
120 """
121
122 try:
123 opts, args = getopt.getopt(sys.argv[1:], 'hvqxr:t:pcd', ['help'])
124 except getopt.error, msg:
125 print msg
126 print __doc__
127 return 2
128 verbose = 0
129 quiet = False
130 profile = False
131 exclude = []
132 capture = 0
133 for o, a in opts:
134 if o == '-v':
135 verbose += 1
136 elif o == '-q':
137 quiet = True
138 verbose = 0
139 elif o == '-x':
140 exclude.append(a)
141 elif o == '-t':
142 testdir = a
143 elif o == '-p':
144 profile = True
145 elif o == '-c':
146 capture += 1
147 elif o == '-d':
148 global ENABLE_DBC
149 ENABLE_DBC = True
150 elif o in ('-h', '--help'):
151 print __doc__
152 sys.exit(0)
153
154 args = [item.rstrip('.py') for item in args]
155 exclude = [item.rstrip('.py') for item in exclude]
156
157 if testdir is not None:
158 os.chdir(testdir)
159 sys.path.insert(0, '')
160 tests = find_tests('.', args or DEFAULT_PREFIXES, excludes=exclude)
161
162 test_support.verbose = verbose
163 if profile:
164 print >> sys.stderr, '** profiled run'
165 from hotshot import Profile
166 prof = Profile('stones.prof')
167 start_time, start_ctime = time.time(), time.clock()
168 good, bad, skipped, all_result = prof.runcall(run_tests, tests, quiet,
169 verbose, None, capture)
170 end_time, end_ctime = time.time(), time.clock()
171 prof.close()
172 else:
173 start_time, start_ctime = time.time(), time.clock()
174 good, bad, skipped, all_result = run_tests(tests, quiet, verbose, None,
175 capture)
176 end_time, end_ctime = time.time(), time.clock()
177 if not quiet:
178 print '*'*80
179 if all_result:
180 print 'Ran %s test cases in %0.2fs (%0.2fs CPU)' % (
181 all_result.testsRun, end_time - start_time,
182 end_ctime - start_ctime),
183 if all_result.errors:
184 print ', %s errors' % len(all_result.errors),
185 if all_result.failures:
186 print ', %s failed' % len(all_result.failures),
187 if all_result.skipped:
188 print ', %s skipped' % len(all_result.skipped),
189 print
190 if good:
191 if not bad and not skipped and len(good) > 1:
192 print "All",
193 print _count(len(good), "test"), "OK."
194 if bad:
195 print _count(len(bad), "test"), "failed:",
196 print ', '.join(bad)
197 if skipped:
198 print _count(len(skipped), "test"), "skipped:",
199 print ', '.join(['%s (%s)' % (test, msg) for test, msg in skipped])
200 if profile:
201 from hotshot import stats
202 stats = stats.load('stones.prof')
203 stats.sort_stats('time', 'calls')
204 stats.print_stats(30)
205 if exitafter:
206 sys.exit(len(bad) + len(skipped))
207 else:
208 sys.path.pop(0)
209 return len(bad)
210 main = obsolete("testlib.main() is obsolete, use the pytest tool instead")(main)
211
212
213 -def run_tests(tests, quiet, verbose, runner=None, capture=0):
214 """Execute a list of tests.
215
216 :rtype: tuple
217 :return: tuple (list of passed tests, list of failed tests, list of skipped tests)
218 """
219 good = []
220 bad = []
221 skipped = []
222 all_result = None
223 for test in tests:
224 if not quiet:
225 print
226 print '-'*80
227 print "Executing", test
228 result = run_test(test, verbose, runner, capture)
229 if type(result) is type(''):
230
231 skipped.append( (test, result))
232 else:
233 if all_result is None:
234 all_result = result
235 else:
236 all_result.testsRun += result.testsRun
237 all_result.failures += result.failures
238 all_result.errors += result.errors
239 all_result.skipped += result.skipped
240 if result.errors or result.failures:
241 bad.append(test)
242 if verbose:
243 print "test", test, \
244 "failed -- %s errors, %s failures" % (
245 len(result.errors), len(result.failures))
246 else:
247 good.append(test)
248
249 return good, bad, skipped, all_result
250
255 """
256 Return a list of all applicable test modules.
257 """
258 tests = []
259 for name in os.listdir(testdir):
260 if not suffix or name.endswith(suffix):
261 for prefix in prefixes:
262 if name.startswith(prefix):
263 if remove_suffix and name.endswith(suffix):
264 name = name[:-len(suffix)]
265 if name not in excludes:
266 tests.append(name)
267 tests.sort()
268 return tests
269
270
271 -def run_test(test, verbose, runner=None, capture=0):
272 """
273 Run a single test.
274
275 test -- the name of the test
276 verbose -- if true, print more messages
277 """
278 test_support.unload(test)
279 try:
280 m = load_module_from_name(test, path=sys.path)
281
282 try:
283 suite = m.suite
284 if callable(suite):
285 suite = suite()
286 except AttributeError:
287 loader = unittest.TestLoader()
288 suite = loader.loadTestsFromModule(m)
289 if runner is None:
290 runner = SkipAwareTextTestRunner(capture=capture)
291 return runner.run(suite)
292 except KeyboardInterrupt, v:
293 raise KeyboardInterrupt, v, sys.exc_info()[2]
294 except:
295
296 type, value = sys.exc_info()[:2]
297 msg = "test %s crashed -- %s : %s" % (test, type, value)
298 if verbose:
299 traceback.print_exc()
300 return msg
301
303 """format word according to n"""
304 if n == 1:
305 return "%d %s" % (n, word)
306 else:
307 return "%d %ss" % (n, word)
308
309
310
311
312
314 """starts an interactive shell so that the user can inspect errors
315 """
316 debuggers = result.debuggers
317 descrs = result.error_descrs + result.fail_descrs
318 if len(debuggers) == 1:
319
320 debuggers[0].start()
321 else:
322 while True:
323 testindex = 0
324 print "Choose a test to debug:"
325
326 print "\n".join(['\t%s : %s' % (i, descr) for i, (_, descr)
327 in enumerate(descrs)])
328 print "Type 'exit' (or ^D) to quit"
329 print
330 try:
331 todebug = raw_input('Enter a test name: ')
332 if todebug.strip().lower() == 'exit':
333 print
334 break
335 else:
336 try:
337 testindex = int(todebug)
338 debugger = debuggers[descrs[testindex][0]]
339 except (ValueError, IndexError):
340 print "ERROR: invalid test number %r" % (todebug, )
341 else:
342 debugger.start()
343 except (EOFError, KeyboardInterrupt):
344 print
345 break
346
347
348
349 from cStringIO import StringIO
350
352
353 - def __init__(self, stream, descriptions, verbosity,
354 exitfirst=False, capture=0, printonly=None,
355 pdbmode=False, cvg=None):
356 super(SkipAwareTestResult, self).__init__(stream,
357 descriptions, verbosity)
358 self.skipped = []
359 self.debuggers = []
360 self.fail_descrs = []
361 self.error_descrs = []
362 self.exitfirst = exitfirst
363 self.capture = capture
364 self.printonly = printonly
365 self.pdbmode = pdbmode
366 self.cvg = cvg
367 self.pdbclass = Debugger
368
370 return getattr(self, '%s_descrs' % flavour.lower())
371
373 self.descrs_for(flavour).append( (len(self.debuggers), test_descr) )
374 if self.pdbmode:
375 self.debuggers.append(self.pdbclass(sys.exc_info()[2]))
376
378 """err == (exc_type, exc, tcbk)"""
379 exc_type, exc, _ = err
380 if exc_type == TestSkipped:
381 self.addSkipped(test, exc)
382 else:
383 if self.exitfirst:
384 self.shouldStop = True
385 descr = self.getDescription(test)
386 super(SkipAwareTestResult, self).addError(test, err)
387 self._create_pdb(descr, 'error')
388
390 if self.exitfirst:
391 self.shouldStop = True
392 descr = self.getDescription(test)
393 super(SkipAwareTestResult, self).addFailure(test, err)
394 self._create_pdb(descr, 'fail')
395
397 self.skipped.append((test, self.getDescription(test), reason))
398 if self.showAll:
399 self.stream.writeln("SKIPPED")
400 elif self.dots:
401 self.stream.write('S')
402
404 super(SkipAwareTestResult, self).printErrors()
405 self.printSkippedList()
406
408 for _, descr, err in self.skipped:
409 self.stream.writeln(self.separator1)
410 self.stream.writeln("%s: %s" % ('SKIPPED', descr))
411 self.stream.writeln("\t%s" % err)
412
414 for (_, descr), (test, err) in zip(self.descrs_for(flavour), errors):
415 self.stream.writeln(self.separator1)
416 if isatty(self.stream):
417 self.stream.writeln("%s: %s" % (
418 textutils.colorize_ansi(flavour, color='red'), descr))
419 else:
420 self.stream.writeln("%s: %s" % (flavour, descr))
421
422 self.stream.writeln(self.separator2)
423 if PYGMENTS_FOUND and isatty(self.stream):
424
425 if isinstance(err, str):
426 try:
427
428 err = err.decode('utf-8')
429 except UnicodeDecodeError:
430 err = err.decode('iso-8859-1')
431 err_color = highlight(err, lexers.PythonLexer(),
432 formatters.terminal.TerminalFormatter())
433
434
435 if hasattr(self.stream, 'encoding'):
436 err_color = err_color.encode(self.stream.encoding, 'replace')
437 else:
438
439
440 err_color = err_color.encode('utf-8', 'replace')
441 self.stream.writeln(err_color)
442 else:
443 self.stream.writeln(err)
444
445 try:
446 output, errput = test.captured_output()
447 except AttributeError:
448 pass
449 else:
450 if output:
451 self.stream.writeln(self.separator2)
452 self.stream.writeln("captured stdout".center(
453 len(self.separator2)))
454 self.stream.writeln(self.separator2)
455 self.stream.writeln(output)
456 else:
457 self.stream.writeln('no stdout'.center(
458 len(self.separator2)))
459 if errput:
460 self.stream.writeln(self.separator2)
461 self.stream.writeln("captured stderr".center(
462 len(self.separator2)))
463 self.stream.writeln(self.separator2)
464 self.stream.writeln(errput)
465 else:
466 self.stream.writeln('no stderr'.center(
467 len(self.separator2)))
468
469
471 return hasattr(stream, 'isatty') and stream.isatty()
472
473 -def run(self, result, runcondition=None, options=None):
474 for test in self._tests:
475 if result.shouldStop:
476 break
477 try:
478 test(result, runcondition, options)
479 except TypeError:
480
481
482 warn("%s should extend lgc.testlib.TestCase instead of unittest.TestCase"
483 % test)
484 test(result)
485 return result
486 unittest.TestSuite.run = run
487
488
489 TestSuite = unittest.TestSuite
490
491
493 return self.run(*args, **kwds)
494 unittest.TestSuite.__call__ = __call__
495
496
497 -class SkipAwareTextTestRunner(unittest.TextTestRunner):
498
499 - def __init__(self, stream=sys.stderr, verbosity=1,
500 exitfirst=False, capture=False, printonly=None,
501 pdbmode=False, cvg=None, test_pattern=None,
502 skipped_patterns=(), options=None):
503 super(SkipAwareTextTestRunner, self).__init__(stream=stream,
504 verbosity=verbosity)
505 self.exitfirst = exitfirst
506 self.capture = capture
507 self.printonly = printonly
508 self.pdbmode = pdbmode
509 self.cvg = cvg
510 self.test_pattern = test_pattern
511 self.skipped_patterns = skipped_patterns
512 self.options = options
513
514 - def _this_is_skipped(self, testedname):
515 return any([(pat in testedname) for pat in self.skipped_patterns])
516
517 - def _runcondition(self, test, skipgenerator=True):
518 if isinstance(test, InnerTest):
519 testname = test.name
520 else:
521 if isinstance(test, TestCase):
522 meth = test._get_test_method()
523 func = meth.im_func
524 testname = '%s.%s' % (meth.im_class.__name__, func.__name__)
525 elif isinstance(test, types.FunctionType):
526 func = test
527 testname = func.__name__
528 elif isinstance(test, types.MethodType):
529 func = test.im_func
530 testname = '%s.%s' % (test.im_class.__name__, func.__name__)
531 else:
532 return True
533
534 if is_generator(func) and skipgenerator:
535 return self.does_match_tags(func)
536
537
538 if self._this_is_skipped(testname):
539 return False
540 if self.test_pattern is not None:
541 try:
542 classpattern, testpattern = self.test_pattern.split('.')
543 klass, name = testname.split('.')
544 if classpattern not in klass or testpattern not in name:
545 return False
546 except ValueError:
547 if self.test_pattern not in testname:
548 return False
549
550 return self.does_match_tags(test)
551
553 if self.options is not None:
554 tags_pattern = getattr(self.options, 'tags_pattern', None)
555 if tags_pattern is not None:
556 tags = getattr(test, 'tags', Tags())
557 return tags.match(tags_pattern)
558 return True
559
560 - def _makeResult(self):
561 return SkipAwareTestResult(self.stream, self.descriptions,
562 self.verbosity, self.exitfirst, self.capture,
563 self.printonly, self.pdbmode, self.cvg)
564
565 - def run(self, test):
566 "Run the given test case or test suite."
567 result = self._makeResult()
568 startTime = time.time()
569 test(result, self._runcondition, self.options)
570 stopTime = time.time()
571 timeTaken = stopTime - startTime
572 result.printErrors()
573 self.stream.writeln(result.separator2)
574 run = result.testsRun
575 self.stream.writeln("Ran %d test%s in %.3fs" %
576 (run, run != 1 and "s" or "", timeTaken))
577 self.stream.writeln()
578 if not result.wasSuccessful():
579 if isatty(self.stream):
580 self.stream.write(textutils.colorize_ansi("FAILED", color='red'))
581 else:
582 self.stream.write("FAILED")
583 else:
584 if isatty(self.stream):
585 self.stream.write(textutils.colorize_ansi("OK", color='green'))
586 else:
587 self.stream.write("OK")
588 failed, errored, skipped = map(len, (result.failures, result.errors,
589 result.skipped))
590
591 det_results = []
592 for name, value in (("failures", result.failures),
593 ("errors",result.errors),
594 ("skipped", result.skipped)):
595 if value:
596 det_results.append("%s=%i" % (name, len(value)))
597 if det_results:
598 self.stream.write(" (")
599 self.stream.write(', '.join(det_results))
600 self.stream.write(")")
601 self.stream.writeln("")
602 return result
603
604
606 """Keyword args (**kwargs) support for generative tests."""
607
609 """Variable arguments (*args) for generative tests."""
611 return tuple.__new__(cls, args)
612
613
614
616 """
617 Overrides default testloader to be able to omit classname when
618 specifying tests to run on command line.
619
620 For example, if the file test_foo.py contains ::
621
622 class FooTC(TestCase):
623 def test_foo1(self): # ...
624 def test_foo2(self): # ...
625 def test_bar1(self): # ...
626
627 class BarTC(TestCase):
628 def test_bar2(self): # ...
629
630 'python test_foo.py' will run the 3 tests in FooTC
631 'python test_foo.py FooTC' will run the 3 tests in FooTC
632 'python test_foo.py test_foo' will run test_foo1 and test_foo2
633 'python test_foo.py test_foo1' will run test_foo1
634 'python test_foo.py test_bar' will run FooTC.test_bar1 and BarTC.test_bar2
635 """
636
638 self.skipped_patterns = []
639
641 suites = []
642 for name in names:
643 suites.extend(self.loadTestsFromName(name, module))
644 return self.suiteClass(suites)
645
647 tests = {}
648 for obj in vars(module).values():
649 if (issubclass(type(obj), (types.ClassType, type)) and
650 issubclass(obj, unittest.TestCase)):
651 classname = obj.__name__
652 if self._this_is_skipped(classname):
653 continue
654 methodnames = []
655
656 for attrname in dir(obj):
657 if attrname.startswith(self.testMethodPrefix):
658 attr = getattr(obj, attrname)
659 if callable(attr):
660 methodnames.append(attrname)
661
662 tests[classname] = (obj, methodnames)
663 return tests
664
666 try:
667 suite = getattr(module, suitename)()
668 except AttributeError:
669 return []
670 assert hasattr(suite, '_tests'), \
671 "%s.%s is not a valid TestSuite" % (module.__name__, suitename)
672
673
674 return suite._tests
675
677 parts = name.split('.')
678 if module is None or len(parts) > 2:
679
680 return [super(NonStrictTestLoader, self).loadTestsFromName(name)]
681 tests = self._collect_tests(module)
682
683
684 collected = []
685 if len(parts) == 1:
686 pattern = parts[0]
687 if callable(getattr(module, pattern, None)
688 ) and pattern not in tests:
689
690 return self.loadTestsFromSuite(module, pattern)
691 if pattern in tests:
692
693 klass, methodnames = tests[pattern]
694 for methodname in methodnames:
695 collected = [klass(methodname)
696 for methodname in methodnames]
697 else:
698
699 for klass, methodnames in tests.values():
700 collected += [klass(methodname)
701 for methodname in methodnames]
702 elif len(parts) == 2:
703
704 classname, pattern = parts
705 klass, methodnames = tests.get(classname, (None, []))
706 for methodname in methodnames:
707 collected = [klass(methodname) for methodname in methodnames]
708 return collected
709
711 return any([(pat in testedname) for pat in self.skipped_patterns])
712
714 """Return a sorted sequence of method names found within testCaseClass
715 """
716 is_skipped = self._this_is_skipped
717 if is_skipped(testCaseClass.__name__):
718 return []
719 testnames = super(NonStrictTestLoader, self).getTestCaseNames(
720 testCaseClass)
721 return [testname for testname in testnames if not is_skipped(testname)]
722
723
725
726 USAGE = """\
727 Usage: %(progName)s [options] [test] [...]
728
729 Options:
730 -h, --help Show this message
731 -v, --verbose Verbose output
732 -i, --pdb Enable test failure inspection
733 -x, --exitfirst Exit on first failure
734 -c, --capture Captures and prints standard out/err only on errors
735 -p, --printonly Only prints lines matching specified pattern
736 (implies capture)
737 -s, --skip skip test matching this pattern (no regexp for now)
738 -q, --quiet Minimal output
739
740 -m, --match Run only test whose tag match this pattern
741
742 Examples:
743 %(progName)s - run default set of tests
744 %(progName)s MyTestSuite - run suite 'MyTestSuite'
745 %(progName)s MyTestCase.testSomething - run MyTestCase.testSomething
746 %(progName)s MyTestCase - run all 'test*' test methods
747 in MyTestCase
748 """
749 - def __init__(self, module='__main__', defaultTest=None, batchmode=False,
750 cvg=None, options=None, outstream=sys.stderr):
751 self.batchmode = batchmode
752 self.cvg = cvg
753 self.options = options
754 self.outstream = outstream
755 super(SkipAwareTestProgram, self).__init__(
756 module=module, defaultTest=defaultTest,
757 testLoader=NonStrictTestLoader())
758
760 self.pdbmode = False
761 self.exitfirst = False
762 self.capture = 0
763 self.printonly = None
764 self.skipped_patterns = []
765 self.test_pattern = None
766 self.tags_pattern = None
767 import getopt
768 try:
769 options, args = getopt.getopt(argv[1:], 'hHvixrqcp:s:m:',
770 ['help', 'verbose', 'quiet', 'pdb',
771 'exitfirst', 'restart', 'capture', 'printonly=',
772 'skip=', 'match='])
773 for opt, value in options:
774 if opt in ('-h', '-H', '--help'):
775 self.usageExit()
776 if opt in ('-i', '--pdb'):
777 self.pdbmode = True
778 if opt in ('-x', '--exitfirst'):
779 self.exitfirst = True
780 if opt in ('-r', '--restart'):
781 self.restart = True
782 self.exitfirst = True
783 if opt in ('-q', '--quiet'):
784 self.verbosity = 0
785 if opt in ('-v', '--verbose'):
786 self.verbosity = 2
787 if opt in ('-c', '--capture'):
788 self.capture += 1
789 if opt in ('-p', '--printonly'):
790 self.printonly = re.compile(value)
791 if opt in ('-s', '--skip'):
792 self.skipped_patterns = [pat.strip() for pat in
793 value.split(', ')]
794 if opt in ('-m', '--match'):
795
796 self.options["tag_pattern"] = value
797 self.testLoader.skipped_patterns = self.skipped_patterns
798 if self.printonly is not None:
799 self.capture += 1
800 if len(args) == 0 and self.defaultTest is None:
801 suitefunc = getattr(self.module, 'suite', None)
802 if isinstance(suitefunc, (types.FunctionType,
803 types.MethodType)):
804 self.test = self.module.suite()
805 else:
806 self.test = self.testLoader.loadTestsFromModule(self.module)
807 return
808 if len(args) > 0:
809 self.test_pattern = args[0]
810 self.testNames = args
811 else:
812 self.testNames = (self.defaultTest, )
813 self.createTests()
814 except getopt.error, msg:
815 self.usageExit(msg)
816
817
819 if hasattr(self.module, 'setup_module'):
820 try:
821 self.module.setup_module(self.options)
822 except Exception, exc:
823 print 'setup_module error:', exc
824 sys.exit(1)
825 self.testRunner = SkipAwareTextTestRunner(verbosity=self.verbosity,
826 stream=self.outstream,
827 exitfirst=self.exitfirst,
828 capture=self.capture,
829 printonly=self.printonly,
830 pdbmode=self.pdbmode,
831 cvg=self.cvg,
832 test_pattern=self.test_pattern,
833 skipped_patterns=self.skipped_patterns,
834 options=self.options)
835
836 def removeSucceededTests(obj, succTests):
837 """ Recurcive function that removes succTests from
838 a TestSuite or TestCase
839 """
840 if isinstance(obj, TestSuite):
841 removeSucceededTests(obj._tests, succTests)
842 if isinstance(obj, list):
843 for el in obj[:]:
844 if isinstance(el, TestSuite):
845 removeSucceededTests(el, succTests)
846 elif isinstance(el, TestCase):
847 descr = '.'.join((el.__class__.__module__,
848 el.__class__.__name__,
849 el._testMethodName))
850 if descr in succTests:
851 obj.remove(el)
852
853 if self.options.restart:
854
855 try:
856 restartfile = open(FILE_RESTART, 'r')
857 try:
858 try:
859 succeededtests = list(elem.rstrip('\n\r') for elem in
860 restartfile.readlines())
861 removeSucceededTests(self.test, succeededtests)
862 except Exception, e:
863 raise e
864 finally:
865 restartfile.close()
866 except Exception ,e:
867 raise "Error while reading \
868 succeeded tests into", osp.join(os.getcwd(),FILE_RESTART)
869
870 result = self.testRunner.run(self.test)
871 if hasattr(self.module, 'teardown_module'):
872 try:
873 self.module.teardown_module(self.options)
874 except Exception, exc:
875 print 'teardown_module error:', exc
876 sys.exit(1)
877 if os.environ.get('PYDEBUG'):
878 warn("PYDEBUG usage is deprecated, use -i / --pdb instead",
879 DeprecationWarning)
880 self.pdbmode = True
881 if result.debuggers and self.pdbmode:
882 start_interactive_mode(result)
883 if not self.batchmode:
884 sys.exit(not result.wasSuccessful())
885 self.result = result
886
887
888
889
891 """adapted from py lib (http://codespeak.net/py)
892 Capture IO to/from a given os-level filedescriptor.
893 """
894 - def __init__(self, fd, attr='stdout', printonly=None):
895 self.targetfd = fd
896 self.tmpfile = os.tmpfile()
897 self.printonly = printonly
898
899 self._savefd = os.dup(fd)
900
901 os.dup2(self.tmpfile.fileno(), fd)
902
903 self.oldval = getattr(sys, attr)
904 setattr(sys, attr, self)
905 self.attr = attr
906
908
909 for line in msg.splitlines():
910 line += '\n'
911 if self.printonly is None or self.printonly.search(line) is None:
912 self.tmpfile.write(line)
913 else:
914 os.write(self._savefd, line)
915
916
917
918
919
920
921
922
924 """restore original fd and returns captured output"""
925
926 self.tmpfile.flush()
927 try:
928 ref_file = getattr(sys, '__%s__' % self.attr)
929 ref_file.flush()
930 except AttributeError:
931 pass
932 if hasattr(self.oldval, 'flush'):
933 self.oldval.flush()
934
935 os.dup2(self._savefd, self.targetfd)
936
937 setattr(sys, self.attr, self.oldval)
938
939 os.close(self._savefd)
940
941 self.tmpfile.seek(0)
942 return self.tmpfile.read()
943
944
945 -def _capture(which='stdout', printonly=None):
946 """private method, should not be called directly
947 (cf. capture_stdout() and capture_stderr())
948 """
949 assert which in ('stdout', 'stderr'
950 ), "Can only capture stdout or stderr, not %s" % which
951 if which == 'stdout':
952 fd = 1
953 else:
954 fd = 2
955 return FDCapture(fd, which, printonly)
956
958 """captures the standard output
959
960 returns a handle object which has a `restore()` method.
961 The restore() method returns the captured stdout and restores it
962 """
963 return _capture('stdout', printonly)
964
966 """captures the standard error output
967
968 returns a handle object which has a `restore()` method.
969 The restore() method returns the captured stderr and restores it
970 """
971 return _capture('stderr', printonly)
972
973
974 -def unittest_main(module='__main__', defaultTest=None,
975 batchmode=False, cvg=None, options=None,
976 outstream=sys.stderr):
977 """use this functon if you want to have the same functionality
978 as unittest.main"""
979 return SkipAwareTestProgram(module, defaultTest, batchmode,
980 cvg, options, outstream)
981
983 """raised when a test is skipped"""
984
986 flags = function.func_code.co_flags
987 return flags & CO_GENERATOR
988
989
991 args = []
992 varargs = ()
993 kwargs = {}
994 flags = 0
995 for param in params:
996 if isinstance(param, starargs):
997 varargs = param
998 if flags:
999 raise TypeError('found starargs after keywords !')
1000 flags |= 2
1001 args += list(varargs)
1002 elif isinstance(param, keywords):
1003 kwargs = param
1004 if flags & 4:
1005 raise TypeError('got multiple keywords parameters')
1006 flags |= 4
1007 elif flags & 2 or flags & 4:
1008 raise TypeError('found parameters after kwargs or args')
1009 else:
1010 args.append(param)
1011
1012 return args, kwargs
1013
1016 instance = tuple.__new__(cls, data)
1017 instance.name = name
1018 return instance
1019
1021 """this is a simple property-like class but for
1022 class attributes.
1023 """
1024
1026 self.getter = getter
1027
1029 "__get__(objn objtype) -> objtype"
1030 return self.getter(objtype)
1031
1032
1034 """unittest.TestCase with some additional methods"""
1035
1036 capture = False
1037 pdbclass = Debugger
1038
1039 - def __init__(self, methodName='runTest'):
1040 super(TestCase, self).__init__(methodName)
1041
1042 if sys.version_info >= (2, 5):
1043 self.__exc_info = self._exc_info
1044 self.__testMethodName = self._testMethodName
1045 else:
1046
1047 self._testMethodName = self.__testMethodName
1048 self._captured_stdout = ""
1049 self._captured_stderr = ""
1050 self._out = []
1051 self._err = []
1052 self._current_test_descr = None
1053 self._options_ = None
1054
1056 """helper attribute holding the standard test's data directory
1057
1058 NOTE: this is a logilab's standard
1059 """
1060 mod = __import__(cls.__module__)
1061 return osp.join(osp.dirname(osp.abspath(mod.__file__)), 'data')
1062
1063
1064 datadir = ClassGetProperty(cached(datadir))
1065
1067 """joins the object's datadir and `fname`"""
1068 return osp.join(self.datadir, fname)
1069
1071 """sets the current test's description.
1072 This can be useful for generative tests because it allows to specify
1073 a description per yield
1074 """
1075 self._current_test_descr = descr
1076
1077
1079 """override default unitest shortDescription to handle correctly
1080 generative tests
1081 """
1082 if self._current_test_descr is not None:
1083 return self._current_test_descr
1084 return super(TestCase, self).shortDescription()
1085
1086
1088 """return a two tuple with standard output and error stripped"""
1089 return self._captured_stdout.strip(), self._captured_stderr.strip()
1090
1092 """start_capture if enable"""
1093 if self.capture:
1094 self.start_capture()
1095
1097 """stop_capture and restore previous output"""
1098 self._force_output_restore()
1099
1101 """start_capture"""
1102 self._out.append(capture_stdout(printonly or self._printonly))
1103 self._err.append(capture_stderr(printonly or self._printonly))
1104
1106 """set the pattern of line to print"""
1107 rgx = re.compile(pattern, flags)
1108 if self._out:
1109 self._out[-1].printonly = rgx
1110 self._err[-1].printonly = rgx
1111 else:
1112 self.start_capture(printonly=rgx)
1113
1115 """stop output and error capture"""
1116 if self._out:
1117 _out = self._out.pop()
1118 _err = self._err.pop()
1119 return _out.restore(), _err.restore()
1120 return '', ''
1121
1123 """remove all capture set"""
1124 while self._out:
1125 self._captured_stdout += self._out.pop().restore()
1126 self._captured_stderr += self._err.pop().restore()
1127
1128 - def quiet_run(self, result, func, *args, **kwargs):
1129 self._start_capture()
1130 try:
1131 func(*args, **kwargs)
1132 except (KeyboardInterrupt, SystemExit):
1133 self._stop_capture()
1134 raise
1135 except:
1136 self._stop_capture()
1137 result.addError(self, self.__exc_info())
1138 return False
1139 self._stop_capture()
1140 return True
1141
1143 """return the test method"""
1144 return getattr(self, self.__testMethodName)
1145
1146
1147 - def optval(self, option, default=None):
1148 """return the option value or default if the option is not define"""
1149 return getattr(self._options_, option, default)
1150
1151 - def __call__(self, result=None, runcondition=None, options=None):
1152 """rewrite TestCase.__call__ to support generative tests
1153 This is mostly a copy/paste from unittest.py (i.e same
1154 variable names, same logic, except for the generative tests part)
1155 """
1156 if result is None:
1157 result = self.defaultTestResult()
1158 result.pdbclass = self.pdbclass
1159
1160
1161 self.capture = self.capture or getattr(result, 'capture', False)
1162 self._options_ = options
1163 self._printonly = getattr(result, 'printonly', None)
1164
1165
1166 testMethod = self._get_test_method()
1167 if runcondition and not runcondition(testMethod):
1168 return
1169 result.startTest(self)
1170 try:
1171 if not self.quiet_run(result, self.setUp):
1172 return
1173
1174 if is_generator(testMethod.im_func):
1175 success = self._proceed_generative(result, testMethod,
1176 runcondition)
1177 else:
1178 status = self._proceed(result, testMethod)
1179 success = (status == 0)
1180 if not self.quiet_run(result, self.tearDown):
1181 return
1182 if success:
1183 if hasattr(options, "exitfirst") and options.exitfirst:
1184
1185 try:
1186 restartfile = open(FILE_RESTART, 'a')
1187 try:
1188 try:
1189 descr = '.'.join((self.__class__.__module__,
1190 self.__class__.__name__,
1191 self._testMethodName))
1192 restartfile.write(descr+os.linesep)
1193 except Exception, e:
1194 raise e
1195 finally:
1196 restartfile.close()
1197 except Exception, e:
1198 print >> sys.__stderr__, "Error while saving \
1199 succeeded test into", osp.join(os.getcwd(),FILE_RESTART)
1200 raise e
1201 result.addSuccess(self)
1202 finally:
1203
1204
1205 result.stopTest(self)
1206
1207
1208
1210
1211 result.testsRun -= 1
1212 self._start_capture()
1213 success = True
1214 try:
1215 for params in testfunc():
1216 if runcondition and not runcondition(testfunc,
1217 skipgenerator=False):
1218 if not (isinstance(params, InnerTest)
1219 and runcondition(params)):
1220 continue
1221 if not isinstance(params, (tuple, list)):
1222 params = (params, )
1223 func = params[0]
1224 args, kwargs = parse_generative_args(params[1:])
1225
1226 result.testsRun += 1
1227 status = self._proceed(result, func, args, kwargs)
1228 if status == 0:
1229 result.addSuccess(self)
1230 success = True
1231 else:
1232 success = False
1233 if status == 2:
1234 result.shouldStop = True
1235 if result.shouldStop:
1236 break
1237 except:
1238
1239 result.addError(self, self.__exc_info())
1240 success = False
1241 self._stop_capture()
1242 return success
1243
1244 - def _proceed(self, result, testfunc, args=(), kwargs=None):
1245 """proceed the actual test
1246 returns 0 on success, 1 on failure, 2 on error
1247
1248 Note: addSuccess can't be called here because we have to wait
1249 for tearDown to be successfully executed to declare the test as
1250 successful
1251 """
1252 self._start_capture()
1253 kwargs = kwargs or {}
1254 try:
1255 testfunc(*args, **kwargs)
1256 self._stop_capture()
1257 except self.failureException:
1258 self._stop_capture()
1259 result.addFailure(self, self.__exc_info())
1260 return 1
1261 except KeyboardInterrupt:
1262 self._stop_capture()
1263 raise
1264 except:
1265 self._stop_capture()
1266 result.addError(self, self.__exc_info())
1267 return 2
1268 return 0
1269
1271 """return a new instance of the defaultTestResult"""
1272 return SkipAwareTestResult()
1273
1274 - def skip(self, msg=None):
1275 """mark a test as skipped for the <msg> reason"""
1276 msg = msg or 'test was skipped'
1277 raise TestSkipped(msg)
1278 skipped_test = deprecated_function(skip)
1279
1281 """assert <object> are in <set>"""
1282 self.assert_(object in set, "%s not in %s" % (object, set))
1283
1285 """assert <object> are not in <set>"""
1286 self.assert_(object not in set, "%s in %s" % (object, set))
1287
1289 """compares two dicts
1290
1291 If the two dict differ, the first difference is shown in the error
1292 message
1293 """
1294 dict1 = dict(dict1)
1295 msgs = []
1296 for key, value in dict2.items():
1297 try:
1298 if dict1[key] != value:
1299 msgs.append('%r != %r for key %r' % (dict1[key], value,
1300 key))
1301 del dict1[key]
1302 except KeyError:
1303 msgs.append('missing %r key' % key)
1304 if dict1:
1305 msgs.append('dict2 is lacking %r' % dict1)
1306 if msgs:
1307 self.fail(''.join(msgs))
1308 assertDictEqual = assertDictEquals
1309
1310
1311
1313 """compares two iterable and shows difference between both"""
1314 got, expected = list(got), list(expected)
1315 self.assertSetEqual(set(got), set(expected), msg)
1316 if len(got) != len(expected):
1317 if msg is None:
1318 msg = ['Iterable have the same elements but not the same number',
1319 '\t<element>\t<expected>i\t<got>']
1320 got_count = {}
1321 expected_count = {}
1322 for element in got:
1323 got_count[element] = got_count.get(element,0) + 1
1324 for element in expected:
1325 expected_count[element] = expected_count.get(element,0) + 1
1326
1327
1328 for element, count in got_count.iteritems():
1329 other_count = expected_count[element]
1330 if other_count != count:
1331 msg.append('\t%s\t%s\t%s' % (element, other_count, count))
1332
1333 self.fail(msg)
1334
1335 assertUnorderedIterableEqual = assertUnorderedIterableEquals
1336 assertUnordIterEquals = assertUnordIterEqual = assertUnorderedIterableEqual
1337
1339 if not(isinstance(got, set) and isinstance(expected, set)):
1340 warn("the assertSetEquals function if now intended for set only."\
1341 "use assertUnorderedIterableEquals instead.",
1342 DeprecationWarning, 2)
1343 return self.assertUnorderedIterableEquals(got,expected, msg)
1344
1345 items={}
1346 items['missing'] = expected - got
1347 items['unexpected'] = got - expected
1348 if any(items.itervalues()):
1349 if msg is None:
1350 msg = '\n'.join('%s:\n\t%s' % (key,"\n\t".join(str(value) for value in values))
1351 for key, values in items.iteritems() if values)
1352 self.fail(msg)
1353
1354
1355 assertSetEqual = assertSetEquals
1356
1358 """compares two lists
1359
1360 If the two list differ, the first difference is shown in the error
1361 message
1362 """
1363 _l1 = list_1[:]
1364 for i, value in enumerate(list_2):
1365 try:
1366 if _l1[0] != value:
1367 from pprint import pprint
1368 pprint(list_1)
1369 pprint(list_2)
1370 self.fail('%r != %r for index %d' % (_l1[0], value, i))
1371 del _l1[0]
1372 except IndexError:
1373 if msg is None:
1374 msg = 'list_1 has only %d elements, not %s '\
1375 '(at least %r missing)'% (i, len(list_2), value)
1376 self.fail(msg)
1377 if _l1:
1378 if msg is None:
1379 msg = 'list_2 is lacking %r' % _l1
1380 self.fail(msg)
1381 assertListEqual = assertListEquals
1382
1384 """assert list of lines are equal"""
1385 self.assertListEquals(list_1.splitlines(), list_2.splitlines(), msg)
1386 assertLineEqual = assertLinesEquals
1387
1398 assertXMLValid = deprecated_function(assertXMLWellFormed,
1399 'assertXMLValid renamed to more precise assertXMLWellFormed')
1400
1405
1406 assertXMLStringValid = deprecated_function(
1407 assertXMLStringWellFormed,
1408 'assertXMLStringValid renamed to more precise assertXMLStringWellFormed'
1409 )
1410
1412 """compare an ElementTree Element to a tuple formatted as follow:
1413 (tagname, [attrib[, children[, text[, tail]]]])"""
1414
1415 self.assertTextEquals(element.tag, tup[0])
1416
1417 if len(element.attrib) or len(tup)>1:
1418 if len(tup)<=1:
1419 self.fail( "tuple %s has no attributes (%s expected)"%(tup,
1420 dict(element.attrib)))
1421 self.assertDictEquals(element.attrib, tup[1])
1422
1423 if len(element) or len(tup)>2:
1424 if len(tup)<=2:
1425 self.fail( "tuple %s has no children (%i expected)"%(tup,
1426 len(element)))
1427 if len(element) != len(tup[2]):
1428 self.fail( "tuple %s has %i children%s (%i expected)"%(tup,
1429 len(tup[2]),
1430 ('', 's')[len(tup[2])>1], len(element)))
1431 for index in xrange(len(tup[2])):
1432 self.assertXMLEqualsTuple(element[index], tup[2][index])
1433
1434 if element.text or len(tup)>3:
1435 if len(tup)<=3:
1436 self.fail( "tuple %s has no text value (%r expected)"%(tup,
1437 element.text))
1438 self.assertTextEquals(element.text, tup[3])
1439
1440 if element.tail or len(tup)>4:
1441 if len(tup)<=4:
1442 self.fail( "tuple %s has no tail value (%r expected)"%(tup,
1443 element.tail))
1444 self.assertTextEquals(element.tail, tup[4])
1445
1446 - def _difftext(self, lines1, lines2, junk=None, msg_prefix='Texts differ'):
1447 junk = junk or (' ', '\t')
1448
1449 result = difflib.ndiff(lines1, lines2, charjunk=lambda x: x in junk)
1450 read = []
1451 for line in result:
1452 read.append(line)
1453
1454 if not line.startswith(' '):
1455 self.fail('\n'.join(['%s\n'%msg_prefix]+read + list(result)))
1456
1457 - def assertTextEquals(self, text1, text2, junk=None,
1458 msg_prefix='Text differ'):
1459 """compare two multiline strings (using difflib and splitlines())"""
1460 msg = []
1461 if not isinstance(text1, basestring):
1462 msg.append('text1 is not a string (%s)'%(type(text1)))
1463 if not isinstance(text2, basestring):
1464 msg.append('text2 is not a string (%s)'%(type(text2)))
1465 if msg:
1466 self.fail('\n'.join(msg))
1467 self._difftext(text1.splitlines(True), text2.splitlines(True), junk,
1468 msg_prefix)
1469 assertTextEqual = assertTextEquals
1470
1471 - def assertStreamEquals(self, stream1, stream2, junk=None,
1472 msg_prefix='Stream differ'):
1473 """compare two streams (using difflib and readlines())"""
1474
1475
1476 if stream1 is stream2:
1477 return
1478
1479 stream1.seek(0)
1480 stream2.seek(0)
1481
1482 self._difftext(stream1.readlines(), stream2.readlines(), junk,
1483 msg_prefix)
1484
1485 assertStreamEqual = assertStreamEquals
1487 """compares two files using difflib"""
1488 self.assertStreamEqual(file(fname1), file(fname2), junk,
1489 msg_prefix='Files differs\n-:%s\n+:%s\n'%(fname1, fname2))
1490 assertFileEqual = assertFileEquals
1491
1492
1494 """compares two files using difflib"""
1495 assert osp.exists(path_a), "%s doesn't exists" % path_a
1496 assert osp.exists(path_b), "%s doesn't exists" % path_b
1497
1498 all_a = [ (ipath[len(path_a):].lstrip('/'), idirs, ifiles)
1499 for ipath, idirs, ifiles in os.walk(path_a)]
1500 all_a.sort(key=itemgetter(0))
1501
1502 all_b = [ (ipath[len(path_b):].lstrip('/'), idirs, ifiles)
1503 for ipath, idirs, ifiles in os.walk(path_b)]
1504 all_b.sort(key=itemgetter(0))
1505
1506 iter_a, iter_b = iter(all_a), iter(all_b)
1507 partial_iter = True
1508 ipath_a, idirs_a, ifiles_a = data_a = None, None, None
1509 while True:
1510 try:
1511 ipath_a, idirs_a, ifiles_a = datas_a = iter_a.next()
1512 partial_iter = False
1513 ipath_b, idirs_b, ifiles_b = datas_b = iter_b.next()
1514 partial_iter = True
1515
1516
1517 self.assert_(ipath_a == ipath_b,
1518 "unexpected %s in %s while looking %s from %s" %
1519 (ipath_a, path_a, ipath_b, path_b))
1520
1521
1522 errors = {}
1523 sdirs_a = set(idirs_a)
1524 sdirs_b = set(idirs_b)
1525 errors["unexpected directories"] = sdirs_a - sdirs_b
1526 errors["missing directories"] = sdirs_b - sdirs_a
1527
1528 sfiles_a = set(ifiles_a)
1529 sfiles_b = set(ifiles_b)
1530 errors["unexpected files"] = sfiles_a - sfiles_b
1531 errors["missing files"] = sfiles_b - sfiles_a
1532
1533
1534 msgs = [ "%s: %s"% (name, items)
1535 for name, items in errors.iteritems() if items]
1536
1537 if msgs:
1538 msgs.insert(0,"%s and %s differ :" % (
1539 osp.join(path_a, ipath_a),
1540 osp.join(path_b, ipath_b),
1541 ))
1542 self.fail("\n".join(msgs))
1543
1544 for files in (ifiles_a, ifiles_b):
1545 files.sort()
1546
1547 for index, path in enumerate(ifiles_a):
1548 self.assertFileEquals(osp.join(path_a, ipath_a, path),
1549 osp.join(path_b, ipath_b, ifiles_b[index]))
1550
1551 except StopIteration:
1552 break
1553
1554
1555 assertDirEqual = assertDirEquals
1556
1557
1559 """compares two files using difflib"""
1560 if msg is None:
1561 if strict:
1562 msg = '%r is not of class %s but of %s'
1563 else:
1564 msg = '%r is not an instance of %s but of %s'
1565 msg = msg % (obj, klass, type(obj))
1566 if strict:
1567 self.assert_(obj.__class__ is klass, msg)
1568 else:
1569 self.assert_(isinstance(obj, klass), msg)
1570
1571 - def assertIs(self, obj, other, msg=None):
1572 """compares identity of two reference"""
1573 if msg is None:
1574 msg = "%r is not %r"%(obj, other)
1575 self.assert_(obj is other, msg)
1576
1577
1579 """compares identity of two reference"""
1580 if msg is None:
1581 msg = "%r is %r"%(obj, other)
1582 self.assert_(obj is not other, msg )
1583
1585 """assert obj is None"""
1586 if msg is None:
1587 msg = "reference to %r when None expected"%(obj,)
1588 self.assert_( obj is None, msg )
1589
1591 """assert obj is not None"""
1592 if msg is None:
1593 msg = "unexpected reference to None"
1594 self.assert_( obj is not None, msg )
1595
1597 """override default failUnlessRaise method to return the raised
1598 exception instance.
1599
1600 Fail unless an exception of class excClass is thrown
1601 by callableObj when invoked with arguments args and keyword
1602 arguments kwargs. If a different type of exception is
1603 thrown, it will not be caught, and the test case will be
1604 deemed to have suffered an error, exactly as for an
1605 unexpected exception.
1606 """
1607 try:
1608 callableObj(*args, **kwargs)
1609 except excClass, exc:
1610 return exc
1611 else:
1612 if hasattr(excClass, '__name__'):
1613 excName = excClass.__name__
1614 else:
1615 excName = str(excClass)
1616 raise self.failureException, "%s not raised" % excName
1617
1618 assertRaises = failUnlessRaises
1619
1620 import doctest
1621
1624 """just there to trigger test execution"""
1625 self.skipped_test('doctest module has no DocTestSuite class')
1626
1627
1628
1629 if sys.version_info >= (2, 4):
1631
1633 self.skipped = kwargs.pop('skipped', ())
1634 doctest.DocTestFinder.__init__(self, *args, **kwargs)
1635
1636 - def _get_test(self, obj, name, module, globs, source_lines):
1637 """override default _get_test method to be able to skip tests
1638 according to skipped attribute's value
1639
1640 Note: Python (<=2.4) use a _name_filter which could be used for that
1641 purpose but it's no longer available in 2.5
1642 Python 2.5 seems to have a [SKIP] flag
1643 """
1644 if getattr(obj, '__name__', '') in self.skipped:
1645 return None
1646 return doctest.DocTestFinder._get_test(self, obj, name, module,
1647 globs, source_lines)
1648 else:
1649
1652 self.skipped = skipped
1653 self.original_find_tests = doctest._find_tests
1654 doctest._find_tests = self._find_tests
1655
1657 tests = []
1658 for testinfo in self.original_find_tests(module, prefix):
1659 testname, _, _, _ = testinfo
1660
1661 testname = testname.split('.')[-1]
1662 if testname not in self.skipped:
1663 tests.append(testinfo)
1664 return tests
1665
1666
1668 """trigger module doctest
1669 I don't know how to make unittest.main consider the DocTestSuite instance
1670 without this hack
1671 """
1672 skipped = ()
1673 - def __call__(self, result=None, runcondition=None, options=None):\
1674
1675 try:
1676 finder = DocTestFinder(skipped=self.skipped)
1677 if sys.version_info >= (2, 4):
1678 suite = doctest.DocTestSuite(self.module, test_finder=finder)
1679 else:
1680 suite = doctest.DocTestSuite(self.module)
1681 except AttributeError:
1682 suite = SkippedSuite()
1683 return suite.run(result)
1684 run = __call__
1685
1687 """just there to trigger test execution"""
1688
1689 MAILBOX = None
1690
1692 """fake smtplib.SMTP"""
1693
1695 self.host = host
1696 self.port = port
1697 global MAILBOX
1698 self.reveived = MAILBOX = []
1699
1701 """ignore debug level"""
1702
1703 - def sendmail(self, fromaddr, toaddres, body):
1704 """push sent mail in the mailbox"""
1705 self.reveived.append((fromaddr, toaddres, body))
1706
1709
1710
1712 """fake ConfigParser.ConfigParser"""
1713
1715 ConfigParser.__init__(self)
1716 for section, pairs in options.iteritems():
1717 self.add_section(section)
1718 for key, value in pairs.iteritems():
1719 self.set(section,key,value)
1721 raise NotImplementedError()
1722
1723
1725 """fake DB-API 2.0 connexion AND cursor (i.e. cursor() return self)"""
1726
1728 self.received = []
1729 self.states = []
1730 self.results = results
1731
1733 """Mock cursor method"""
1734 return self
1735 - def execute(self, query, args=None):
1736 """Mock execute method"""
1737 self.received.append( (query, args) )
1739 """Mock fetchone method"""
1740 return self.results[0]
1742 """Mock fetchall method"""
1743 return self.results
1745 """Mock commiy method"""
1746 self.states.append( ('commit', len(self.received)) )
1748 """Mock rollback method"""
1749 self.states.append( ('rollback', len(self.received)) )
1751 """Mock close method"""
1752 pass
1753
1754 MockConnexion = class_renamed('MockConnexion', MockConnection)
1755
1757 """creates an object using params to set attributes
1758 >>> option = mock_object(verbose=False, index=range(5))
1759 >>> option.verbose
1760 False
1761 >>> option.index
1762 [0, 1, 2, 3, 4]
1763 """
1764 return type('Mock', (), params)()
1765
1766
1768 """Creates directories and files found in <path>.
1769
1770 :param paths: list of relative paths to files or directories
1771 :param chroot: the root directory in which paths will be created
1772
1773 >>> from os.path import isdir, isfile
1774 >>> isdir('/tmp/a')
1775 False
1776 >>> create_files(['a/b/foo.py', 'a/b/c/', 'a/b/c/d/e.py'], '/tmp')
1777 >>> isdir('/tmp/a')
1778 True
1779 >>> isdir('/tmp/a/b/c')
1780 True
1781 >>> isfile('/tmp/a/b/c/d/e.py')
1782 True
1783 >>> isfile('/tmp/a/b/foo.py')
1784 True
1785 """
1786 dirs, files = set(), set()
1787 for path in paths:
1788 path = osp.join(chroot, path)
1789 filename = osp.basename(path)
1790
1791 if filename == '':
1792 dirs.add(path)
1793
1794 else:
1795 dirs.add(osp.dirname(path))
1796 files.add(path)
1797 for dirpath in dirs:
1798 if not osp.isdir(dirpath):
1799 os.makedirs(dirpath)
1800 for filepath in files:
1801 file(filepath, 'w').close()
1802
1804 """
1805 Without arguments, return True if contracts can be enabled and should be
1806 enabled (see option -d), return False otherwise.
1807
1808 With arguments, return False if contracts can't or shouldn't be enabled,
1809 otherwise weave ContractAspect with items passed as arguments.
1810 """
1811 if not ENABLE_DBC:
1812 return False
1813 try:
1814 from logilab.aspects.weaver import weaver
1815 from logilab.aspects.lib.contracts import ContractAspect
1816 except ImportError:
1817 sys.stderr.write(
1818 'Warning: logilab.aspects is not available. Contracts disabled.')
1819 return False
1820 for arg in args:
1821 weaver.weave_module(arg, ContractAspect)
1822 return True
1823
1824
1827 self.__dict__.update(kwargs)
1828
1830 """descriptor adding tag to a function"""
1831 def desc(func):
1832 assert not hasattr(func, 'tags')
1833 func.tags = Tags(args)
1834 return func
1835 return desc
1836
1844
1846 """ Compare version of python interpretor to the given one. Skip the test
1847 if older.
1848 """
1849 def check_require_version(f):
1850 version_elements = version.split('.')
1851 try:
1852 compare = tuple([int(v) for v in version_elements])
1853 except ValueError:
1854 raise ValueError('%s is not a correct version : should be X.Y[.Z].' % version)
1855 current = sys.version_info[:3]
1856
1857 if current < compare:
1858
1859 def new_f(self, *args, **kwargs):
1860 self.skip('Need at least %s version of python. Current version is %s.' % (version, '.'.join([str(element) for element in current])))
1861 new_f.__name__ = f.__name__
1862 return new_f
1863 else:
1864
1865 return f
1866 return check_require_version
1867
1869 """ Check if the given module is loaded. Skip the test if not.
1870 """
1871 def check_require_module(f):
1872 try:
1873 __import__(module)
1874
1875 return f
1876 except ImportError:
1877
1878 def new_f(self, *args, **kwargs):
1879 self.skip('%s can not be imported.' % module)
1880 new_f.__name__ = f.__name__
1881 return new_f
1882 return check_require_module
1883