本文整理汇总了Python中sqlalchemy.create_engine函数的典型用法代码示例。如果您正苦于以下问题：Python create_engine函数的具体用法？Python create_engine怎么用？Python create_engine使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了create_engine函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self,
             dbtype="postgresql",
             dbhost="localhost",
             dbuser="postgres",
             dbpass="postgres",
             dbname="postgres"):
    """Store the connection settings and open the database.

    An engine is built from ``self.dbstring`` and probed with a test
    connection.  If the probe raises ``OperationalError`` the database is
    created through ``self.create_database`` and the engine is rebuilt.
    Afterwards the metadata, the session factory and the tables declared
    on ``Base`` are set up.

    Args:
        dbtype: Stored as ``self.dbtype``.
        dbhost: Stored as ``self.dbhost``.
        dbuser: Stored as ``self.dbuser``.
        dbpass: Stored as ``self.dbpass``.
        dbname: Stored as ``self.dbname``; also handed to
            ``create_database`` when the first connection attempt fails.

    Raises:
        OperationalError: If connecting still fails after the database
            has been created.
    """
    self.dbname = dbname
    self.dbtype = dbtype
    self.dbhost = dbhost
    self.dbuser = dbuser
    self.dbpass = dbpass

    # NOTE(review): self.dbstring is defined outside this block; presumably
    # it is derived from the attributes assigned above, so they are set
    # before the engine is created -- confirm.
    self.engine = create_engine(self.dbstring)
    try:
        # The connection is only a reachability probe: close it right away
        # instead of leaving it for the garbage collector.
        self.engine.connect().close()
    except OperationalError:
        # presumably the database does not exist yet -- create it and retry
        self.create_database(dbname)
        self.engine = create_engine(self.dbstring)
        self.engine.connect().close()

    self.metadata = MetaData(self.engine)
    self.Session = sessionmaker(bind=self.engine, expire_on_commit=False)
    Base.metadata.create_all(self.engine)
开发者ID:konstantinfarrell,项目名称:dbfoo,代码行数:25,代码来源:database.py
示例2: _db_params
def _db_params(request, db_name):
    """Create the test database ``db_name`` and return its parameters.

    ``request.param['url']`` is used as a ``%``-template: once with the
    optional ``admin_db`` entry to build an administrative engine, once
    with ``db_name`` to build the URL of the database under test.  The
    database is created only if connecting to it fails; a finalizer that
    drops it again is registered on ``request``.

    Returns:
        A copy of ``request.param`` whose ``'url'`` entry points at the
        new database.

    Raises:
        Exception: If a connection to ``db_name`` already succeeds, so an
            existing database is never overwritten.
    """
    url_template = request.param['url']
    admin_url = url_template % request.param.get('admin_db', '')
    sudo_engine = sqlalchemy.create_engine(admin_url, poolclass=sqlalchemy.pool.NullPool)
    db_connection_url = url_template % db_name

    def run_as_admin(statement):
        # Shared by creation and tear-down: issue a rollback first, switch
        # the raw mssql connection to autocommit, then run the statement.
        with sudo_engine.connect() as admin_conn:
            admin_conn.execute('rollback')
            if 'mssql' in url_template:
                admin_conn.connection.connection.autocommit = True
            admin_conn.execute(statement)

    def tear_down():
        run_as_admin('drop database if exists %s' % db_name)

    try:
        with sqlalchemy.create_engine(db_connection_url).connect():
            pass
    except (sqlalchemy.exc.OperationalError, sqlalchemy.exc.InternalError, DBAPIError):
        # Could not connect: treat the database as missing and create it.
        run_as_admin('create database %s' % db_name)
    else:
        raise Exception('Database %s already exists; refusing to overwrite' % db_name)

    request.addfinalizer(tear_down)

    result = request.param.copy()
    result['url'] = db_connection_url
    return result
开发者ID:dimagi,项目名称:commcare-export,代码行数:29,代码来源:conftest.py
示例3: _has_sqlite
def _has_sqlite(self):
    """Report whether an in-memory SQLite engine can be created.

    Returns:
        True when ``create_engine('sqlite://')`` succeeds, False when it
        raises ``ImportError``.
    """
    from sqlalchemy import create_engine

    try:
        create_engine('sqlite://')
    except ImportError:
        return False
    return True
