|
import py |
from py.__.apigen.tracer import model |
from py.__.code.source import getsource |
|
import types |
import inspect |
import copy |
|
MAX_CALL_SITES = 20 |
|
set = py.builtin.set |
|
def is_private(name): |
return name.startswith('_') and not name.startswith('__') |
|
class CallFrame(object): |
def __init__(self, frame): |
self.filename = frame.code.raw.co_filename |
self.lineno = frame.lineno |
self.firstlineno = frame.code.firstlineno |
try: |
self.source = getsource(frame.code.raw) |
except IOError: |
self.source = "could not get to source" |
|
def _getval(self): |
return (self.filename, self.lineno) |
|
def __hash__(self): |
return hash(self._getval()) |
|
def __eq__(self, other): |
return self._getval() == other._getval() |
|
def __ne__(self, other): |
return not self == other |
|
class CallStack(object): |
def __init__(self, tb): |
|
|
|
|
self.tb = [CallFrame(frame) for frame in tb] |
|
|
|
|
|
def __hash__(self): |
return hash(tuple(self.tb)) |
|
def __eq__(self, other): |
return self.tb == other.tb |
|
def __ne__(self, other): |
return not self == other |
|
|
|
|
def __iter__(self): |
return iter(self.tb) |
|
def __getitem__(self, item): |
return self.tb[item] |
|
def __len__(self): |
return len(self.tb) |
|
def __cmp__(self, other): |
return cmp(self.tb, other.tb) |
|
def cut_stack(stack, frame, upward_frame=None): |
if hasattr(frame, 'raw'): |
frame = frame.raw |
if upward_frame: |
if hasattr(upward_frame, 'raw'): |
upward_frame = upward_frame.raw |
lst = [py.code.Frame(i) for i in stack[stack.index(frame):\ |
stack.index(upward_frame)+1]] |
if len(lst) > 1: |
return CallStack(lst[:-1]) |
return CallStack(lst) |
return CallStack([py.code.Frame(i) for i in stack[stack.index(frame):]]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NonHashableObject(object): |
def __init__(self, cls): |
self.cls = cls |
|
def __hash__(self): |
raise NotImplementedError("Object of type %s are unhashable" % self.cls) |
|
class Desc(object): |
def __init__(self, name, pyobj, **kwargs): |
self.pyobj = pyobj |
self.is_degenerated = False |
self.name = name |
if type(self) is Desc: |
|
self.code = NonHashableObject(self.__class__) |
|
|
|
def __hash__(self): |
return hash(self.code) |
|
def __eq__(self, other): |
if isinstance(other, Desc): |
return self.code == other.code |
if isinstance(other, types.CodeType): |
return self.code == other |
if isinstance(other, tuple) and len(other) == 2: |
return self.code == other |
return False |
|
def __ne__(self, other): |
return not self == other |
|
|
|
class FunctionDesc(Desc): |
def __init__(self, *args, **kwargs): |
super(FunctionDesc, self).__init__(*args, **kwargs) |
self.inputcells = [model.s_ImpossibleValue for i in xrange(self.\ |
code.co_argcount)] |
self.call_sites = {} |
self.keep_frames = kwargs.get('keep_frames', False) |
self.frame_copier = kwargs.get('frame_copier', lambda x:x) |
self.retval = model.s_ImpossibleValue |
self.exceptions = {} |
|
def consider_call(self, inputcells): |
for cell_num, cell in enumerate(inputcells): |
self.inputcells[cell_num] = model.unionof(cell, self.inputcells[cell_num]) |
|
def consider_call_site(self, frame, cut_frame): |
if len(self.call_sites) > MAX_CALL_SITES: |
return |
stack = [i[0] for i in inspect.stack()] |
cs = cut_stack(stack, frame, cut_frame) |
self.call_sites[cs] = cs |
|
def consider_exception(self, exc, value): |
self.exceptions[exc] = True |
|
def get_call_sites(self): |
|
if not self.keep_frames: |
return [(key, val) for key, val in self.call_sites.iteritems()] |
else: |
lst = [] |
for key, val in self.call_sites.iteritems(): |
for frame in val: |
lst.append((key, frame)) |
return lst |
|
def consider_return(self, arg): |
self.retval = model.unionof(arg, self.retval) |
|
def consider_start_locals(self, frame): |
pass |
|
def consider_end_locals(self, frame): |
pass |
|
def getcode(self): |
return self.pyobj.func_code |
code = property(getcode) |
|
def get_local_changes(self): |
return {} |
|
class ClassDesc(Desc): |
def __init__(self, *args, **kwargs): |
super(ClassDesc, self).__init__(*args, **kwargs) |
self.fields = {} |
|
|
|
def getcode(self): |
|
|
if hasattr(self.pyobj, '__init__'): |
if hasattr(self.pyobj.__init__, 'im_func') and \ |
hasattr(self.pyobj.__init__.im_func, 'func_code'): |
result = self.pyobj.__init__.im_func.func_code |
else: |
result = self.pyobj.__init__ |
else: |
result = self.pyobj |
try: |
hash(result) |
except KeyboardInterrupt, SystemExit: |
raise |
except: |
try: |
hash(self.pyobj) |
result = self.pyobj |
except: |
result = self |
return result |
code = property(getcode) |
|
def consider_call(self, inputcells): |
if '__init__' in self.fields: |
md = self.fields['__init__'] |
else: |
md = MethodDesc(self.name + '.__init__', self.pyobj.__init__) |
self.fields['__init__'] = md |
md.consider_call(inputcells) |
|
def consider_return(self, arg): |
pass |
|
def consider_exception(self, exc, value): |
if '__init__' in self.fields: |
md = self.fields['__init__'] |
else: |
md = MethodDesc(self.name + '.__init__', self.pyobj.__init__) |
self.fields['__init__'] = md |
md.consider_exception(exc, value) |
|
def consider_start_locals(self, frame): |
if '__init__' in self.fields: |
md = self.fields['__init__'] |
md.consider_start_locals(frame) |
|
def consider_end_locals(self, frame): |
if '__init__' in self.fields: |
md = self.fields['__init__'] |
md.consider_end_locals(frame) |
|
def consider_call_site(self, frame, cut_frame): |
self.fields['__init__'].consider_call_site(frame, cut_frame) |
|
def add_method_desc(self, name, methoddesc): |
self.fields[name] = methoddesc |
|
def getfields(self): |
|
l = [i for i, v in self.fields.iteritems() if not is_private(i)] |
return l |
|
def getbases(self): |
bases = [] |
tovisit = [self.pyobj] |
while tovisit: |
current = tovisit.pop() |
if current is not self.pyobj: |
bases.append(current) |
tovisit += [b for b in current.__bases__ if b not in bases] |
return bases |
bases = property(getbases) |
|
|
|
|
|
|
|
|
|
class MethodDesc(FunctionDesc): |
def __init__(self, *args, **kwargs): |
super(MethodDesc, self).__init__(*args, **kwargs) |
self.old_dict = {} |
self.changeset = {} |
|
|
def getcode(self): |
return self.pyobj.im_func.func_code |
code = property(getcode) |
|
|
|
def __hash__(self): |
return hash((self.code, self.pyobj.im_class)) |
|
def __eq__(self, other): |
if isinstance(other, tuple): |
return self.code is other[0] and self.pyobj.im_class is other[1] |
if isinstance(other, MethodDesc): |
return self.pyobj is other.pyobj |
return False |
|
def consider_start_locals(self, frame): |
|
obj = frame.f_locals[self.pyobj.im_func.func_code.co_varnames[0]] |
try: |
if not obj: |
|
return |
except AttributeError: |
return |
self.old_dict = self.perform_dict_copy(obj.__dict__) |
|
def perform_dict_copy(self, d): |
if d is None: |
return {} |
return d.copy() |
|
def consider_end_locals(self, frame): |
obj = frame.f_locals[self.pyobj.im_func.func_code.co_varnames[0]] |
try: |
if not obj: |
|
return |
except AttributeError: |
return |
|
|
self.update_changeset(obj.__dict__) |
|
def get_local_changes(self): |
return self.changeset |
|
def set_changeset(changeset, key, value): |
if key not in changeset: |
changeset[key] = set([value]) |
else: |
changeset[key].add(value) |
set_changeset = staticmethod(set_changeset) |
|
def update_changeset(self, new_dict): |
changeset = self.changeset |
for k, v in self.old_dict.iteritems(): |
if k not in new_dict: |
self.set_changeset(changeset, k, "deleted") |
elif new_dict[k] != v: |
self.set_changeset(changeset, k, "changed") |
for k, v in new_dict.iteritems(): |
if k not in self.old_dict: |
self.set_changeset(changeset, k, "created") |
return changeset |
|
|