From 42caf98aef4065b83614ea63f9677e9a5c42ed4c Mon Sep 17 00:00:00 2001 From: Nagy Karoly Gabriel Date: Mon, 29 Mar 2010 18:22:45 +0300 Subject: [PATCH] ecn-robots: added sanitising function for strings. --- pgdb.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/pgdb.py b/pgdb.py index 972df5c..823b8a6 100644 --- a/pgdb.py +++ b/pgdb.py @@ -30,12 +30,12 @@ class Db(object): return lout.strip() def _get_owner(self): - sql = "SELECT pg_get_userbyid(datdba) FROM pg_database WHERE datname ='"+self._name+"';" + sql = "SELECT pg_get_userbyid(datdba) FROM pg_database WHERE datname ='"+self.sanitize(self._name)+"';" own = self._runsql(sql) return own def _set_owner(self, owner): - sql = "ALTER DATABASE "+self._name+" OWNER TO "+owner+";" + sql = "ALTER DATABASE "+self._name+" OWNER TO "+self.sanitize(owner)+";" own = self._runsql(sql) return own @@ -43,33 +43,33 @@ class Db(object): @property def OID(self): - sql = "SELECT oid FROM pg_database WHERE datname = '"+self._name+"';" + sql = "SELECT oid FROM pg_database WHERE datname = '"+self.sanitize(self._name)+"';" oid = self._runsql(sql) return oid @property def info(self): information = {'size':'', 'encoding':'', 'collation':'','ctype':''} - information['size'] = self._runsql("SELECT pg_size_pretty(pg_database_size('"+self._name+"'));") + information['size'] = self._runsql("SELECT pg_size_pretty(pg_database_size('"+self.sanitize(self._name)+"'));") information['encoding'], information['collation'], \ - information['ctype'] = self._runsql("SELECT pg_encoding_to_char(encoding), datcollate, datctype FROM pg_database WHERE datname='"+self._name+"';").split('|') + information['ctype'] = self._runsql("SELECT pg_encoding_to_char(encoding), datcollate, datctype FROM pg_database WHERE datname='"+self.sanitize(self._name)+"';").split('|') return information @property def connections(self): - sql = "SELECT numbackends from pg_stat_database WHERE datname = '"+self._name+"';" + sql = "SELECT numbackends from pg_stat_database WHERE datname = '"+self.sanitize(self._name)+"';" cncs = self._runsql(sql) return cncs def user_exists(self, user): - sql = "SELECT rolname FROM pg_authid WHERE rolname = '"+user+"';" + sql = "SELECT rolname FROM pg_authid WHERE rolname = '"+self.sanitize(user)+"';" u = self._runsql(sql) if (u == ""): return False return True def db_exists(self, xdb): - sql = "SELECT datname FROM pg_database WHERE datname = '"+xdb+"';" + sql = "SELECT datname FROM pg_database WHERE datname = '"+self.sanitize(xdb)+"';" d = self._runsql(sql) if (d == ""): return False @@ -77,14 +77,14 @@ class Db(object): def delete(self): if self.db_exists(self._name) == True: - sql = "DROP DATABASE "+self._name+";" + sql = "DROP DATABASE "+self.sanitize(self._name)+";" drop = self._runsql(sql) return drop return "Failed" def create(self, own, coll, ctyp, enc=u'UTF8'): if self.db_exists(self._name) == False: - sql = "CREATE DATABASE "+self._name+" WITH OWNER = "+own+" ENCODING = '"+enc+"' LC_COLLATE = '"+coll+"' LC_CTYPE = '"+ctyp+"';" + sql = "CREATE DATABASE "+self.sanitize(self._name)+" WITH OWNER = "+self.sanitize(own)+" ENCODING = '"+self.sanitize(enc)+"' LC_COLLATE = '"+self.sanitize(coll)+"' LC_CTYPE = '"+self.sanitize(ctyp)+"';" create = self._runsql(sql) return create return "Failed" @@ -100,7 +100,7 @@ class Db(object): def rename(self,old, new): if self.db_exists(new) == True or self.db_exists(old) == False: return "Cannot" - sql = "ALTER DATABASE "+old+" RENAME TO "+new+";" + sql = "ALTER DATABASE "+self.sanitize(old)+" RENAME TO "+self.sanitize(new)+";" rename = self._runsql(sql) return rename @@ -116,6 +116,10 @@ class Db(object): sql = "SELECT rolname FROM pg_authid WHERE rolcanlogin=true;" usrl = self._runsql(sql) return usrl + + def sanitize(self,s): + mset = '0123456789ABCDEFGHIJKLMNOPRSTUVXYZabcdefghijklmnoprstuvxyz-_.' + return ''.join([c for c in s if c in mset]) def _test(): test = Db(u'postgres') @@ -123,6 +127,7 @@ def _test(): print test.usrlist() print test.info['encoding'], test.info['collation'], test.info['ctype'] #print test.owner + print test.sanitize("aaaa-fgdg?sd/!_fb*gs'h;s'hdghj.dn ") #print test.connections #print "User aaa is ",test.user_exists("aaa") #print "User postgres is ",test.user_exists("postgres")