Package logilab :: Package common :: Module umessage
[frames | no frames]

Source Code for Module logilab.common.umessage

  1  # copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  2  # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  3  # 
  4  # This file is part of logilab-common. 
  5  # 
  6  # logilab-common is free software: you can redistribute it and/or modify it under 
  7  # the terms of the GNU Lesser General Public License as published by the Free 
  8  # Software Foundation, either version 2.1 of the License, or (at your option) any 
  9  # later version. 
 10  # 
 11  # logilab-common is distributed in the hope that it will be useful, but WITHOUT 
 12  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
 13  # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 14  # details. 
 15  # 
 16  # You should have received a copy of the GNU Lesser General Public License along 
 17  # with logilab-common.  If not, see <http://www.gnu.org/licenses/>. 
 18  """Unicode email support (extends email from stdlib). 
 19   
 20   
 21   
 22   
 23  """ 
 24  __docformat__ = "restructuredtext en" 
 25   
 26  import email 
 27  from encodings import search_function 
 28  import sys 
 29  if sys.version_info >= (2, 5): 
 30      from email.utils import parseaddr, parsedate 
 31      from email.header import decode_header 
 32  else: 
 33      from email.Utils import parseaddr, parsedate 
 34      from email.Header import decode_header 
 35   
 36  from datetime import datetime 
 37   
 38  try: 
 39      from mx.DateTime import DateTime 
 40  except ImportError: 
 41      DateTime = datetime 
 42   
 43  import logilab.common as lgc 
 44   
 45   
def decode_QP(string):
    """Decode a raw, possibly RFC 2047 encoded, header value to unicode.

    The value is split into chunks with ``decode_header``; every chunk is
    decoded with the charset it declares and the decoded chunks are joined
    with a single space.

    Chunks that declare no charset are decoded as iso-8859-15. Chunks that
    declare a charset for which no codec can be found are decoded as
    iso-8859-15 as well, instead of letting the ``LookupError`` abort the
    decoding of the whole header.

    :param string: the raw header value
    :return: the decoded header value, as a unicode string
    """
    default_charset = 'iso-8859-15'
    parts = []
    for decoded, charset in decode_header(string):
        try:
            parts.append(decoded.decode(charset or default_charset, 'replace'))
        except LookupError:
            # the header names a charset that has no codec: a best-effort
            # decoding is more useful to callers than a crash
            parts.append(decoded.decode(default_charset, 'replace'))
    return u' '.join(parts)
54
def message_from_file(fd):
    """Parse the email read from the file object `fd`.

    :param fd: a file-like object, handed to ``email.message_from_file``
    :return: a `UMessage` wrapping the parsed message, or an empty string
      when the parser raises ``MessageParseError``
    """
    try:
        parsed = email.message_from_file(fd)
    except email.Errors.MessageParseError:
        return ''
    return UMessage(parsed)
60
def message_from_string(string):
    """Parse the email contained in `string`.

    :param string: the raw message, handed to ``email.message_from_string``
    :return: a `UMessage` wrapping the parsed message, or an empty string
      when the parser raises ``MessageParseError``
    """
    try:
        parsed = email.message_from_string(string)
    except email.Errors.MessageParseError:
        return ''
    return UMessage(parsed)