开发者ID:NeilTruick,项目名称:sqlalchemy,代码行数:7,代码来源:requirements.py
示例4: test_mysql_innodb
def test_mysql_innodb(self):
    """Check that the migrations on MySQL produce InnoDB tables only."""
    # Register the engine and its connect string in the shared dicts so the
    # database reset picks them up; per the original note they are removed
    # again in tearDown, so no clean-up is needed here.
    mysql_uri = _get_connect_string('mysql')
    mysql_engine = sqlalchemy.create_engine(mysql_uri)
    self.engines["mysqlcitest"] = mysql_engine
    self.test_databases["mysqlcitest"] = mysql_uri

    # Build a fully populated MySQL database with all the tables.
    self._reset_databases()
    self._walk_versions(mysql_engine, False, False)

    schema_uri = _get_connect_string('mysql', database="information_schema")
    schema_conn = sqlalchemy.create_engine(schema_uri).connect()

    # Sanity check: the schema must contain at least one table.
    all_tables = schema_conn.execute("SELECT count(*) "
                                     "from information_schema.TABLES "
                                     "where TABLE_SCHEMA='openstack_citest'")
    self.assertGreater(all_tables.scalar(), 0,
                       msg="No tables found. Wrong schema?")

    # Every table except migrate_version has to use the InnoDB engine.
    offenders = schema_conn.execute("SELECT count(*) "
                                    "from information_schema.TABLES "
                                    "where TABLE_SCHEMA='openstack_citest' "
                                    "and ENGINE!='InnoDB' "
                                    "and TABLE_NAME!='migrate_version'")
    count = offenders.scalar()
    self.assertEqual(count, 0, "%d non InnoDB tables created" % count)
开发者ID:japplewhite,项目名称:cinder-1,代码行数:30,代码来源:test_migrations.py
示例5: get_session
def get_session(verbose, test):
    """Return a new SQLAlchemy session bound to the 3DID SQLite database.

    * **verbose** is passed to the engine as its **echo** flag.
    * **test** if *True* the database is created in RAM only, otherwise
      the file ``DB/3did.db`` is used.

    >>> get_session(False, True) #doctest: +ELLIPSIS
    <sqlalchemy.orm.session.Session object at 0x...>
    >>> get_session(False, False) #doctest: +ELLIPSIS
    <sqlalchemy.orm.session.Session object at 0x...>
    """
    if test:
        db_url = 'sqlite:///:memory:'
        note = 'DB in RAM.'
    else:
        db_url = 'sqlite:///DB/3did.db'
        note = 'DB stored in file: %s' % 'DB/3did.db'

    engine = create_engine(db_url, echo=verbose)
    log_load.debug(note)

    # Create the tables registered on ``meta`` (per the original comment:
    # Domain, Interaction, PDB, Interacting_PDBs).
    meta.create_all(engine)

    # The session returned here is the one used for the INSERTs.
    return sessionmaker(bind=engine)()
开发者ID:piobyz,项目名称:protein-protein-interactions-motifs.,代码行数:27,代码来源:DB_3DID.py
示例6: __init__
def __init__(self, configfile='~/.ddr_compress/still.cfg', test=False):
    """
    Read the database settings and prepare an engine plus session factory.

    configfile - path of the config file (``~`` is expanded); its
                 ``dbinfo`` section is stored as ``self.dbinfo``.  Pass
                 None to skip reading a file.
    test       - if true, use an in-process SQLite database and call
                 ``self.createdb()``; otherwise connect to MySQL using the
                 values in ``self.dbinfo``.
    """
    if configfile is not None:
        parser = configparser.ConfigParser()
        configfile = os.path.expanduser(configfile)
        if not os.path.exists(configfile):
            logging.info(configfile+" Not Found")
        else:
            logger.info('loading file '+configfile)
            parser.read(configfile)
            self.dbinfo = parser['dbinfo']
            # NOTE(review): 'string-escape' decoding exists on Python 2
            # strings only -- confirm the targeted interpreter.
            self.dbinfo['password'] = self.dbinfo['password'].decode('string-escape')

    if not test:
        # NOTE(review): self.dbinfo is only assigned when the config file
        # was found above -- verify callers always provide one here.
        mysql_url = 'mysql://{username}:{password}@{hostip}:{port}/{dbname}'.format(
            **self.dbinfo)
        self.engine = create_engine(mysql_url, pool_size=20, max_overflow=40)
    else:
        self.engine = create_engine('sqlite:///',
                                    connect_args={'check_same_thread': False},
                                    poolclass=StaticPool)
        self.createdb()

    self.Session = sessionmaker(bind=self.engine)
