You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

142 lines
4.7 KiB

'Python interface to PostgreSQL for managing database nodes.'
from subprocess import Popen, PIPE
class Db(object):
    """Manage one PostgreSQL database through a long-lived ``psql`` child.

    Every SQL statement is written to the stdin of a single
    ``/usr/bin/psql -Aqt -U postgres`` process started in ``__init__``; the
    reply is read back from its stdout up to an echoed end marker.  Values
    interpolated into SQL are passed through :meth:`sanitize` first.
    """

    # Text psql is asked to echo after each statement; marks the end of output.
    _END_MARKER = 'this is the end'

    # Whitelist applied by sanitize(): ASCII letters, digits, '-', '_' and '.'.
    _SAFE_CHARS = frozenset(
        '0123456789'
        'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
        'abcdefghijklmnopqrstuvwxyz'
        '-_.'
    )

    def __init__(self, name):
        """Remember the database *name* and start the shared psql process."""
        self._name = name
        # Argument list instead of a shell string: no shell is needed for a
        # fixed command line.  universal_newlines gives text-mode pipes so the
        # str reads/writes in _runsql work on Python 2 and Python 3 alike.
        self._p = Popen(
            ['/usr/bin/psql', '-Aqt', '-U', 'postgres'],
            stdin=PIPE, stdout=PIPE, stderr=PIPE,
            universal_newlines=True,
        )

    def _run(self, command, args=()):
        """Run *command* with *args* and return its ``(stdout, stderr)``.

        Returns ``-1`` (after printing the error) when the process cannot be
        started or communicated with.
        """
        execfn = [command] + list(args)
        try:
            p = Popen(execfn, stdout=PIPE, stderr=PIPE)
            return p.communicate()
        except Exception as e:
            print(str(e))
            return -1

    def _runsql(self, sql, db='postgres'):
        """Send *sql* to the psql child and return its stripped stdout text.

        The statement is followed by a ``\\echo`` of the end marker; output is
        read line by line until that marker shows up.  *db* is accepted for
        interface compatibility but is not used.

        Raises:
            RuntimeError: psql closed its stdout before echoing the marker
                (previously this situation looped forever).
        """
        marker = self._END_MARKER
        self._p.stdin.write(str(sql + '\\echo ' + marker + '\n'))
        # Push the statement out of our write buffer before waiting on a reply.
        self._p.stdin.flush()

        lines = []
        while True:
            line = self._p.stdout.readline()
            if not line:
                # readline() returns '' only at EOF, i.e. psql has gone away.
                raise RuntimeError('psql closed its output unexpectedly')
            lines.append(line)
            if marker in line:
                break
        return ''.join(lines).replace(marker, '').strip()

    def _get_owner(self):
        """Return the role name that owns this database."""
        sql = ("SELECT pg_get_userbyid(datdba) FROM pg_database WHERE datname ='"
               + self.sanitize(self._name) + "';")
        return self._runsql(sql)

    def _set_owner(self, owner):
        """Change the owner of this database to *owner*."""
        # The database name is sanitized here as well, like everywhere else.
        sql = ("ALTER DATABASE " + self.sanitize(self._name)
               + " OWNER TO " + self.sanitize(owner) + ";")
        return self._runsql(sql)

    db_owner = property(_get_owner, _set_owner)

    @property
    def db_OID(self):
        """OID of this database as returned by psql (a string)."""
        sql = ("SELECT oid FROM pg_database WHERE datname = '"
               + self.sanitize(self._name) + "';")
        return self._runsql(sql)

    @property
    def db_info(self):
        """Dict with the ``size``, ``encoding``, ``collation`` and ``ctype``.

        Fields for which psql returns nothing are left as empty strings.
        """
        name = self.sanitize(self._name)
        information = {'size': '', 'encoding': '', 'collation': '', 'ctype': ''}
        information['size'] = self._runsql(
            "SELECT pg_size_pretty(pg_database_size('" + name + "'));")
        row = self._runsql(
            "SELECT pg_encoding_to_char(encoding), datcollate, datctype "
            "FROM pg_database WHERE datname='" + name + "';")
        fields = row.split('|')
        # No matching row yields a single empty field; keep the blank defaults
        # in that case rather than failing on the unpack.
        if len(fields) == 3:
            (information['encoding'],
             information['collation'],
             information['ctype']) = fields
        return information

    @property
    def db_connections(self):
        """Number of backends connected to this database (a string)."""
        sql = ("SELECT numbackends from pg_stat_database WHERE datname = '"
               + self.sanitize(self._name) + "';")
        return self._runsql(sql)

    def user_exists(self, user):
        """Return True when a role named *user* exists in ``pg_authid``."""
        sql = ("SELECT rolname FROM pg_authid WHERE rolname = '"
               + self.sanitize(user) + "';")
        return self._runsql(sql) != ""

    def db_exists(self, xdb):
        """Return True when a database named *xdb* exists."""
        sql = ("SELECT datname FROM pg_database WHERE datname = '"
               + self.sanitize(xdb) + "';")
        return self._runsql(sql) != ""

    def db_delete(self):
        """Drop this database; return psql's output, or "Failed" if absent."""
        if not self.db_exists(self._name):
            return "Failed"
        return self._runsql("DROP DATABASE " + self.sanitize(self._name) + ";")

    def db_create(self, own, coll, ctyp, enc=u'UTF8'):
        """Create this database owned by *own* with the given locale settings.

        Args:
            own: owning role.
            coll: value for ``LC_COLLATE``.
            ctyp: value for ``LC_CTYPE``.
            enc: value for ``ENCODING``.

        Returns:
            psql's output, or "Failed" when the database already exists.
        """
        if self.db_exists(self._name):
            return "Failed"
        sql = ("CREATE DATABASE " + self.sanitize(self._name)
               + " WITH OWNER = " + self.sanitize(own)
               + " ENCODING = '" + self.sanitize(enc)
               + "' LC_COLLATE = '" + self.sanitize(coll)
               + "' LC_CTYPE = '" + self.sanitize(ctyp) + "';")
        return self._runsql(sql)

    def db_dump(self, path, method):
        """Write a gzip-compressed ``pg_dump`` of this database to *path*.

        *method* is appended to pg_dump's ``-F`` option.  The call blocks until
        both pg_dump and gzip have exited and the file has been closed.

        Returns:
            "Finished dumping <name>" when both processes exit with status 0,
            otherwise "Failed".
        """
        with open(path, "wb") as fl:
            dump = Popen(
                ['/usr/bin/pg_dump', '-U', 'postgres', '-F' + method, self._name],
                stdout=PIPE)
            gz = Popen(['gzip'], stdin=dump.stdout, stdout=fl)
            # Drop our copy of the pipe so pg_dump sees a broken pipe if gzip
            # exits early, instead of blocking on a reader that never reads.
            dump.stdout.close()
            gz.wait()
            dump.wait()
        if dump.returncode != 0 or gz.returncode != 0:
            return "Failed"
        return "Finished dumping " + self._name

    def db_rename(self, old, new):
        """Rename database *old* to *new*.

        Returns "Cannot" when *new* already exists or *old* does not,
        otherwise psql's output.
        """
        if self.db_exists(new) or not self.db_exists(old):
            return "Cannot"
        sql = ("ALTER DATABASE " + self.sanitize(old)
               + " RENAME TO " + self.sanitize(new) + ";")
        return self._runsql(sql)

    def copy(self):
        """Placeholder; not implemented, does nothing and returns None."""
        pass

    def db_list(self):
        """Return psql's listing of all databases except the built-in ones."""
        sql = ("SELECT datname FROM pg_database WHERE datname NOT IN "
               "('template0', 'template1', 'postgres');")
        return self._runsql(sql)

    def usr_list(self):
        """Return psql's listing of all roles that are allowed to log in."""
        return self._runsql(
            "SELECT rolname FROM pg_authid WHERE rolcanlogin=true;")

    def usr_add(self, us, passw):
        """Create login role *us* with password *passw*; return psql's output.

        NOTE: *passw* goes through sanitize(), so any character outside the
        whitelist is removed from the password that is actually set.
        """
        # NOCREATEUSER is intentionally not emitted: it was only an old
        # spelling of NOSUPERUSER (already present) and PostgreSQL 9.6+
        # rejects it as a syntax error.
        sql = ("CREATE ROLE " + self.sanitize(us)
               + " WITH NOSUPERUSER NOCREATEDB NOCREATEROLE LOGIN PASSWORD '"
               + self.sanitize(passw) + "';")
        return self._runsql(sql)

    def usr_delete(self, us):
        """Drop role *us* if it exists; return psql's output."""
        return self._runsql("DROP ROLE IF EXISTS " + self.sanitize(us) + ";")

    def sanitize(self, s):
        """Return *s* with every character outside the whitelist removed.

        Only ASCII letters, digits, ``-``, ``_`` and ``.`` are kept.
        """
        allowed = Db._SAFE_CHARS
        return ''.join([c for c in s if c in allowed])
def _test():
    """Smoke-test the module by printing a few results for ``postgres``.

    Starts a ``Db`` for the ``postgres`` database and prints the database
    list, the login-role list, the encoding/collation/ctype of the database,
    and one sanitized sample string.
    """
    test = Db(u'postgres')
    print(test.db_list())
    print(test.usr_list())
    # Read db_info once: each access runs its queries again.  The original
    # read a non-existent ``info`` attribute for the last two fields.
    info = test.db_info
    print(' '.join([info['encoding'], info['collation'], info['ctype']]))
    print(test.sanitize("aaaa-fgdg?sd/!_fb*gs'h;s'hdghj.dn "))
# Script entry point: run the smoke test only when this file is executed
# directly, never when it is imported as a module.
if __name__ == '__main__':
    _test()