Browse Source

ecn-robots: reimplemented PostgreSQL robot using psycopg2

master
Nagy Károly Gábriel 16 years ago
parent
commit
ab2caa0f28
1 changed files with 66 additions and 57 deletions
  1. +66
    -57
      pgdb.py

+ 66
- 57
pgdb.py

@ -1,12 +1,17 @@
'Python interface to PostgreSQL for managing database nodes.'
r'''Python interface to PostgreSQL for managing database nodes.
'''
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
import psycopg2
import psycopg2.extensions
class Db(object): class Db(object):
def __init__(self,name): def __init__(self,name):
self._name = name self._name = name
self._p = Popen('/usr/bin/psql -Aqt -U postgres', stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
self.conn = psycopg2.connect("dbname='postgres' user='postgres' host='localhost' password=''");
self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
self.cursor = self.conn.cursor()
def _run(self, command,args=[]): def _run(self, command,args=[]):
execfn = [command] + list(args) execfn = [command] + list(args)
@ -19,77 +24,77 @@ class Db(object):
def _runsql(self, sql, db='postgres'): def _runsql(self, sql, db='postgres'):
given_sql = sql given_sql = sql
sql = sql +'\echo this is the end' +'\n'
self._p.stdout.flush()
self._p.stdin.write(str(sql))
out = ""
out = self._p.stdout.readline()
while out.find("this is the end") == -1:
out = out +self._p.stdout.readline()
lout = out.replace('this is the end','')
return lout.strip()
self.cursor.execute(given_sql)
try:
out = self.cursor.fetchall()
except Exception,e:
out = "Executed with exception "
return out
def _get_owner(self): def _get_owner(self):
sql = "SELECT pg_get_userbyid(datdba) FROM pg_database WHERE datname ='"+self.sanitize(self._name)+"';"
sql = "SELECT pg_get_userbyid(datdba) FROM pg_database WHERE datname ='"+self._name+"';"
own = self._runsql(sql) own = self._runsql(sql)
return own return own
def _set_owner(self, owner): def _set_owner(self, owner):
sql = "ALTER DATABASE "+self._name+" OWNER TO "+self.sanitize(owner)+";"
sql = "ALTER DATABASE "+self._name+" OWNER TO "+owner+";"
own = self._runsql(sql) own = self._runsql(sql)
return own return own
db_owner = property(_get_owner, _set_owner)
owner = property(_get_owner, _set_owner)
@property @property
def db_OID(self):
sql = "SELECT oid FROM pg_database WHERE datname = '"+self.sanitize(self._name)+"';"
def OID(self):
sql = "SELECT oid FROM pg_database WHERE datname = '"+self._name+"';"
oid = self._runsql(sql) oid = self._runsql(sql)
return oid return oid
@property @property
def db_info(self):
def info(self):
information = {'size':'', 'encoding':'', 'collation':'','ctype':''} information = {'size':'', 'encoding':'', 'collation':'','ctype':''}
information['size'] = self._runsql("SELECT pg_size_pretty(pg_database_size('"+self.sanitize(self._name)+"'));")
information['encoding'], information['collation'], \
information['ctype'] = self._runsql("SELECT pg_encoding_to_char(encoding), datcollate, datctype FROM pg_database WHERE datname='"+self.sanitize(self._name)+"';").split('|')
information['size'] = self._runsql("SELECT pg_size_pretty(pg_database_size('"+self._name+"'));")
inf = self._runsql("""SELECT pg_encoding_to_char(encoding),
datcollate, datctype FROM pg_database WHERE datname='"""+self._name+"';")
information['encoding'] = inf[0][0]
information['collation'] = inf[0][1]
information['ctype'] = inf[0][2]
return information return information
@property @property
def db_connections(self):
sql = "SELECT numbackends from pg_stat_database WHERE datname = '"+self.sanitize(self._name)+"';"
def connections(self):
sql = "SELECT numbackends from pg_stat_database WHERE datname = '"+self._name+"';"
cncs = self._runsql(sql) cncs = self._runsql(sql)
return cncs return cncs
def user_exists(self, user): def user_exists(self, user):
sql = "SELECT rolname FROM pg_authid WHERE rolname = '"+self.sanitize(user)+"';"
sql = "SELECT rolname FROM pg_authid WHERE rolname = '"+user+"';"
u = self._runsql(sql) u = self._runsql(sql)
if (u == ""):
if (len(u) == 0):
return False return False
return True return True
def db_exists(self, xdb): def db_exists(self, xdb):
sql = "SELECT datname FROM pg_database WHERE datname = '"+self.sanitize(xdb)+"';"
sql = "SELECT datname FROM pg_database WHERE datname = '"+xdb+"';"
d = self._runsql(sql) d = self._runsql(sql)
if (d == ""):
if (len(d) == 0):
return False return False
return True return True
def db_delete(self):
def delete(self):
if self.db_exists(self._name) == True: if self.db_exists(self._name) == True:
sql = "DROP DATABASE "+self.sanitize(self._name)+";"
sql = "DROP DATABASE "+self._name+";"
drop = self._runsql(sql) drop = self._runsql(sql)
return drop return drop
return "Failed" return "Failed"
def db_create(self, own, coll, ctyp, enc=u'UTF8'):
def create(self, own, coll, ctyp, enc=u'UTF8'):
if self.db_exists(self._name) == False: if self.db_exists(self._name) == False:
sql = "CREATE DATABASE "+self.sanitize(self._name)+" WITH OWNER = "+self.sanitize(own)+" ENCODING = '"+self.sanitize(enc)+"' LC_COLLATE = '"+self.sanitize(coll)+"' LC_CTYPE = '"+self.sanitize(ctyp)+"';"
sql = "CREATE DATABASE "+self._name+" WITH OWNER = "+own+" ENCODING = '"+enc+"' LC_COLLATE = '"+coll+"' LC_CTYPE = '"+ctyp+"';"
create = self._runsql(sql) create = self._runsql(sql)
return create return create
return "Failed" return "Failed"
def db_dump(self, path, method):
def dump(self, path, method):
dump = Popen(['/usr/bin/pg_dump', '-U','postgres','-F'+ method, self._name], stdout=PIPE) dump = Popen(['/usr/bin/pg_dump', '-U','postgres','-F'+ method, self._name], stdout=PIPE)
fl = open(path,"wb") fl = open(path,"wb")
gz = Popen(['gzip'], stdin = dump.stdout, stdout = fl) gz = Popen(['gzip'], stdin = dump.stdout, stdout = fl)
@ -97,46 +102,50 @@ class Db(object):
return "Finished dumping "+self._name return "Finished dumping "+self._name
def db_rename(self,old, new):
def rename(self,old, new):
if self.db_exists(new) == True or self.db_exists(old) == False: if self.db_exists(new) == True or self.db_exists(old) == False:
return "Cannot" return "Cannot"
sql = "ALTER DATABASE "+self.sanitize(old)+" RENAME TO "+self.sanitize(new)+";"
sql = "ALTER DATABASE "+old+" RENAME TO "+new+";"
rename = self._runsql(sql) rename = self._runsql(sql)
return rename return rename
def copy(): def copy():
pass pass
def db_list(self):
def dblist(self):
sql = "SELECT datname FROM pg_database WHERE datname NOT IN ('template0', 'template1', 'postgres');" sql = "SELECT datname FROM pg_database WHERE datname NOT IN ('template0', 'template1', 'postgres');"
dbl = self._runsql(sql)
return dbl
dblist = self._runsql(sql)
return dblist
def usr_list(self):
def usrlist(self):
sql = "SELECT rolname FROM pg_authid WHERE rolcanlogin=true;" sql = "SELECT rolname FROM pg_authid WHERE rolcanlogin=true;"
usrl = self._runsql(sql)
return usrl
def usr_add(self, us, passw):
sql = "CREATE ROLE "+self.sanitize(us)+" WITH NOSUPERUSER NOCREATEDB NOCREATEROLE NOCREATEUSER LOGIN PASSWORD '"+self.sanitize(passw)+"';"
usra = self._runsql(sql)
return usra
def usr_delete(self, us):
sql = "DROP ROLE IF EXISTS "+self.sanitize(us)+";"
usrd = self._runsql(sql)
return usrd
def sanitize(self,s):
mset = '0123456789ABCDEFGHIJKLMNOPRSTUVXYZabcdefghijklmnoprstuvxyz-_.'
return ''.join([c for c in s if c in mset])
usrlist = self._runsql(sql)
return usrlist
def _test(): def _test():
test = Db(u'postgres') test = Db(u'postgres')
print test.db_list()
print test.usr_list()
print test.db_info['encoding'], test.info['collation'], test.info['ctype']
print test.sanitize("aaaa-fgdg?sd/!_fb*gs'h;s'hdghj.dn ")
print " "
print 'Encoding is ',test.info['encoding'], 'Collation is ',test.info['collation'], 'CType is ',test.info['ctype']
# print test.owner
# print test.connections
print "User aaa is ",test.user_exists("aaa")
print "User postgres is ",test.user_exists("postgres")
print "database xxxaaa is ", test.db_exists("xxxaaa")
print "database postgres is ", test.db_exists("postgres")
print test.dblist()
test2 = Db(u'test')
# print test2.create(u'postgres', u'en_US.UTF-8', u'en_US.UTF-8')
print test2.dblist()
test2.rename(u'test',u'bbb')
print test2.dblist()
#print test2.usrlist()
#print test2.owner
#test2.owner = u'karasz'
#print test2.owner
#test = Db(u'aaa')
#test.create(u'postgres', u'en_US.UTF-8', u'en_US.UTF-8')
# print test.dump('/tmp/aaa.gz',u'p')
#test.delete()
if __name__ == '__main__': if __name__ == '__main__':
_test() _test()

Loading…
Cancel
Save