root / logilab.pylintinstaller / logilab / common / sqlgen.py

Revision 202:d67e86292521, 8.6 kB (checked in by tziade@…, 9 months ago)

added logilab.pylintinstaller

Line 
1# This program is free software; you can redistribute it and/or modify it under
2# the terms of the GNU General Public License as published by the Free Software
3# Foundation; either version 2 of the License, or (at your option) any later
4# version.
5#
6# This program is distributed in the hope that it will be useful, but WITHOUT
7# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
8# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
9#
10# You should have received a copy of the GNU General Public License along with
11# this program; if not, write to the Free Software Foundation, Inc.,
12# 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
13"""Help to generate SQL string usable by the Python DB-API
14
15:author:    Logilab
16:copyright: 2003-2008 LOGILAB S.A. (Paris, FRANCE)
17:contact:   http://www.logilab.fr/ -- mailto:python-projects@logilab.org
18"""
19__docformat__ = "restructuredtext en"
20
21
22# SQLGenerator ################################################################
23
24class SQLGenerator :
25    """
26    Helper class to generate SQL strings to use with python's DB-API
27    """
28
29    def where(self, keys, addon=None) :
30        """
31        keys : list of keys
32       
33        >>> s = SQLGenerator()
34        >>> s.where(['nom'])
35        'nom = %(nom)s'
36        >>> s.where(['nom','prenom'])
37        'nom = %(nom)s AND prenom = %(prenom)s'
38        >>> s.where(['nom','prenom'], 'x.id = y.id')
39        'x.id = y.id AND nom = %(nom)s AND prenom = %(prenom)s'
40        """
41        restriction = ["%s = %%(%s)s" % (x, x) for x in keys]
42        if addon:
43            restriction.insert(0, addon)
44        return " AND ".join(restriction)
45
46    def set(self, keys) :
47        """
48        keys : list of keys
49       
50        >>> s = SQLGenerator()
51        >>> s.set(['nom'])
52        'nom = %(nom)s'
53        >>> s.set(['nom','prenom'])
54        'nom = %(nom)s, prenom = %(prenom)s'
55        """
56        return ", ".join(["%s = %%(%s)s" % (x, x) for x in keys])
57
58    def insert(self, table, params) :
59        """
60        table : name of the table
61        params :  dictionnary that will be used as in cursor.execute(sql,params)
62       
63        >>> s = SQLGenerator()
64        >>> s.insert('test',{'nom':'dupont'})
65        'INSERT INTO test ( nom ) VALUES ( %(nom)s )'
66        >>> s.insert('test',{'nom':'dupont','prenom':'jean'})
67        'INSERT INTO test ( nom, prenom ) VALUES ( %(nom)s, %(prenom)s )'
68        """
69        keys = ', '.join(params.keys())
70        values = ', '.join(["%%(%s)s" % x for x in params])
71        sql = 'INSERT INTO %s ( %s ) VALUES ( %s )' % (table, keys, values)
72        return sql
73
74    def select(self, table, params) :
75        """
76        table : name of the table
77        params :  dictionnary that will be used as in cursor.execute(sql,params)
78
79        >>> s = SQLGenerator()
80        >>> s.select('test',{})
81        'SELECT * FROM test'
82        >>> s.select('test',{'nom':'dupont'})
83        'SELECT * FROM test WHERE nom = %(nom)s'
84        >>> s.select('test',{'nom':'dupont','prenom':'jean'})
85        'SELECT * FROM test WHERE nom = %(nom)s AND prenom = %(prenom)s'
86        """
87        sql = 'SELECT * FROM %s' % table
88        where = self.where(params.keys())
89        if where :
90            sql = sql + ' WHERE %s' % where
91        return sql
92
93    def adv_select(self, model, tables, params, joins=None) :
94        """
95        model  : list of columns to select
96        tables : list of tables used in from
97        params   :  dictionnary that will be used as in cursor.execute(sql, params)
98        joins  : optional list of restriction statements to insert in the where
99                 clause. Usually used to perform joins.
100
101        >>> s = SQLGenerator()
102        >>> s.adv_select(['column'],[('test', 't')], {})
103        'SELECT column FROM test AS t'
104        >>> s.adv_select(['column'],[('test', 't')], {'nom':'dupont'})
105        'SELECT column FROM test AS t WHERE nom = %(nom)s'
106        """
107        table_names = ["%s AS %s" % (k, v) for k, v in tables]
108        sql = 'SELECT %s FROM %s' % (', '.join(model), ', '.join(table_names))
109        if joins and type(joins) != type(''):
110            joins = ' AND '.join(joins)
111        where = self.where(params.keys(), joins)
112        if where :
113            sql = sql + ' WHERE %s' % where
114        return sql
115
116    def delete(self, table, params) :
117        """
118        table : name of the table
119        params :  dictionnary that will be used as in cursor.execute(sql,params)
120
121        >>> s = SQLGenerator()
122        >>> s.delete('test',{'nom':'dupont'})
123        'DELETE FROM test WHERE nom = %(nom)s'
124        >>> s.delete('test',{'nom':'dupont','prenom':'jean'})
125        'DELETE FROM test WHERE nom = %(nom)s AND prenom = %(prenom)s'
126        """
127        where = self.where(params.keys())
128        sql = 'DELETE FROM %s WHERE %s' % (table, where)
129        return sql
130
131    def update(self, table, params, unique) :
132        """
133        table : name of the table
134        params :  dictionnary that will be used as in cursor.execute(sql,params)
135
136        >>> s = SQLGenerator()
137        >>> s.update('test', {'id':'001','nom':'dupont'}, ['id'])
138        'UPDATE test SET nom = %(nom)s WHERE id = %(id)s'
139        >>> s.update('test',{'id':'001','nom':'dupont','prenom':'jean'},['id'])
140        'UPDATE test SET nom = %(nom)s, prenom = %(prenom)s WHERE id = %(id)s'
141        """
142        where = self.where(unique)
143        set = self.set([key for key in params if key not in unique])
144        sql = 'UPDATE %s SET %s WHERE %s' % (table, set, where)
145        return sql
146
147class BaseTable:
148    """
149    Another helper class to ease SQL table manipulation
150    """
151    # table_name = "default"
152    # supported types are s/i/d
153    # table_fields = ( ('first_field','s'), )
154    # primary_key = 'first_field'
155
156    def __init__(self, table_name, table_fields, primary_key=None):
157        if primary_key is None:
158            self._primary_key = table_fields[0][0]
159        else:
160            self._primary_key = primary_key
161
162        self._table_fields = table_fields
163        self._table_name = table_name
164        info = {
165            'key' : self._primary_key,
166            'table' : self._table_name,
167            'columns' : ",".join( [ f for f,t in self._table_fields ] ),
168            'values' : ",".join( [sql_repr(t, "%%(%s)s" % f)
169                                  for f,t in self._table_fields] ),
170            'updates' : ",".join( ["%s=%s" % (f, sql_repr(t, "%%(%s)s" % f))
171                                   for f,t in self._table_fields] ),
172            }
173        self._insert_stmt = ("INSERT into %(table)s (%(columns)s) "
174                             "VALUES (%(values)s) WHERE %(key)s=%%(key)s") % info
175        self._update_stmt = ("UPDATE %(table)s SET (%(updates)s) "
176                             "VALUES WHERE %(key)s=%%(key)s") % info
177        self._select_stmt = ("SELECT %(columns)s FROM %(table)s "
178                             "WHERE %(key)s=%%(key)s") % info
179        self._delete_stmt = ("DELETE FROM %(table)s "
180                             "WHERE %(key)s=%%(key)s") % info
181
182        for k, t in table_fields:
183            if hasattr(self, k):
184                raise ValueError("Cannot use %s as a table field" % k)
185            setattr(self, k,None)
186
187
188    def as_dict(self):
189        d = {}
190        for k, t in self._table_fields:
191            d[k] = getattr(self, k)
192        return d
193
194    def select(self, cursor):
195        d = { 'key' : getattr(self,self._primary_key) }
196        cursor.execute(self._select_stmt % d)
197        rows = cursor.fetchall()
198        if len(rows)!=1:
199            msg = "Select: ambiguous query returned %d rows"
200            raise ValueError(msg % len(rows))
201        for (f, t), v in zip(self._table_fields, rows[0]):
202            setattr(self, f, v)
203
204    def update(self, cursor):
205        d = self.as_dict()
206        cursor.execute(self._update_stmt % d)
207
208    def delete(self, cursor):
209        d = { 'key' : getattr(self,self._primary_key) }
210
211
212# Helper functions #############################################################
213
214def name_fields(cursor, records) :
215    """
216    Take a cursor and a list of records fetched with that cursor, then return a
217    list of dictionnaries (one for each record) whose keys are column names and
218    values are records' values.
219
220    cursor : cursor used to execute the query
221    records : list returned by fetch*()
222    """
223    result = []
224    for record in records :
225        record_dict = {}
226        for i in range(len(record)) :
227            record_dict[cursor.description[i][0]] = record[i]
228        result.append(record_dict)
229    return result
230
231def sql_repr(type, val):
232    if type == 's':
233        return "'%s'" % (val,)
234    else:
235        return val
236           
237       
238if __name__ == "__main__":
239    import doctest
240    from logilab.common import sqlgen
241    print doctest.testmod(sqlgen)
Note: See TracBrowser for help on using the browser.