You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

167 lines
5.7 KiB

'Python interface to PostgreSQL for managing database nodes.'
from subprocess import Popen, PIPE
import psycopg2
import psycopg2.extensions
class Db(object):
def __init__(self,name):
self._name = name
self.conn = psycopg2.connect("dbname='postgres' user='postgres' host= \
'localhost' password=''");
self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
self.cursor = self.conn.cursor()
def _run(self, command,args=[]):
execfn = [command] + list(args)
try:
p = Popen(execfn, stdout=PIPE, stderr=PIPE)
return p.communicate()
except Exception,e:
print str(e)
return -1
def _runsql(self, sql, db='postgres'):
given_sql = sql
self.cursor.execute(given_sql)
try:
out = self.cursor.fetchall()
except Exception,e:
out = "Exception " +str(e)+" ocured"
return out
def _get_owner(self):
sql = "SELECT pg_get_userbyid(datdba) FROM pg_database WHERE datname = \
'"+self.sanitize(self._name)+"';"
own = self._runsql(sql)
return own
def _set_owner(self, owner):
sql = "ALTER DATABASE "+self._name+" OWNER TO "+self.sanitize(owner)+";"
own = self._runsql(sql)
return own
db_owner = property(_get_owner, _set_owner)
@property
def db_OID(self):
sql = "SELECT oid FROM pg_database WHERE datname = '" \
+self.sanitize(self._name)+"';"
oid = self._runsql(sql)
return oid
@property
def db_info(self):
information = {'size':'', 'encoding':'', 'collation':'','ctype':''}
information['size'] = self._runsql("SELECT pg_size_pretty( \
pg_database_size('"+self.sanitize(
self._name)+"'));")
inf = self._runsql("""SELECT pg_encoding_to_char(encoding),
datcollate, datctype FROM pg_database WHERE datname='"""
+self._name+"';")
information['encoding'] = inf[0][0]
information['collation'] = inf[0][1]
information['ctype'] = inf[0][2]
return information
@property
def db_connections(self):
sql = "SELECT numbackends from pg_stat_database WHERE datname = \
'"+self.sanitize(self._name)+"';"
cncs = self._runsql(sql)
return cncs
def user_exists(self, user):
sql = "SELECT rolname FROM pg_authid WHERE rolname = \
'"+self.sanitize(user)+"';"
u = self._runsql(sql)
if (len(u) == 0):
return False
return True
def db_exists(self, xdb):
sql = "SELECT datname FROM pg_database WHERE datname = \
'"+self.sanitize(xdb)+"';"
d = self._runsql(sql)
if (len(d) == 0):
return False
return True
def db_delete(self):
if self.db_exists(self._name) == True:
sql = "DROP DATABASE "+self.sanitize(self._name)+";"
drop = self._runsql(sql)
return drop
return "Failed"
def db_create(self, own, coll, ctyp, enc=u'UTF8'):
if self.db_exists(self._name) == False:
sql = "CREATE DATABASE "+self.sanitize(self._name)+" WITH OWNER = "\
+self.sanitize(own)+" ENCODING = '"+self.sanitize(enc) \
+"' LC_COLLATE = '"+self.sanitize(coll)+"' LC_CTYPE = \
'"+self.sanitize(ctyp)+"';"
create = self._runsql(sql)
return create
return "Failed"
def db_dump(self, path, method):
dump = Popen(['/usr/bin/pg_dump', '-U','postgres','-F'+
method, self._name], stdout=PIPE)
fl = open(path,"wb")
gz = Popen(['gzip'], stdin = dump.stdout, stdout = fl)
fl.close
return "Finished dumping "+self._name
def db_rename(self,old, new):
if self.db_exists(new) == True or self.db_exists(old) == False:
return "Cannot"
sql = "ALTER DATABASE "+self.sanitize(old)+" RENAME TO " \
+self.sanitize(new)+";"
rename = self._runsql(sql)
return rename
def copy():
pass
def db_list(self):
sql = "SELECT datname FROM pg_database WHERE datname NOT IN ('template0' \
, 'template1', 'postgres');"
dbl = self._runsql(sql)
return dbl
def usr_list(self):
sql = "SELECT rolname FROM pg_authid WHERE rolcanlogin=true;"
usrl = self._runsql(sql)
return usrl
def usr_add(self, us, passw):
sql = "CREATE ROLE "+self.sanitize(us)+" WITH NOSUPERUSER NOCREATEDB \
NOCREATEROLE NOCREATEUSER LOGIN PASSWORD '" \
+self.sanitize(passw)+"';"
usra = self._runsql(sql)
return usra
def usr_delete(self, us):
sql = "DROP ROLE IF EXISTS "+self.sanitize(us)+";"
usrd = self._runsql(sql)
return usrd
def sanitize(self,s):
mset = '0123456789ABCDEFGHIJKLMNOPRSTUVXYZabcdefghijklmnoprstuvxyz-_.'
return ''.join([c for c in s if c in mset])
def _test():
test = Db(u'postgres')
print test.db_list()
print test.usr_list()
print test.db_info['encoding'], test.db_info['collation'], test.db_info['ctype']
print test.db_info['encoding'], test.db_info['collation'], test.db_info['ctype']
print test.db_info['encoding'], test.db_info['collation'], test.db_info['ctype']
print test.db_info['encoding'], test.db_info['collation'], test.db_info['ctype']
print test.sanitize("aaaa-fgdg?sd/!_fb*gs'h;s'hdghj.dn ")
if __name__ == '__main__':
_test()