# # This file is part of GNU Enterprise. # # GNU Enterprise is free software; you can redistribute it # and/or modify it under the terms of the GNU General Public # License as published by the Free Software Foundation; either # version 2, or (at your option) any later version. # # GNU Enterprise is distributed in the hope that it will be # useful, but WITHOUT ANY WARRANTY; without even the implied # warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR # PURPOSE. See the GNU General Public License for more details. # # You should have received a copy of the GNU General Public # License along with program; see the file COPYING. If not, # write to the Free Software Foundation, Inc., 59 Temple Place # - Suite 330, Boston, MA 02111-1307, USA. # # Copyright 2000-2002 Free Software Foundation # # FILE: # interbase/DBdriver.py # # DESCRIPTION: # Driver to provide access to data via the Kinterbasdb Interbase/Firebird Python Driver # Requires Kinterbasdb 3.0 (http://kinterbasdb.sourceforge.net/) # # NOTES: # # Supported attributes (via connections.conf or tag) # # host= This is the Interbase host for your connection (required) # dbame= This is the Interbase database to use (required) # from string import lower, rstrip import sys from gnue.common import GDebug, GDataObjects, GConnections from gnue.common.dbdrivers._dbsig.DBdriver \ import DBSIG_RecordSet, DBSIG_ResultSet, DBSIG_DataObject, \ DBSIG_DataObject_SQL, DBSIG_DataObject_Object try: import kinterbasdb as SIG2api except ImportError, message: raise GConnections.AdapterNotInstalled, \ _("Driver not installed: Kinterbasdb for Interbase [%s]") % message class Interbase_RecordSet(DBSIG_RecordSet): pass class Interbase_ResultSet(DBSIG_ResultSet): def __init__(self, dataObject, cursor=None, defaultValues={}, masterRecordSet=None): DBSIG_ResultSet.__init__(self, dataObject, \ cursor, defaultValues, masterRecordSet) self._recordSetClass = Interbase_RecordSet # Override the base _loadNextRecord as the fetchmany() from the cursor # returns an exception instead of an empty sequence. # TODO: will be unneeded when next (>3.0) kinterbasdb release comes out # TODO: in the kinterbasdb CVS the fetchmany() error has been fixed yet (btami) def _loadNextRecord(self): if self._cursor: rs = None try: rsets = self._cursor.fetchmany() except self._dataObject._DatabaseError, err: rsets = None except: rsets = None if rsets and len(rsets): for rs in(rsets): if rs: i = 0 dict = {} for f in (rs): dict[lower(self._fieldNames[i])] = f i += 1 self._cachedRecords.append (self._recordSetClass(parent=self, \ initialData=dict)) else: return 0 return 1 else: return 0 else: return 0 class Interbase_DataObject(DBSIG_DataObject): def __init__(self): DBSIG_DataObject.__init__(self) self._DatabaseError = SIG2api.DatabaseError self._resultSetClass = Interbase_ResultSet def connect(self, connectData={}): GDebug.printMesg(1,"Interbase database driver initializing") try: self._dataConnection = SIG2api.connect( \ user=connectData['_username'], \ password=connectData['_password'], \ database=connectData['dbname'], \ host=connectData['host']) except self._DatabaseError, value: raise GDataObjects.LoginError, value self._postConnect() # # Schema (metadata) functions # # Return a list of the types of Schema objects this driver provides def getSchemaTypes(self): return [('view','View',1), ('table','Table',1)] # Return a list of Schema objects def getSchemaList(self, type=None): # This excludes any system tables and views. statement = "select rdb$relation_name, rdb$view_source "+\ "from rdb$relations " + \ "where rdb$system_flag=0 " + \ "order by rdb$relation_name" cursor = self._dataConnection.cursor() cursor.execute(statement) # TODO: rdb$view_source is null for table and rdb$view_source is not null for view list = [] for rs in cursor.fetchall(): list.append(GDataObjects.Schema(attrs={'id':rs[0], 'name':rstrip(rs[0]), 'type':'table'}, getChildSchema=self.__getFieldSchema)) cursor.close() return list # Find a schema object with specified name def getSchemaByName(self, name, type=None): statement = "select rdb$relation_name, rdb$view_source "+\ "from rdb$relations " + \ "where rdb$relation_name = '%s'" % (name) cursor = self._dataConnection.cursor() cursor.execute(statement) rs = cursor.fetchone() if rs: schema = GDataObjects.Schema(attrs={'id':rs[0], 'name':rstrip(rs[0]), 'type':'table'}, getChildSchema=self.__getFieldSchema) else: schema = None cursor.close() return schema # Get fields for a table def __getFieldSchema(self, parent): # statement = "select a.rdb$field_name,a.rdb$null_flag,"+\ # "c.rdb$type_name,"+\ # "b.rdb$field_length, "+\ # "b.rdb$field_scale "+\ # "from rdb$relation_fields a, rdb$fields b, rdb$types c "+\ # "where c.rdb$field_name='RDB$FIELD_TYPE' and "+\ # "c.rdb$type=b.rdb$field_type and "+\ # "b.rdb$field_name=a.rdb$field_source and "+\ # "a.rdb$relation_name= '%s'"%(parent.name) statement = "select * from %s"%(parent.name) + " where (0=1)" cursor = self._dataConnection.cursor() cursor.execute(statement) list = [] # for rs in cursor.fetchall(): # nativetype = rstrip(rs[2]) # # attrs={'id': rs[0], 'name': lower(rstrip(rs[0])), # 'type':'field', 'nativetype': nativetype, # 'required': rs[1]==1, 'length': rs[3] } # # if nativetype in ('SHORT','LONG','QUAD','FLOAT','DOUBLE'): # attrs['datatype']='number' # attrs['precision'] = rs[4] # elif nativetype in ('DATE','TIME','TIMESTAMP'): # attrs['datatype']='date' # else: # attrs['datatype']='text' for d in cursor.description: nativetype = lower(d[SIG2api.DESCRIPTION_TYPE_CODE].__name__) attrs={'id':d[SIG2api.DESCRIPTION_NAME], 'name':d[SIG2api.DESCRIPTION_NAME], 'type':'field', 'nativetype': nativetype, 'required': d[SIG2api.DESCRIPTION_NULL_OK]==0, 'length': d[SIG2api.DESCRIPTION_DISPLAY_SIZE]} if nativetype in ('int','float','long'): attrs['datatype']='number' attrs['precision']=d[SIG2api.DESCRIPTION_SCALE] elif nativetype == 'datetime': attrs['datatype']='date' else: attrs['datatype']='text' list.append(GDataObjects.Schema(attrs=attrs)) cursor.close() return list def _postConnect(self): self.triggerExtensions = TriggerExtensions(self._dataConnection) class Interbase_DataObject_Object(Interbase_DataObject, \ DBSIG_DataObject_Object): def __init__(self): Interbase_DataObject.__init__(self) def _buildQuery(self, conditions={}): return DBSIG_DataObject_Object._buildQuery(self, conditions) class Interbase_DataObject_SQL(Interbase_DataObject, \ DBSIG_DataObject_SQL): def __init__(self): # Call DBSIG init first because Interbase_DataObject needs to overwrite # some of its values DBSIG_DataObject_SQL.__init__(self) Interbase_DataObject.__init__(self) def _buildQuery(self, conditions={}): return DBSIG_DataObject_SQL._buildQuery(self, conditions) # # Extensions to Trigger Namespaces # class TriggerExtensions: def __init__(self, connection): self.__connection = connection # Return the current date, according to database def getTimeStamp(self): return self.__singleQuery("select cast('now' as date) from rdb$database") # Return a sequence number from sequence 'name' def getSequence(self, name): return self.__singleQuery("select gen_id(%s,1) from rdb$database" % name) # Run the SQL statement 'statement' def sql(self, statement): cursor = self.__connection.cursor() try: cursor.execute(statement) cursor.close() except: cursor.close() raise # Used internally def __singleQuery(self, statement): cursor = self.__connection.cursor() try: cursor.execute(statement) rv = cursor.fetchone() cursor.close() except mesg: GDebug.printMesg(1,"**** Unable to execute extension query") GDebug.printMesg(1,"**** %s" % mesg) cursor.close() return None try: return rv[0] except: return None ###################################### # # The following hashes describe # this driver's characteristings. # ###################################### # # All datasouce "types" and corresponding DataObject class # supportedDataObjects = { 'object': Interbase_DataObject_Object, 'sql': Interbase_DataObject_SQL }