开发者ID:acliu,项目名称:capo,代码行数:31,代码来源:dbi.py
示例7: yeild_engine
def yeild_engine(self):
    '''Create a SQLAlchemy engine for the configured PostgreSQL database.

    When both ``self.user`` and ``self.password`` are set they are used
    directly.  Otherwise the missing value(s) are read from the first line
    of ``$HOME/.pgpass`` whose host field equals ``self.host`` and whose
    port field matches ``self.port``.  A port field of ``*`` matches any
    port; lines whose port field is not numeric are skipped instead of
    aborting the lookup with ``ValueError``.

    Returns:
        The engine built from ``postgresql://user:password@host/db``.

    Raises:
        RuntimeError: if no line of the password file matches.
        KeyError: if the ``HOME`` environment variable is not set.
    '''
    head = 'postgresql://'
    tail = ''.join(['@', self.host, '/', self.db])

    # Both credentials supplied explicitly: no need to touch the pgpass file.
    if not (self.user is None or self.password is None):
        jnd = ''.join([head, self.user, ':', self.password, tail])
        return sqlalchemy.create_engine(jnd)

    # Otherwise pull whatever is missing from the pgpass file.
    path2pass = os.path.join(os.environ['HOME'], '.pgpass')
    with open(path2pass, 'r') as f:
        for raw_line in f:
            fields = raw_line.split(':')
            # Host matching stays literal, exactly as before.
            if len(fields) < 2 or fields[0] != self.host:
                continue
            if fields[1] != '*':
                try:
                    file_port = int(fields[1])
                except ValueError:
                    # Malformed port field: ignore the line, keep searching.
                    continue
                if file_port != int(self.port):
                    continue

            # The last two fields are taken as user and password.
            password = fields[-1].strip() if self.password is None else self.password
            user = fields[-2].strip() if self.user is None else self.user
            jnd = ''.join([head, user, ':', password, tail])
            return sqlalchemy.create_engine(jnd)

    raise RuntimeError('unable to resolve credentials')
开发者ID:marshall245,项目名称:Analytics,代码行数:27,代码来源:dbconnect.py
示例8: __init__
def __init__(self, uri, init_func=None):
    """Create the engine for ``uri`` and look up the tables used later.

    Args:
        uri: Database URI; URIs containing "mysql" get a streaming cursor.
        init_func: Optional callable invoked with the new engine before
            the schema is reflected.
    """
    # Pool settings (rationale carried over from the original notes):
    # * pool_size=1: each Bridge service is single threaded, so one
    #   connection per app is enough.
    # * pool_recycle=60: avoids MySQL dropping connections after long idle
    #   periods.
    # * pool_timeout=60: wait longer before giving up on a new connection;
    #   not strictly necessary.
    #
    # MySQLdb's default cursor reads _all_ rows before returning any, even
    # through SQLAlchemy iterators.  SSCursor streams rows instead, at the
    # price of allowing only one active query per connection.  That is fine
    # for a single-threaded service, but "close" has to be called after
    # every query; "fetchall" does so automatically, so it is always used.
    if "mysql" not in uri:
        self.db = sa.create_engine(uri, pool_size=1, pool_recycle=60)
    else:
        from MySQLdb.cursors import SSCursor
        self.db = sa.create_engine(uri, pool_size=1, pool_recycle=60, pool_timeout=60,
                                   connect_args={"cursorclass": SSCursor})

    if init_func:
        init_func(self.db)

    reflected = sa.MetaData(self.db)
    reflected.reflect()
    tables = reflected.tables
    self.buildrequests_table = tables["buildrequests"]
    self.builds_table = tables["builds"]
    self.sourcestamps_table = tables["sourcestamps"]
    self.buildset_properties_table = tables["buildset_properties"]
    self.buildsets_table = tables["buildsets"]
