@ -30,12 +30,12 @@ class Db(object):
return lout . strip ( )
return lout . strip ( )
def _get_owner ( self ) :
def _get_owner ( self ) :
sql = " SELECT pg_get_userbyid(datdba) FROM pg_database WHERE datname = ' " + self . _name + " ' ; "
sql = " SELECT pg_get_userbyid(datdba) FROM pg_database WHERE datname = ' " + self . sanitize ( self . _name ) + " ' ; "
own = self . _runsql ( sql )
own = self . _runsql ( sql )
return own
return own
def _set_owner ( self , owner ) :
def _set_owner ( self , owner ) :
sql = " ALTER DATABASE " + self . _name + " OWNER TO " + owner + " ; "
sql = " ALTER DATABASE " + self . _name + " OWNER TO " + self . sanitize ( owner ) + " ; "
own = self . _runsql ( sql )
own = self . _runsql ( sql )
return own
return own
@ -43,33 +43,33 @@ class Db(object):
@property
@property
def OID ( self ) :
def OID ( self ) :
sql = " SELECT oid FROM pg_database WHERE datname = ' " + self . _name + " ' ; "
sql = " SELECT oid FROM pg_database WHERE datname = ' " + self . sanitize ( self . _name ) + " ' ; "
oid = self . _runsql ( sql )
oid = self . _runsql ( sql )
return oid
return oid
@property
@property
def info ( self ) :
def info ( self ) :
information = { ' size ' : ' ' , ' encoding ' : ' ' , ' collation ' : ' ' , ' ctype ' : ' ' }
information = { ' size ' : ' ' , ' encoding ' : ' ' , ' collation ' : ' ' , ' ctype ' : ' ' }
information [ ' size ' ] = self . _runsql ( " SELECT pg_size_pretty(pg_database_size( ' " + self . _name + " ' )); " )
information [ ' size ' ] = self . _runsql ( " SELECT pg_size_pretty(pg_database_size( ' " + self . sanitize ( self . _name ) + " ' )); " )
information [ ' encoding ' ] , information [ ' collation ' ] , \
information [ ' encoding ' ] , information [ ' collation ' ] , \
information [ ' ctype ' ] = self . _runsql ( " SELECT pg_encoding_to_char(encoding), datcollate, datctype FROM pg_database WHERE datname= ' " + self . _name + " ' ; " ) . split ( ' | ' )
information [ ' ctype ' ] = self . _runsql ( " SELECT pg_encoding_to_char(encoding), datcollate, datctype FROM pg_database WHERE datname= ' " + self . sanitize ( self . _name ) + " ' ; " ) . split ( ' | ' )
return information
return information
@property
@property
def connections ( self ) :
def connections ( self ) :
sql = " SELECT numbackends from pg_stat_database WHERE datname = ' " + self . _name + " ' ; "
sql = " SELECT numbackends from pg_stat_database WHERE datname = ' " + self . sanitize ( self . _name ) + " ' ; "
cncs = self . _runsql ( sql )
cncs = self . _runsql ( sql )
return cncs
return cncs
def user_exists ( self , user ) :
def user_exists ( self , user ) :
sql = " SELECT rolname FROM pg_authid WHERE rolname = ' " + user + " ' ; "
sql = " SELECT rolname FROM pg_authid WHERE rolname = ' " + self . sanitize ( user ) + " ' ; "
u = self . _runsql ( sql )
u = self . _runsql ( sql )
if ( u == " " ) :
if ( u == " " ) :
return False
return False
return True
return True
def db_exists ( self , xdb ) :
def db_exists ( self , xdb ) :
sql = " SELECT datname FROM pg_database WHERE datname = ' " + xdb + " ' ; "
sql = " SELECT datname FROM pg_database WHERE datname = ' " + self . sanitize ( xdb ) + " ' ; "
d = self . _runsql ( sql )
d = self . _runsql ( sql )
if ( d == " " ) :
if ( d == " " ) :
return False
return False
@ -77,14 +77,14 @@ class Db(object):
def delete ( self ) :
def delete ( self ) :
if self . db_exists ( self . _name ) == True :
if self . db_exists ( self . _name ) == True :
sql = " DROP DATABASE " + self . _name + " ; "
sql = " DROP DATABASE " + self . sanitize ( self . _name ) + " ; "
drop = self . _runsql ( sql )
drop = self . _runsql ( sql )
return drop
return drop
return " Failed "
return " Failed "
def create ( self , own , coll , ctyp , enc = u ' UTF8 ' ) :
def create ( self , own , coll , ctyp , enc = u ' UTF8 ' ) :
if self . db_exists ( self . _name ) == False :
if self . db_exists ( self . _name ) == False :
sql = " CREATE DATABASE " + self . _name + " WITH OWNER = " + own + " ENCODING = ' " + enc + " ' LC_COLLATE = ' " + coll + " ' LC_CTYPE = ' " + ctyp + " ' ; "
sql = " CREATE DATABASE " + self . sanitize ( self . _name ) + " WITH OWNER = " + self . sanitize ( own ) + " ENCODING = ' " + self . sanitiz e( enc ) + " ' LC_COLLATE = ' " + self . sanitize ( coll ) + " ' LC_CTYPE = ' " + self . sanitize ( ctyp ) + " ' ; "
create = self . _runsql ( sql )
create = self . _runsql ( sql )
return create
return create
return " Failed "
return " Failed "
@ -100,7 +100,7 @@ class Db(object):
def rename ( self , old , new ) :
def rename ( self , old , new ) :
if self . db_exists ( new ) == True or self . db_exists ( old ) == False :
if self . db_exists ( new ) == True or self . db_exists ( old ) == False :
return " Cannot "
return " Cannot "
sql = " ALTER DATABASE " + old + " RENAME TO " + new + " ; "
sql = " ALTER DATABASE " + self . sanitize ( old ) + " RENAME TO " + self . sanitize ( new ) + " ; "
rename = self . _runsql ( sql )
rename = self . _runsql ( sql )
return rename
return rename
@ -116,6 +116,10 @@ class Db(object):
sql = " SELECT rolname FROM pg_authid WHERE rolcanlogin=true; "
sql = " SELECT rolname FROM pg_authid WHERE rolcanlogin=true; "
usrl = self . _runsql ( sql )
usrl = self . _runsql ( sql )
return usrl
return usrl
def sanitize ( self , s ) :
mset = ' 0123456789ABCDEFGHIJKLMNOPRSTUVXYZabcdefghijklmnoprstuvxyz-_. '
return ' ' . join ( [ c for c in s if c in mset ] )
def _test ( ) :
def _test ( ) :
test = Db ( u ' postgres ' )
test = Db ( u ' postgres ' )
@ -123,6 +127,7 @@ def _test():
print test . usrlist ( )
print test . usrlist ( )
print test . info [ ' encoding ' ] , test . info [ ' collation ' ] , test . info [ ' ctype ' ]
print test . info [ ' encoding ' ] , test . info [ ' collation ' ] , test . info [ ' ctype ' ]
#print test.owner
#print test.owner
print test . sanitize ( " aaaa-fgdg?sd/!_fb*gs ' h;s ' hdghj.dn " )
#print test.connections
#print test.connections
#print "User aaa is ",test.user_exists("aaa")
#print "User aaa is ",test.user_exists("aaa")
#print "User postgres is ",test.user_exists("postgres")
#print "User postgres is ",test.user_exists("postgres")