Package logilab-common-0 :: Package 36 :: Package 1 :: Module sqlgen
[frames | no frames]

Source Code for Module logilab-common-0.36.1.sqlgen

  1  """Help to generate SQL strings usable by the Python DB-API. 
  2   
  3  :author: Logilab 
  4  :copyright: 2000-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  5  :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  6  :license: General Public License version 2 - http://www.gnu.org/licenses 
  7  """ 
  8  __docformat__ = "restructuredtext en" 
  9   
 10  # SQLGenerator ################################################################ 
 11   
class SQLGenerator:
    """Helper class to generate SQL strings to use with Python's DB-API.

    Every generated statement refers to its values through ``%(key)s``
    placeholders (the DB-API "pyformat" parameter style), so the same
    ``params`` dictionary that was used to build the statement can be passed
    to ``cursor.execute(sql, params)``.  Table and column names are inserted
    into the SQL text verbatim.
    """

    def where(self, keys, addon=None):
        """Build the body of a WHERE clause (without the ``WHERE`` keyword).

        :param keys: list of keys
        :param addon: optional restriction placed in front of the generated
          ones
        :return: the restrictions joined with ``' AND '``; an empty string
          when there is neither a key nor an addon

        >>> s = SQLGenerator()
        >>> s.where(['nom'])
        'nom = %(nom)s'
        >>> s.where(['nom','prenom'])
        'nom = %(nom)s AND prenom = %(prenom)s'
        >>> s.where(['nom','prenom'], 'x.id = y.id')
        'x.id = y.id AND nom = %(nom)s AND prenom = %(prenom)s'
        """
        restriction = ["%s = %%(%s)s" % (key, key) for key in keys]
        if addon:
            restriction.insert(0, addon)
        return " AND ".join(restriction)

    def set(self, keys):
        """Build the body of a SET clause (without the ``SET`` keyword).

        :param keys: list of keys
        :return: the ``key = %(key)s`` assignments joined with ``', '``

        >>> s = SQLGenerator()
        >>> s.set(['nom'])
        'nom = %(nom)s'
        >>> s.set(['nom','prenom'])
        'nom = %(nom)s, prenom = %(prenom)s'
        """
        return ", ".join(["%s = %%(%s)s" % (key, key) for key in keys])

    def insert(self, table, params):
        """Build an INSERT statement.

        :param table: name of the table
        :param params: dictionary that will be used as in
          cursor.execute(sql, params)
        :return: the INSERT statement; columns follow the iteration order of
          ``params``

        >>> s = SQLGenerator()
        >>> s.insert('test',{'nom':'dupont'})
        'INSERT INTO test ( nom ) VALUES ( %(nom)s )'
        >>> s.insert('test',{'nom':'dupont','prenom':'jean'})
        'INSERT INTO test ( nom, prenom ) VALUES ( %(nom)s, %(prenom)s )'
        """
        # Both lists come from the same dictionary iteration, so the column
        # names and their placeholders stay aligned.
        keys = ', '.join(params.keys())
        values = ', '.join(["%%(%s)s" % key for key in params])
        return 'INSERT INTO %s ( %s ) VALUES ( %s )' % (table, keys, values)

    def select(self, table, params):
        """Build a ``SELECT *`` statement restricted by the keys of ``params``.

        :param table: name of the table
        :param params: dictionary that will be used as in
          cursor.execute(sql, params)
        :return: the SELECT statement; the WHERE clause is omitted when
          ``params`` is empty

        >>> s = SQLGenerator()
        >>> s.select('test',{})
        'SELECT * FROM test'
        >>> s.select('test',{'nom':'dupont'})
        'SELECT * FROM test WHERE nom = %(nom)s'
        >>> s.select('test',{'nom':'dupont','prenom':'jean'})
        'SELECT * FROM test WHERE nom = %(nom)s AND prenom = %(prenom)s'
        """
        sql = 'SELECT * FROM %s' % table
        where = self.where(params.keys())
        if where:
            sql = sql + ' WHERE %s' % where
        return sql

    def adv_select(self, model, tables, params, joins=None):
        """Build a SELECT statement over explicit columns and aliased tables.

        :param model: list of columns to select
        :param tables: list of (table name, alias) pairs used in from
        :param params: dictionary that will be used as in
          cursor.execute(sql, params)
        :param joins: optional restriction statement, or list of restriction
          statements, to insert in the where clause. Usually used to perform
          joins.
        :return: the SELECT statement; the WHERE clause is omitted when there
          is no restriction at all

        >>> s = SQLGenerator()
        >>> s.adv_select(['column'],[('test', 't')], {})
        'SELECT column FROM test AS t'
        >>> s.adv_select(['column'],[('test', 't')], {'nom':'dupont'})
        'SELECT column FROM test AS t WHERE nom = %(nom)s'
        >>> s.adv_select(['column'],[('test', 't')], {}, ['t.a = 1', 't.b = 2'])
        'SELECT column FROM test AS t WHERE t.a = 1 AND t.b = 2'
        """
        table_names = ["%s AS %s" % (name, alias) for name, alias in tables]
        sql = 'SELECT %s FROM %s' % (', '.join(model), ', '.join(table_names))
        # A string is already a complete restriction; only a sequence of
        # restrictions has to be joined.  isinstance() also recognises str
        # subclasses, which an exact type comparison would split into
        # single characters.
        if joins and not isinstance(joins, str):
            joins = ' AND '.join(joins)
        where = self.where(params.keys(), joins)
        if where:
            sql = sql + ' WHERE %s' % where
        return sql

    def delete(self, table, params):
        """Build a DELETE statement restricted by the keys of ``params``.

        :param table: name of the table
        :param params: dictionary that will be used as in
          cursor.execute(sql, params); it should not be empty, because the
          ``WHERE`` keyword is always emitted and would be left without any
          restriction

        >>> s = SQLGenerator()
        >>> s.delete('test',{'nom':'dupont'})
        'DELETE FROM test WHERE nom = %(nom)s'
        >>> s.delete('test',{'nom':'dupont','prenom':'jean'})
        'DELETE FROM test WHERE nom = %(nom)s AND prenom = %(prenom)s'
        """
        where = self.where(params.keys())
        return 'DELETE FROM %s WHERE %s' % (table, where)

    def update(self, table, params, unique):
        """Build an UPDATE statement.

        :param table: name of the table
        :param params: dictionary that will be used as in
          cursor.execute(sql, params)
        :param unique: list of keys identifying the rows to update; they go
          into the WHERE clause and are left out of the SET clause

        >>> s = SQLGenerator()
        >>> s.update('test', {'id':'001','nom':'dupont'}, ['id'])
        'UPDATE test SET nom = %(nom)s WHERE id = %(id)s'
        >>> s.update('test',{'id':'001','nom':'dupont','prenom':'jean'},['id'])
        'UPDATE test SET nom = %(nom)s, prenom = %(prenom)s WHERE id = %(id)s'
        """
        where = self.where(unique)
        # Named `assignments` so the builtin `set` is not shadowed.
        assignments = self.set([key for key in params if key not in unique])
        return 'UPDATE %s SET %s WHERE %s' % (table, assignments, where)