开发者ID:Callek,项目名称:buildbot-bridge,代码行数:32,代码来源:servicebase.py
示例9: __init__
def __init__(self, dsn=None):
    """Set up the engine, the schema and the session factory.

    @param dsn: database connection string; when empty the configured
        connection is used and, failing that, the SQLite file
        CUCKOO_ROOT/db/cuckoo.db.
    @raise CuckooDatabaseError: if the database directory cannot be
        created or the schema cannot be created.
    """
    cfg = Config()

    if dsn:
        target = dsn
    elif cfg.database.connection:
        target = cfg.database.connection
    else:
        db_file = os.path.join(CUCKOO_ROOT, "db", "cuckoo.db")
        db_dir = os.path.dirname(db_file)
        # Only create the directory when neither the file nor its parent
        # folder is there yet.
        if not os.path.exists(db_file) and not os.path.exists(db_dir):
            try:
                create_folder(folder=db_dir)
            except CuckooOperationalError as e:
                raise CuckooDatabaseError("Unable to create database directory: {0}".format(e))
        target = "sqlite:///{0}".format(db_file)

    self.engine = create_engine(target, poolclass=NullPool)

    # SQL logging stays off; switch it on when debugging.
    self.engine.echo = False

    # Connection timeout: configured value, or 60 when none is set.
    self.engine.pool_timeout = cfg.database.timeout if cfg.database.timeout else 60

    # Create the schema.
    try:
        Base.metadata.create_all(self.engine)
    except SQLAlchemyError as e:
        raise CuckooDatabaseError("Unable to create or connect to database: {0}".format(e))

    # Session factory used to obtain database sessions.
    self.Session = sessionmaker(bind=self.engine)
开发者ID:AmesianX,项目名称:HackingStuff,代码行数:35,代码来源:database.py
示例10: construct_maker
def construct_maker(databases, models=None, query_cls=Query, engine_params=None,
                    session_class=DBSession):
    '''
    Build a session factory bound to one or to several databases.

    databases - str with db uri, or dict mapping a reference name to a db uri.
    models - str name of models package (module), default is 'models'
    query_cls - additional query class
    engine_params - additional engine params

    Returns an ``orm.sessionmaker``.  For a single uri it is bound to one
    engine; for a dict every table of each imported ``metadata`` is bound
    to the engine created for its reference.
    '''
    models = models or 'models'
    engine_params = engine_params or {}

    # ``basestring`` only exists on Python 2; fall back to ``str`` so the
    # single-uri form keeps working on Python 3.
    try:
        string_types = basestring
    except NameError:
        string_types = str

    if isinstance(databases, string_types):
        engine = create_engine(databases, **engine_params)
        return orm.sessionmaker(class_=session_class, query_cls=query_cls,
                                bind=engine, autoflush=False)

    db_dict = {}
    for ref, uri in databases.items():
        # An empty reference means the metadata lives in the models package
        # itself; filter() drops the empty part before joining.
        md_ref = '.'.join(filter(None, [models, ref]))
        metadata = import_string(md_ref, 'metadata')
        engine = create_engine(uri, **engine_params)
        #TODO: find out why this is broken since sqlalchemy 0.7
        #engine.logger.logger.name += '(%s)' % ref
        for table in metadata.sorted_tables:
            db_dict[table] = engine
    return orm.sessionmaker(class_=session_class, query_cls=query_cls, binds=db_dict,
                            autoflush=False)
开发者ID:riffm,项目名称:mage,代码行数:25,代码来源:sqla.py
示例11: test_mysqldb_connection
def test_mysqldb_connection():
    """Both sphinx URL forms must yield a SphinxDialect that is a myDialect."""
    for url in ("sphinx://", "sphinx+mysqldb://"):
        sphinx_engine = create_engine(url)
        dialect = sphinx_engine.dialect
        assert isinstance(dialect, SphinxDialect)
        assert isinstance(dialect, myDialect)
