"""Python interface to PostgreSQL for managing database nodes."""

from subprocess import Popen, PIPE

import psycopg2
import psycopg2.extensions


class Db(object):
    """Administrative handle for one PostgreSQL database, addressed by name.

    Every instance opens its own connection to the ``postgres`` maintenance
    database on localhost as user ``postgres`` and issues all SQL through it.

    SQL text is assembled by string concatenation, so every identifier and
    literal is first passed through :meth:`sanitize`, which drops any
    character outside a small whitelist.

    NOTE(review): identifiers are emitted unquoted, so names containing
    ``-`` or ``.`` (both allowed by ``sanitize``) or upper-case letters may
    not round-trip the way the ``*_exists`` checks expect -- confirm against
    the PostgreSQL identifier rules before relying on such names.
    """

    # Characters that sanitize() lets through. The full ASCII alphabet is
    # listed explicitly; an earlier version omitted Q, W, q and w, which
    # silently mangled names such as "wordpress" or the encoding "WIN1252".
    _SAFE_CHARS = ('0123456789'
                   'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                   'abcdefghijklmnopqrstuvwxyz'
                   '-_.')

    def __init__(self, name):
        """Remember the target database *name* and connect to the server.

        Raises whatever ``psycopg2.connect`` raises when the server cannot
        be reached.
        """
        self._name = name
        self.conn = psycopg2.connect(
            "dbname='postgres' user='postgres' host='localhost' password=''")
        # Autocommit is required because statements such as CREATE DATABASE
        # and DROP DATABASE cannot run inside a transaction block.
        self.conn.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        self.cursor = self.conn.cursor()

    def _run(self, command, args=()):
        """Run an external *command* with *args* and capture its output.

        Returns the ``(stdout, stderr)`` tuple from ``communicate()``.
        If the process cannot be started, the error is printed and ``-1``
        is returned instead.
        """
        execfn = [command] + list(args)
        try:
            p = Popen(execfn, stdout=PIPE, stderr=PIPE)
            return p.communicate()
        except Exception as e:
            print(str(e))
            return -1

    def _runsql(self, sql, db='postgres'):
        """Execute *sql* on the shared cursor and return its rows.

        Returns the list produced by ``fetchall()``. When fetching fails
        (for example a statement that yields no result set) a string
        describing the exception is returned instead. Errors raised by
        ``execute`` itself propagate to the caller.

        The *db* parameter is accepted for compatibility but is not used.
        """
        self.cursor.execute(sql)
        try:
            return self.cursor.fetchall()
        except Exception as e:
            # Message text (including its spelling) is kept unchanged in
            # case callers compare against it.
            return "Exception " + str(e) + " ocured"

    def _get_owner(self):
        """Return the query result holding the owner of this database."""
        sql = ("SELECT pg_get_userbyid(datdba) FROM pg_database "
               "WHERE datname = '" + self.sanitize(self._name) + "';")
        return self._runsql(sql)

    def _set_owner(self, owner):
        """Transfer ownership of this database to the role *owner*."""
        # The database name is sanitized here as in every other statement.
        sql = ("ALTER DATABASE " + self.sanitize(self._name)
               + " OWNER TO " + self.sanitize(owner) + ";")
        return self._runsql(sql)

    db_owner = property(_get_owner, _set_owner)

    @property
    def db_OID(self):
        """Query result holding the ``oid`` of this database."""
        sql = ("SELECT oid FROM pg_database WHERE datname = '"
               + self.sanitize(self._name) + "';")
        return self._runsql(sql)

    @property
    def db_info(self):
        """Dict with the ``size``, ``encoding``, ``collation``, ``ctype``.

        ``size`` holds the raw query result of ``pg_size_pretty``; the other
        three values are taken from the first row of ``pg_database``.
        """
        name = self.sanitize(self._name)
        information = {'size': '', 'encoding': '', 'collation': '',
                       'ctype': ''}
        information['size'] = self._runsql(
            "SELECT pg_size_pretty(pg_database_size('" + name + "'));")
        inf = self._runsql(
            "SELECT pg_encoding_to_char(encoding), datcollate, datctype "
            "FROM pg_database WHERE datname='" + name + "';")
        information['encoding'] = inf[0][0]
        information['collation'] = inf[0][1]
        information['ctype'] = inf[0][2]
        return information

    @property
    def db_connections(self):
        """Query result holding ``numbackends`` for this database."""
        sql = ("SELECT numbackends from pg_stat_database WHERE datname = '"
               + self.sanitize(self._name) + "';")
        return self._runsql(sql)

    def user_exists(self, user):
        """Return True when the lookup of role *user* yields a result.

        Any non-empty result counts, including the error string that
        ``_runsql`` returns when fetching fails.
        """
        sql = ("SELECT rolname FROM pg_authid WHERE rolname = '"
               + self.sanitize(user) + "';")
        return bool(self._runsql(sql))

    def db_exists(self, xdb):
        """Return True when the lookup of database *xdb* yields a result.

        Any non-empty result counts, including the error string that
        ``_runsql`` returns when fetching fails.
        """
        sql = ("SELECT datname FROM pg_database WHERE datname = '"
               + self.sanitize(xdb) + "';")
        return bool(self._runsql(sql))

    def db_delete(self):
        """Drop this database; return ``"Failed"`` when it does not exist."""
        if self.db_exists(self._name):
            sql = "DROP DATABASE " + self.sanitize(self._name) + ";"
            return self._runsql(sql)
        return "Failed"

    def db_create(self, own, coll, ctyp, enc=u'UTF8'):
        """Create this database; return ``"Failed"`` when it already exists.

        *own* is the owning role, *coll* and *ctyp* are the LC_COLLATE and
        LC_CTYPE settings and *enc* is the encoding name.
        """
        if not self.db_exists(self._name):
            sql = ("CREATE DATABASE " + self.sanitize(self._name)
                   + " WITH OWNER = " + self.sanitize(own)
                   + " ENCODING = '" + self.sanitize(enc)
                   + "' LC_COLLATE = '" + self.sanitize(coll)
                   + "' LC_CTYPE = '" + self.sanitize(ctyp) + "';")
            return self._runsql(sql)
        return "Failed"

    def db_dump(self, path, method):
        """Dump this database through ``gzip`` into the file at *path*.

        *method* is appended to ``-F`` to select the ``pg_dump`` output
        format. The call blocks until both child processes have exited, so
        the returned message is only produced once the dump has stopped
        writing. Exit codes are not inspected.
        """
        dump = Popen(['/usr/bin/pg_dump', '-U', 'postgres', '-F' + method,
                      self._name], stdout=PIPE)
        try:
            with open(path, "wb") as fl:
                gz = Popen(['gzip'], stdin=dump.stdout, stdout=fl)
                gz.wait()
        finally:
            # Release our end of the pipe before reaping pg_dump, so it
            # cannot stay blocked on a write if gzip never consumed it.
            dump.stdout.close()
            dump.wait()
        return "Finished dumping " + self._name

    def db_rename(self, old, new):
        """Rename database *old* to *new*.

        Returns ``"Cannot"`` when *new* already exists or *old* does not.
        """
        if self.db_exists(new) or not self.db_exists(old):
            return "Cannot"
        sql = ("ALTER DATABASE " + self.sanitize(old) + " RENAME TO "
               + self.sanitize(new) + ";")
        return self._runsql(sql)

    def copy(self):
        """Placeholder; not implemented."""
        pass

    def db_list(self):
        """Return the database names other than the built-in three."""
        sql = ("SELECT datname FROM pg_database WHERE datname NOT IN "
               "('template0', 'template1', 'postgres');")
        return self._runsql(sql)

    def usr_list(self):
        """Return the names of all roles that are allowed to log in."""
        sql = "SELECT rolname FROM pg_authid WHERE rolcanlogin=true;"
        return self._runsql(sql)

    def usr_add(self, us, passw):
        """Create the unprivileged login role *us* with password *passw*.

        NOTE(review): the password is run through ``sanitize`` as well, so
        any character outside the whitelist is dropped from the stored
        password -- confirm that callers expect this.
        """
        sql = ("CREATE ROLE " + self.sanitize(us)
               + " WITH NOSUPERUSER NOCREATEDB NOCREATEROLE NOCREATEUSER"
               + " LOGIN PASSWORD '" + self.sanitize(passw) + "';")
        return self._runsql(sql)

    def usr_delete(self, us):
        """Drop the role *us* if it exists."""
        sql = "DROP ROLE IF EXISTS " + self.sanitize(us) + ";"
        return self._runsql(sql)

    def sanitize(self, s):
        """Return *s* with every character outside the whitelist removed.

        Only ASCII letters, digits, ``-``, ``_`` and ``.`` are kept.
        """
        return ''.join([c for c in s if c in self._SAFE_CHARS])


def _test():
    """Manual smoke test; needs a reachable local PostgreSQL server."""
    test = Db(u'postgres')
    print(test.db_list())
    print(test.usr_list())
    # Read the property several times to exercise repeated queries on the
    # same connection.
    for _ in range(4):
        info = test.db_info
        print("%s %s %s" % (info['encoding'], info['collation'],
                            info['ctype']))
    print(test.sanitize("aaaa-fgdg?sd/!_fb*gs'h;s'hdghj.dn "))


if __name__ == '__main__':
    _test()