Source code for concurrent.core.db.dbengines

# -*- coding: utf-8 -*-
"""
SQL Alchemy engines
"""
from concurrent.core.components.component import Component, implements, ExtensionPoint
from concurrent.core.db.api import IDBEngine
from sqlalchemy import create_engine
from concurrent.core.config.config import ConfigItem, BoolItem
from concurrent.core.util.texttransforms import _
from concurrent.core.exceptions.baseerror import BaseError

__all__ = ['PostGreSQLEngine','EngineCreationFailedError']

[docs]class EngineCreationFailedError(BaseError): """ Error raised when we get an engine and we failed """ title="[DB Engine Error]"
[docs]class PostGreSQLEngine(Component): """ PostGreSQL DB Engine """ implements(IDBEngine) db_user = ConfigItem('postgresqlengine', 'user', 'postgresql', """User name of the postgresql database.""") db_pass = ConfigItem('postgresqlengine', 'password', '', """Password of the postgresql database.""") db_host = ConfigItem('postgresqlengine', 'host', 'localhost', """Host of the postgresql database.""") db_port = ConfigItem('postgresqlengine', 'port', '5432', """Port of the postgresql database.""") db_name = ConfigItem('postgresqlengine', 'databasename', 'mydb', """Name of the postgresql database.""") db_echo = BoolItem('postgresqlengine', 'echo', True, """Use SQL Alchemy debug output.""") def __init__(self): """ Initialize engine """ self.engine = None def _get_connection_string(self): """ Private method to build the current connection string """ return _("postgresql://%(user)s:%(password)s@%(host)s:%(port)s/%(dbname)s", user=self.db_user, password=self.db_pass, host=self.db_host, port=self.db_port, dbname=self.db_name)
[docs] def get_engine(self): """ Create a PostGreSQL DB Engine """ if not self.engine: self.engine = create_engine( self._get_connection_string(), echo=self.db_echo ) return self.engine
[docs] def initdb(self): """ Called from the dbmanager once it gets initialized """ #if not self.get_engine(): # raise EngineCreationFailedError(_("Failed to create db engine for: %(connectionstring)s", connectionstring=self._get_connection_string()))
[docs] def dbshutdown(self): """ Called from the dbmanager once it gets shutdown """