开发者ID:AdrielVelazquez,项目名称:sqlalchemy-sphinx,代码行数:7,代码来源:test_connections.py
示例12: create_db_connection
def create_db_connection(username, password, host, db_name, flavor):
    """Return an engine for a MySQL or PostgreSQL database.

    Args:
        username, password, host, db_name: Parts inserted verbatim into
            the connection URL.
        flavor: ``'mysql'`` (case-insensitive) selects the mysqldb driver
            with the utf8mb4 charset; any other value selects
            postgresql+psycopg2.

    Returns:
        The engine (not a connection), created with ``pool_recycle=3600``
        and the ``autocommit`` execution option.
    """
    # NOTE(review): the credentials are not URL-escaped here; presumably
    # callers pass values without reserved characters -- confirm.
    if flavor.lower() == 'mysql':
        url = ''.join(['mysql+mysqldb://', username, ':', password, '@',
                       host, '/', db_name, '?charset=utf8mb4'])
    else:
        url = ''.join(['postgresql+psycopg2://', username, ':', password, '@',
                       host, '/', db_name])
    return create_engine(url, pool_recycle=3600,
                         execution_options={'autocommit': True})
开发者ID:rpmaxwell,项目名称:twitter_search_utilities,代码行数:7,代码来源:connections_util.py
示例13: setup_database
def setup_database(self):
    """Publish the model helpers and create the engine and session.

    Fills the module-level ``column_mapping``, injects a few SQLAlchemy
    names into ``globals()``, and stores ``db_engine`` / ``db_session`` in
    ``self.config`` when ``db_type`` is ``'sqlite'`` or ``'postgres'``.
    For any other ``db_type`` no engine is created.
    """
    # Database library imports are kept local to this method.
    from sqlalchemy import (create_engine, Table, MetaData, Column, Integer,
                            String, Unicode, Text, UnicodeText, Date, Numeric,
                            Time, Float, DateTime, Interval, Binary, Boolean,
                            PickleType)
    from sqlalchemy.orm import sessionmaker, mapper

    # Global lookup from the type names used by model() to column types.
    global column_mapping
    column_mapping = {
        'string': String,
        'str': String,
        'integer': Integer,
        'int': Integer,
        'unicode': Unicode,
        'text': Text,
        'unicodetext': UnicodeText,
        'date': Date,
        'numeric': Numeric,
        'time': Time,
        'float': Float,
        'datetime': DateTime,
        'interval': Interval,
        'binary': Binary,
        'boolean': Boolean,
        'bool': Boolean,
        'pickletype': PickleType,
    }

    # Expose a few SQLAlchemy names as globals so models can use them.
    globals().update({'Column': Column, 'Table': Table, 'Integer': Integer,
                      'MetaData': MetaData, 'mapper': mapper})

    db_type = self.config['db_type']
    if db_type == 'sqlite':
        # sqlite URLs need one more slash in front of the location; the
        # adjusted value is written back to the config.
        self.config['db_location'] = '/' + self.config['db_location']
        engine = create_engine(db_type + '://' + self.config['db_location'])
    elif db_type == 'postgres':
        engine = create_engine(db_type + '://' + self.config['db_location'],
                               encoding="utf8", convert_unicode=True)
    else:
        return

    self.config['db_engine'] = engine
    self.config['db_session'] = sessionmaker(bind=self.config['db_engine'])()
