Package logilab-common-0 :: Package 36 :: Package 1 :: Module umessage
[frames] | no frames]

Source Code for Module logilab-common-0.36.1.umessage

  1  """Unicode email support (extends email from stdlib). 
  2   
  3  :copyright: 2000-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  4  :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  5  :license: General Public License version 2 - http://www.gnu.org/licenses 
  6  """ 
  7  __docformat__ = "restructuredtext en" 
  8   
  9  import email 
 10  from encodings import search_function 
 11  from email.Utils import parseaddr, parsedate 
 12  from email.Header import decode_header 
 13   
 14  try: 
 15      from mx.DateTime import DateTime 
 16  except ImportError: 
17 - def DateTime(*args): return None
18
19 -def decode_QP(string):
20 parts = [] 21 for decoded, charset in decode_header(string): 22 if charset is None: 23 charset = 'iso-8859-15' 24 parts.append(unicode(decoded, charset, 'replace')) 25 26 return u' '.join(parts)
27
28 -def message_from_file(fd):
29 try: 30 return UMessage(email.message_from_file(fd)) 31 except email.Errors.MessageParseError: 32 return ''
33
34 -def message_from_string(string):
35 try: 36 return UMessage(email.message_from_string(string)) 37 except email.Errors.MessageParseError: 38 return ''
39
40 -class UMessage:
41 """Encapsulates an email.Message instance and returns only unicode objects. 42 """ 43
44 - def __init__(self, message):
45 self.message = message
46 47 # email.Message interface ################################################# 48
49 - def get(self, header, default=None):
50 value = self.message.get(header, default) 51 if value: 52 return decode_QP(value) 53 return value
54
55 - def get_all(self, header, default=()):
56 return [decode_QP(val) for val in self.message.get_all(header, default) 57 if val is not None]
58
59 - def get_payload(self, index=None, decode=False):
60 message = self.message 61 if index is None: 62 payload = message.get_payload(index, decode) 63 if isinstance(payload, list): 64 return [UMessage(msg) for msg in payload] 65 if message.get_content_maintype() != 'text': 66 return payload 67 68 charset = message.get_content_charset() or 'iso-8859-1' 69 if search_function(charset) is None: 70 charset = 'iso-8859-1' 71 return unicode(payload or '', charset, "replace") 72 else: 73 payload = UMessage(message.get_payload(index, decode)) 74 return payload
75
76 - def is_multipart(self):
77 return self.message.is_multipart()
78
79 - def get_boundary(self):
80 return self.message.get_boundary()
81
82 - def walk(self):
83 for part in self.message.walk(): 84 yield UMessage(part)
85
86 - def get_content_maintype(self):
87 return unicode(self.message.get_content_maintype())
88
89 - def get_content_type(self):
90 return unicode(self.message.get_content_type())
91
92 - def get_filename(self, failobj=None):
93 value = self.message.get_filename(failobj) 94 if value is failobj: 95 return value 96 try: 97 return unicode(value) 98 except UnicodeDecodeError: 99 return u'error decoding filename'
100 101 # other convenience methods ############################################### 102
103 - def headers(self):
104 """return an unicode string containing all the message's headers""" 105 values = [] 106 for header in self.message.keys(): 107 values.append(u'%s: %s' % (header, self.get(header))) 108 return '\n'.join(values)
109
110 - def multi_addrs(self, header):
111 """return a list of 2-uple (name, address) for the given address (which 112 is exepected to be an header containing address such as from, to, cc...) 113 """ 114 persons = [] 115 for person in self.get_all(header, ()): 116 name, mail = parseaddr(person) 117 persons.append((name, mail)) 118 return persons
119
120 - def date(self, alternative_source=False, return_str=False):
121 """return a mx.DateTime object for the email's date or None if no date is 122 set or if it can't be parsed 123 """ 124 value = self.get('date') 125 if value is None and alternative_source: 126 unix_from = self.message.get_unixfrom() 127 if unix_from is not None: 128 try: 129 value = unix_from.split(" ", 2)[2] 130 except IndexError: 131 pass 132 if value is not None: 133 datetuple = parsedate(value) 134 if datetuple: 135 return DateTime(*datetuple[:6]) 136 elif not return_str: 137 return None 138 return value
139