134
class BaseTable:
    """Another helper class to ease SQL table manipulation.

    An instance describes one table and holds the values of one row: every
    field of the table becomes an instance attribute, initialised to None.

    ``table_fields`` is a sequence of ``(field_name, type_code)`` pairs, for
    instance ``(('first_field', 's'),)``.  According to the original notes
    the supported type codes are s/i/d; values of fields typed ``'s'`` are
    wrapped in single quotes by ``sql_repr`` when a statement is rendered.

    NOTE(review): values are interpolated into the SQL text with ``%`` and
    are not escaped, so this class should not be fed untrusted values.
    """

    def __init__(self, table_name, table_fields, primary_key=None):
        """Record the table layout and pre-build the statement templates.

        :param table_name: name of the table
        :param table_fields: sequence of (field name, type code) pairs
        :param primary_key: name of the key field; defaults to the name of
          the first field
        :raise ValueError: if a field name clashes with an existing attribute
          of the instance (a method name, for instance)
        """
        if primary_key is None:
            primary_key = table_fields[0][0]
        self._primary_key = primary_key
        self._table_fields = table_fields
        self._table_name = table_name

        # Type code of the key field, so that its value gets the same quoting
        # as any other field of that type (None, hence no quoting, when the
        # key is not one of the declared fields).
        key_type = dict(table_fields).get(primary_key)

        def placeholder(name, type_code):
            # "%(name)s", wrapped in quotes for string-typed fields.
            return sql_repr(type_code, "%%(%s)s" % name)

        info = {
            'key': primary_key,
            'table': table_name,
            'columns': ",".join([f for f, t in table_fields]),
            'values': ",".join([placeholder(f, t) for f, t in table_fields]),
            'updates': ",".join(["%s=%s" % (f, placeholder(f, t))
                                 for f, t in table_fields]),
            # select/delete are rendered with a {'key': value} dictionary
            'key_value': placeholder('key', key_type),
            # update is rendered with as_dict(), which stores the key value
            # under the key field's own name
            'own_key_value': placeholder(primary_key, key_type),
        }
        # An INSERT statement takes no WHERE clause.
        self._insert_stmt = ("INSERT into %(table)s (%(columns)s) "
                             "VALUES (%(values)s)") % info
        self._update_stmt = ("UPDATE %(table)s SET %(updates)s "
                             "WHERE %(key)s=%(own_key_value)s") % info
        self._select_stmt = ("SELECT %(columns)s FROM %(table)s "
                             "WHERE %(key)s=%(key_value)s") % info
        self._delete_stmt = ("DELETE FROM %(table)s "
                             "WHERE %(key)s=%(key_value)s") % info

        for k, t in table_fields:
            if hasattr(self, k):
                raise ValueError("Cannot use %s as a table field" % k)
            setattr(self, k, None)

    def as_dict(self):
        """Return the current field values as a {field name: value} dict."""
        d = {}
        for k, t in self._table_fields:
            d[k] = getattr(self, k)
        return d

    def select(self, cursor):
        """Load the row matching the current primary key value.

        :param cursor: DB-API cursor used to run the query
        :raise ValueError: if the query does not return exactly one row
        """
        d = {'key': getattr(self, self._primary_key)}
        cursor.execute(self._select_stmt % d)
        rows = cursor.fetchall()
        if len(rows) != 1:
            msg = "Select: ambiguous query returned %d rows"
            raise ValueError(msg % len(rows))
        # Columns were selected in table_fields order.
        for (f, t), v in zip(self._table_fields, rows[0]):
            setattr(self, f, v)

    def update(self, cursor):
        """Write the current field values to the row matching the primary key.

        :param cursor: DB-API cursor used to run the statement
        """
        cursor.execute(self._update_stmt % self.as_dict())

    def delete(self, cursor):
        """Delete the row matching the current primary key value.

        :param cursor: DB-API cursor used to run the statement
        """
        d = {'key': getattr(self, self._primary_key)}
        cursor.execute(self._delete_stmt % d)
198 199 200 # Helper functions ############################################################# 201
def name_fields(cursor, records):
    """Turn fetched records into dictionaries keyed by column name.

    Each record becomes one dictionary in which the value at position ``i``
    is stored under the first item of ``cursor.description[i]``.

    :param cursor: cursor used to execute the query
    :param records: list returned by fetch*()
    :return: list of dictionaries, one per record, in the same order
    """
    named = []
    for row in records:
        named.append({cursor.description[pos][0]: value
                      for pos, value in enumerate(row)})
    return named
218
def sql_repr(type, val):
    """Return ``val`` as it should appear in SQL text for a field type.

    :param type: type code of the field; only ``'s'`` is treated specially
    :param val: value (or placeholder) to render
    :return: ``val`` wrapped in single quotes when ``type`` is ``'s'``,
      otherwise ``val`` itself, unchanged
    """
    if type != 's':
        return val
    return "'%s'" % (val,)
# Run the doctests embedded in this module when it is executed as a script.
if __name__ == "__main__":
    import doctest
    from logilab.common import sqlgen
    # Call form of print: prints the same single value on Python 2 and is
    # valid syntax on Python 3, where the print statement is not.
    print(doctest.testmod(sqlgen))