开发者ID:malaniz,项目名称:webyarara,代码行数:34,代码来源:yarara.py
示例14: getDatabaseEngine
def getDatabaseEngine(self, section, key="database"):
    """
    Return the database engine registered for a configuration entry.

    Engines are cached per process id and configuration key, so the same
    ``section``/``key`` pair yields the same engine within one process.

    ========= ============
    Parameter Description
    ========= ============
    section   name of the configuration section holding the entry.
    key       name of the entry with the database URL, defaults to *database*.
    ========= ============

    ``Return``: database engine

    Raises ``Exception`` if the configuration entry is missing or empty.
    """
    config_key = "%s.%s" % (section, key)
    index = "%s:%s" % (os.getpid(), config_key)

    # Already created for this process: hand out the cached engine.
    if index in self.__db:
        return self.__db[index]

    if not self.config.get(config_key):
        raise Exception("No database connection defined for '%s'!" % index)

    if self.config.get(config_key).startswith("sqlite://"):
        from sqlalchemy.pool import StaticPool
        self.__db[index] = create_engine(self.config.get(config_key),
                                         connect_args={'check_same_thread': False},
                                         poolclass=StaticPool, encoding="utf-8")
    else:
        self.__db[index] = create_engine(self.config.get(config_key), encoding="utf-8")

    #TODO: configure engine
    #self.__db[index] = create_engine(self.config.get(index),
    #        pool_size=40, pool_recycle=120, echo=True)
    return self.__db[index]
开发者ID:gonicus,项目名称:gosa,代码行数:32,代码来源:env.py
示例15: __init__
def __init__(self, config=None, sqluri=None, create_tables=True,
             pool_size=100, pool_recycle=60, pool_timeout=30,
             max_overflow=10, pool_reset_on_return='rollback', **kw):
    """Create the engine and bind the hash table to it.

    Args:
        config: Optional mapping; its ``'SQLURI'`` entry is the fallback
            when ``sqluri`` is not given.
        sqluri: Database URI.  URIs starting with ``mysql`` or ``pymysql``
            get a pooled engine configured with the ``pool_*`` and
            ``max_overflow`` arguments; every other URI uses ``NullPool``.
        create_tables: When true, create the hash table if it is missing.
        pool_size, pool_recycle, pool_timeout, max_overflow: Passed to
            ``create_engine`` for the pooled engine.
        pool_reset_on_return: Passed to ``create_engine`` for the pooled
            engine; ``''``, ``'none'`` (any case) and ``None`` all mean
            ``None``.
        **kw: Only ``echo`` is read (default False).

    Raises:
        KeyError: If neither ``sqluri`` nor ``config['SQLURI']`` is set.
    """
    self.config = config or {}
    # Look the fallback up in self.config so a missing config yields a
    # plain KeyError instead of failing on ``None['SQLURI']``.
    self.sqluri = sqluri or self.config['SQLURI']

    if pool_reset_on_return is None or pool_reset_on_return.lower() in ('', 'none'):
        pool_reset_on_return = None

    if self.sqluri.startswith(('mysql', 'pymysql')):
        self._engine = create_engine(
            self.sqluri,
            pool_size=pool_size,
            pool_recycle=pool_recycle,
            pool_timeout=pool_timeout,
            pool_reset_on_return=pool_reset_on_return,
            max_overflow=max_overflow,
            logging_name='addonreg')
    else:
        # Use the resolved URI: the raw ``sqluri`` argument is None when
        # the URI came from the config.
        self._engine = create_engine(self.sqluri, poolclass=NullPool)

    self._engine.echo = kw.get('echo', False)
    self.hashes = hash_table
    self.hashes.metadata.bind = self._engine
    if create_tables:
        self.hashes.create(checkfirst=True)