66
class UMessage:
    """Encapsulates an email.Message instance and returns only unicode objects.

    The wrapped message stays available as the `message` attribute.
    """

    def __init__(self, message):
        """Wrap `message`, the object every other method delegates to."""
        self.message = message

    # email.Message interface #################################################

    def get(self, header, default=None):
        """Return the decoded value of `header`.

        The value (or `default` when the header is missing) is looked up on
        the wrapped message. A non-empty result goes through `decode_QP`; an
        empty one (None, empty string...) is returned as is.
        """
        value = self.message.get(header, default)
        if value:
            return decode_QP(value)
        return value

    def get_all(self, header, default=()):
        """Return the list of decoded values of every `header` occurrence.

        None values are skipped.
        """
        return [decode_QP(val) for val in self.message.get_all(header, default)
                if val is not None]

    def get_payload(self, index=None, decode=False):
        """Return the payload of the message.

        * with an `index`, the payload found at that index, wrapped in a
          `UMessage`
        * without index, when the payload is a list, a list of `UMessage`
        * without index, for a message whose main type is not 'text', the
          payload exactly as returned by the wrapped message
        * otherwise the payload as a unicode string, decoded with the content
          charset of the message, or with iso-8859-1 when the message declares
          no charset or one for which no codec is found

        `decode` is handed as is to the wrapped message's `get_payload`.
        """
        message = self.message
        if index is not None:
            return UMessage(message.get_payload(index, decode))
        payload = message.get_payload(index, decode)
        if isinstance(payload, list):
            return [UMessage(msg) for msg in payload]
        if message.get_content_maintype() != 'text':
            return payload
        charset = message.get_content_charset() or 'iso-8859-1'
        if search_function(charset) is None:
            # no codec registered under that name
            charset = 'iso-8859-1'
        return unicode(payload or '', charset, "replace")

    def is_multipart(self):
        """Return what the wrapped message's `is_multipart` returns."""
        return self.message.is_multipart()

    def get_boundary(self):
        """Return what the wrapped message's `get_boundary` returns."""
        return self.message.get_boundary()

    def walk(self):
        """Yield a `UMessage` for each part the wrapped message walks over."""
        for part in self.message.walk():
            yield UMessage(part)

    def get_content_maintype(self):
        """Return the main content type of the message, as unicode."""
        return unicode(self.message.get_content_maintype())

    def get_content_type(self):
        """Return the content type of the message, as unicode."""
        return unicode(self.message.get_content_type())

    def get_filename(self, failobj=None):
        """Return the filename of the message, as unicode.

        `failobj` is returned untouched when the wrapped message hands it
        back. When the filename can't be converted to unicode, the string
        u'error decoding filename' is returned instead.
        """
        value = self.message.get_filename(failobj)
        if value is failobj:
            return value
        try:
            return unicode(value)
        except UnicodeDecodeError:
            return u'error decoding filename'

    # other convenience methods ###############################################

    def headers(self):
        """return an unicode string containing all the message's headers

        There is one 'name: value' line per header occurrence, in the order
        given by the wrapped message.
        """
        values = []
        # iterate on (name, value) pairs rather than looking each name up
        # again: a lookup by name yields a single value, which would be
        # repeated for every occurrence of a header set several times
        # (Received, for instance)
        for header, value in self.message.items():
            if value:
                value = decode_QP(value)
            values.append(u'%s: %s' % (header, value))
        return u'\n'.join(values)

    def multi_addrs(self, header):
        """return a list of 2-uple (name, address) for the given address (which
        is expected to be an header containing address such as from, to, cc...)
        """
        return [parseaddr(person) for person in self.get_all(header, ())]

    def date(self, alternative_source=False, return_str=False):
        """return a datetime object for the email's date or None if no date is
        set or if it can't be parsed

        :param alternative_source: when true and the message has no date
          header, the date is searched in the unix from line of the message
          (everything after its second space)
        :param return_str: when true, a date which can't be parsed is returned
          as the raw string instead of None
        """
        value = self.get('date')
        if value is None and alternative_source:
            unix_from = self.message.get_unixfrom()
            if unix_from is not None:
                try:
                    value = unix_from.split(" ", 2)[2]
                except IndexError:
                    pass
        if value is not None:
            datetuple = parsedate(value)
            if datetuple:
                try:
                    if lgc.USE_MX_DATETIME:
                        return DateTime(*datetuple[:6])
                    return datetime(*datetuple[:6])
                except ValueError:
                    # parsedate does not check that the fields form a real
                    # calendar date (e.g. 31 Feb): handle such a value as
                    # unparsable
                    # NOTE(review): mx.DateTime may report this with its own
                    # exception type -- confirm it derives from ValueError
                    pass
            if not return_str:
                return None
        return value
168