开发者ID:jasonthomas,项目名称:addon-registration,代码行数:27,代码来源:rawsql.py
示例16: __init__
def __init__(self):
    """Open a database session against SQL Server.

    The driver is chosen by ``app.config['SQL_DRIVER']``: ``'pymssql'``
    connects through mssql+pymssql, anything else through pyodbc with a
    FreeTDS connection string.  The new session is stored as
    ``self.session``; failures are logged and re-raised.
    """
    cfg = app.config

    if cfg['SQL_DRIVER'] == 'pymssql':
        url = r"mssql+pymssql://{0}:{1}@{2}:{3}/{4}".format(
            cfg['DATABASE_USER'],
            cfg['DATABASE_PASSWORD'],
            cfg['DATABASE_HOST'],
            cfg['DATABASE_PORT'],
            cfg['DATABASE_NAME'])
        engine = create_engine(url)
    else:
        odbc_params = (cfg['DATABASE_HOST'],
                       cfg['DATABASE_NAME'],
                       cfg['DATABASE_USER'],
                       cfg['DATABASE_PASSWORD'])
        quoted = urllib.quote_plus('DRIVER={FreeTDS};Server=%s;Database=%s;UID=%s;PWD=%s;TDS_Version=8.0;CHARSET=UTF8;Port=1433;'
                                   % odbc_params)
        engine = create_engine('mssql+pyodbc:///?odbc_connect={}'.format(quoted),
                               connect_args={'convert_unicode': True})

    # Session factory bound to the engine chosen above.
    Session = sessionmaker(bind=engine)

    try:
        self.session = Session()
        # NOTE(review): the statement is executed once right here and its
        # *result* is what gets registered as the listener, not a callable
        # -- confirm this is intended.
        isolation_result = self.session.execute('SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED')
        event.listen(Session, "after_transaction_create", isolation_result)
        Log.info('Connection Openned')
    except:
        # Log and propagate whatever went wrong.
        Log.info('Can\'t create database session')
        raise
开发者ID:culturagovbr,项目名称:salic-api,代码行数:30,代码来源:mssql_connector.py
示例17: __init__
def __init__(self, url):
    """Define the project table and make sure database and table exist.

    Args:
        url: Database URL accepted by ``make_url`` / ``create_engine``.

    If the URL names a database, a best-effort ``CREATE DATABASE`` is sent
    over a temporary server-level connection first; SQLAlchemy errors from
    that step are ignored.  The table itself is then created on the real
    engine if it is missing.
    """
    self.table = Table(self.__tablename__, MetaData(),
                       Column('name', String(64)),
                       Column('group', String(64)),
                       Column('status', String(16)),
                       Column('script', Text),
                       Column('comments', String(1024)),
                       Column('rate', Float(11)),
                       Column('burst', Float(11)),
                       Column('updatetime', Float(32)),
                       mysql_engine='InnoDB',
                       mysql_charset='utf8'
                       )

    self.url = make_url(url)
    if self.url.database:
        database = self.url.database
        # Connect without a database so the database can be created.
        self.url.database = None
        try:
            engine = create_engine(self.url)
            try:
                # The context manager closes the bootstrap connection,
                # which used to stay open.
                with engine.connect() as conn:
                    # presumably ends the open transaction so that CREATE
                    # DATABASE can run outside of one -- confirm
                    conn.execute("commit")
                    conn.execute("CREATE DATABASE %s" % database)
            finally:
                # The temporary engine is not needed afterwards.
                engine.dispose()
        except sqlalchemy.exc.SQLAlchemyError:
            # Best effort: errors here are deliberately ignored.
            pass
        finally:
            # Restore the URL even if an unexpected error propagates.
            self.url.database = database

    self.engine = create_engine(url)
    self.table.create(self.engine, checkfirst=True)
开发者ID:01jiagnwei01,项目名称:pyspider,代码行数:28,代码来源:projectdb.py
示例18: test_unlock_scenario_structure
def test_unlock_scenario_structure(fresh_database_config, scenario_manager):
    """Unlocking a scenario must delete only its own lock record.

    1. Create 3 scenarios with the scenario manager.
    2. Insert ScenarioStructureLock records for the first and the third one.
    3. Unlock the first scenario.
    4. Verify the single remaining record belongs to the third scenario.
    """
    def open_session():
        # A fresh engine and session per use, as in the original test.
        return Session(bind=create_engine(fresh_database_config.get_connection_string()))

    specs = [NewScenarioSpec(name=title) for title in (u'First', u'Second', u'Third')]
    scenarios = scenario_manager.create_scenarios(specs)

    writer = open_session()
    writer.add_all([
        ScenarioStructureLockRecord(scenario_id=scenarios[position].scenario_id)
        for position in (0, 2)
    ])
    writer.commit()

    scenarios[0]._unlock_structure()

    reader = open_session()
    remaining = reader.query(ScenarioStructureLockRecord).one()
    assert remaining.scenario_id == scenarios[2].scenario_id, "Wrong scenario has been unlocked"
开发者ID:KillAChicken,项目名称:AutoStorage,代码行数:26,代码来源:test_lock_scenario_structure.py
示例19: __init__
def __init__(self, url):
    """Define the task table template and connect to the database.

    Args:
        url: Database URL accepted by ``make_url`` / ``create_engine``.

    If the URL names a database, a best-effort
    ``CREATE DATABASE IF NOT EXISTS`` is executed on a temporary
    server-level engine first; SQLAlchemy errors from that step are
    ignored.  Afterwards the real engine is created and
    ``self._list_project()`` is called.
    """
    # NOTE(review): the table is created under the literal name
    # '__tablename__'; presumably it is renamed elsewhere -- confirm.
    self.table = Table('__tablename__', MetaData(),
                       Column('taskid', String(64), primary_key=True, nullable=False),
                       Column('project', String(64)),
                       Column('url', String(1024)),
                       Column('status', Integer),
                       Column('schedule', LargeBinary),
                       Column('fetch', LargeBinary),
                       Column('process', LargeBinary),
                       Column('track', LargeBinary),
                       Column('lastcrawltime', Float(32)),
                       Column('updatetime', Float(32)),
                       mysql_engine='InnoDB',
                       mysql_charset='utf8'
                       )

    self.url = make_url(url)
    if self.url.database:
        database = self.url.database
        # Connect without a database so the database can be created.
        self.url.database = None
        try:
            engine = create_engine(self.url, convert_unicode=True)
            try:
                engine.execute("CREATE DATABASE IF NOT EXISTS %s" % database)
            finally:
                # Release the temporary engine's pooled connection.
                engine.dispose()
        except sqlalchemy.exc.SQLAlchemyError:
            # Best effort: errors here are deliberately ignored.
            pass
        finally:
            # Restore the URL even if an unexpected error propagates.
            self.url.database = database

    self.engine = create_engine(self.url, convert_unicode=True)
    self._list_project()
开发者ID:bartqiao,项目名称:pyspider,代码行数:29,代码来源:taskdb.py
示例20: setup_main_database
def setup_main_database(self, conf_parser):
    """Create the main (logging) database and the dork database.

    Args:
        conf_parser: Parser providing ``getboolean``/``get`` for the
            ``main-database`` section (``enabled``, ``connection_string``).

    Returns:
        A ``(maindb, dorkdb)`` tuple.  ``maindb`` is None when the main
        database is disabled; dorks are then stored in a local SQLite
        file.

    Exits the process with status 1 when the connection string uses an
    unsupported scheme.
    """
    if conf_parser.getboolean("main-database", "enabled"):
        connection_string = conf_parser.get("main-database", "connection_string")
        logger.info("Connecting to main database with: {0}".format(connection_string))
        if connection_string.startswith("mongodb://"):
            maindb = log_mongodb.Database(connection_string)
            dorkdb = database_mongo.Database(connection_string)
            return (maindb, dorkdb)
        elif connection_string.startswith(("sqlite", "mysql",
                                           "oracle", "postgresql")):
            sqla_engine = create_engine(connection_string)
            maindb = log_sql.Database(sqla_engine)
            dorkdb = database_sqla.Database(sqla_engine)
            return (maindb, dorkdb)
        else:
            logger.error("Invalid connection string.")
            sys.exit(1)
    else:
        # A relative SQLite path needs three slashes; with two, "db" is
        # parsed as a host name and the URL is rejected.
        default_db = "sqlite:///db/glastopf.db"
        logger.info("Main datbase has been disabled, dorks will be stored in: {0}".format(default_db))
        # This database is used for dorks only.
        sqla_engine = create_engine(default_db)
        # NOTE(review): the logging database object is still constructed as
        # before, in case its constructor has side effects -- verify
        # whether this call can be dropped.
        log_sql.Database(sqla_engine)
        dorkdb = database_sqla.Database(sqla_engine)
        # Returning None disables use of the main logging database.
        return (None, dorkdb)
开发者ID:RBoutot,项目名称:glastopf,代码行数:27,代码来源:glastopf.py
注：本文中的sqlalchemy.create_engine函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论