• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sqlalchemy.create_engine函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 sqlalchemy.create_engine 函数的典型用法代码示例。如果您正苦于以下问题:Python create_engine 函数的具体用法?Python create_engine 怎么用?Python create_engine 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 create_engine 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: __init__

    def __init__(self,
                 dbtype="postgresql",
                 dbhost="localhost",
                 dbuser="postgres",
                 dbpass="postgres",
                 dbname="postgres"):
        """Store the connection settings and build the engine, metadata and session factory.

        The engine is created from ``self.dbstring`` (defined outside this
        block; presumably derived from the attributes stored here - confirm).
        If a probe connection fails with ``OperationalError`` the database is
        created through ``self.create_database`` and the engine is rebuilt.

        :param dbtype: database dialect name.
        :param dbhost: database server host.
        :param dbuser: user name used to connect.
        :param dbpass: password used to connect.
        :param dbname: name of the database to use (created when missing).
        """
        self.dbname = dbname
        self.dbtype = dbtype
        self.dbhost = dbhost
        self.dbuser = dbuser
        self.dbpass = dbpass

        self.engine = create_engine(self.dbstring)
        try:
            # Probe only: close the connection right away so it is not leaked.
            self.engine.connect().close()
        except OperationalError:
            self.create_database(dbname)
            self.engine = create_engine(self.dbstring)
            self.engine.connect().close()

        self.metadata = MetaData(self.engine)
        self.Session = sessionmaker(bind=self.engine, expire_on_commit=False)

        Base.metadata.create_all(self.engine)
开发者ID:konstantinfarrell,项目名称:dbfoo,代码行数:25,代码来源:database.py


示例2: _db_params

def _db_params(request, db_name):
    """Create a fresh database for a parametrized fixture and return its params.

    ``request.param['url']`` is a ``%s`` template for the database name.  The
    target database must not exist yet: if a connection to it succeeds, an
    ``Exception`` is raised instead of reusing it.  A finalizer that drops the
    database is registered on ``request``.

    Returns a copy of ``request.param`` whose ``'url'`` points at the new
    database.
    """
    url_template = request.param['url']
    sudo_engine = sqlalchemy.create_engine(
        url_template % request.param.get('admin_db', ''),
        poolclass=sqlalchemy.pool.NullPool,
    )
    db_connection_url = url_template % db_name

    def run_as_admin(statement):
        # Leave any open transaction first; database-level DDL is issued
        # outside of it (with autocommit switched on for mssql).
        with sudo_engine.connect() as conn:
            conn.execute('rollback')
            if 'mssql' in url_template:
                conn.connection.connection.autocommit = True
            conn.execute(statement)

    def tear_down():
        run_as_admin('drop database if exists %s' % db_name)

    try:
        with sqlalchemy.create_engine(db_connection_url).connect():
            pass
    except (sqlalchemy.exc.OperationalError, sqlalchemy.exc.InternalError, DBAPIError):
        # Connecting failed, so the database is taken to be absent: create it.
        run_as_admin('create database %s' % db_name)
    else:
        raise Exception('Database %s already exists; refusing to overwrite' % db_name)

    request.addfinalizer(tear_down)

    params = request.param.copy()
    params['url'] = db_connection_url
    return params
开发者ID:dimagi,项目名称:commcare-export,代码行数:29,代码来源:conftest.py


示例3: _has_sqlite

 def _has_sqlite(self):
     """Return True when a ``sqlite://`` engine can be created, False on ImportError."""
     from sqlalchemy import create_engine
     try:
         create_engine('sqlite://')
     except ImportError:
         return False
     else:
         return True
开发者ID:NeilTruick,项目名称:sqlalchemy,代码行数:7,代码来源:requirements.py


示例4: test_mysql_innodb

    def test_mysql_innodb(self):
        """Test that table creation on mysql only builds InnoDB tables."""
        # Add the engine to the global lists so that reset works with it; it is
        # removed automatically in tearDown, so no cleanup is needed here.
        connect_string = _get_connect_string('mysql')
        engine = sqlalchemy.create_engine(connect_string)
        self.engines["mysqlcitest"] = engine
        self.test_databases["mysqlcitest"] = connect_string

        # build a fully populated mysql database with all the tables
        self._reset_databases()
        self._walk_versions(engine, False, False)

        uri = _get_connect_string('mysql', database="information_schema")
        connection = sqlalchemy.create_engine(uri).connect()
        # Release the connection even when one of the assertions fails.
        try:
            # sanity check
            total = connection.execute("SELECT count(*) "
                                       "from information_schema.TABLES "
                                       "where TABLE_SCHEMA='openstack_citest'")
            self.assertGreater(total.scalar(), 0,
                               msg="No tables found. Wrong schema?")

            noninnodb = connection.execute("SELECT count(*) "
                                           "from information_schema.TABLES "
                                           "where TABLE_SCHEMA='openstack_citest' "
                                           "and ENGINE!='InnoDB' "
                                           "and TABLE_NAME!='migrate_version'")
            count = noninnodb.scalar()
            self.assertEqual(count, 0, "%d non InnoDB tables created" % count)
        finally:
            connection.close()
开发者ID:japplewhite,项目名称:cinder-1,代码行数:30,代码来源:test_migrations.py


示例5: get_session

def get_session(verbose, test):
    """Create the tables and return a new SQLAlchemy session.

    * **verbose** is passed to the engine as its **echo** flag.
    * **test** if *True* the database is created in RAM only, otherwise the
      file ``DB/3did.db`` is used.

    >>> get_session(False, True) #doctest: +ELLIPSIS
    <sqlalchemy.orm.session.Session object at 0x...>

    >>> get_session(False, False) #doctest: +ELLIPSIS
    <sqlalchemy.orm.session.Session object at 0x...>
    """
    if test:
        db_url = 'sqlite:///:memory:'
        note = 'DB in RAM.'
    else:
        db_url = 'sqlite:///DB/3did.db'
        note = 'DB stored in file: %s' % 'DB/3did.db'

    engine = create_engine(db_url, echo=verbose)
    log_load.debug(note)

    # Make sure every table known to ``meta`` exists before handing out a session.
    meta.create_all(engine)

    # Session factory bound to the engine; the returned session is used for INSERTs.
    return sessionmaker(bind=engine)()
开发者ID:piobyz,项目名称:protein-protein-interactions-motifs.,代码行数:27,代码来源:DB_3DID.py


示例6: __init__

    def __init__(self,configfile='~/.ddr_compress/still.cfg',test=False):
        """
        Read the database settings and set up the engine and session factory.

        configfile - path to a config file with a ``dbinfo`` section; ``~`` is
                     expanded. Pass None to skip reading a config file.
        test       - when True an SQLite engine is used and ``self.createdb()``
                     is called; otherwise a MySQL engine is built from
                     ``self.dbinfo``.
        """
        if configfile is not None:
            config = configparser.ConfigParser()
            configfile = os.path.expanduser(configfile)
            if not os.path.exists(configfile):
                logging.info(configfile+" Not Found")
            else:
                logger.info('loading file '+configfile)
                config.read(configfile)
                self.dbinfo = config['dbinfo']
                # NOTE(review): 'string-escape' decoding exists on Python 2 str only.
                self.dbinfo['password'] = self.dbinfo['password'].decode('string-escape')

        if not test:
            # NOTE(review): self.dbinfo is only set when the config file was
            # found above; otherwise this line fails with AttributeError.
            mysql_url = 'mysql://{username}:{password}@{hostip}:{port}/{dbname}'.format(
                **self.dbinfo)
            self.engine = create_engine(mysql_url, pool_size=20, max_overflow=40)
        else:
            self.engine = create_engine('sqlite:///',
                                        connect_args={'check_same_thread':False},
                                        poolclass=StaticPool)
            self.createdb()

        self.Session = sessionmaker(bind=self.engine)
开发者ID:acliu,项目名称:capo,代码行数:31,代码来源:dbi.py


示例7: yeild_engine

    def yeild_engine(self):
        '''Create a SQLAlchemy engine for the configured PostgreSQL database.

        When both ``self.user`` and ``self.password`` are set they are used
        directly.  Otherwise the missing value(s) are taken from the first
        line of ``~/.pgpass`` whose host equals ``self.host`` and whose port
        equals ``self.port`` (a ``*`` port matches any port).  The password is
        read from the last field and the user from the second to last one.

        Returns the engine.  Raises ``RuntimeError`` when no ``.pgpass`` line
        matches.
        '''
        head = 'postgresql://'
        tail = ''.join(['@', self.host, '/', self.db])

        # both user and password provided: no lookup needed
        if self.user is not None and self.password is not None:
            jnd = ''.join([head, self.user, ':', self.password, tail])
            return sqlalchemy.create_engine(jnd)

        # user or password not provided: pull from the pgpass file
        path2pass = os.path.join(os.path.expanduser('~'), '.pgpass')
        with open(path2pass, 'r') as f:
            for line in f:
                fields = line.split(':')
                # Lines without a port field (blank lines, comments, junk)
                # can never match.
                if len(fields) < 2 or fields[0] != self.host:
                    continue

                port_field = fields[1].strip()
                if port_field != '*':
                    try:
                        entry_port = int(port_field)
                    except ValueError:
                        # Malformed port: skip the line instead of crashing.
                        continue
                    if entry_port != int(self.port):
                        continue

                password = fields[-1].strip() if self.password is None else self.password
                user = fields[-2].strip() if self.user is None else self.user
                jnd = ''.join([head, user, ':', password, tail])
                return sqlalchemy.create_engine(jnd)
        raise RuntimeError('unable to resolve credentials')
开发者ID:marshall245,项目名称:Analytics,代码行数:27,代码来源:dbconnect.py


示例8: __init__

    def __init__(self, uri, init_func=None):
        """Create the engine for ``uri`` and reflect the tables used by the service.

        ``init_func``, when given, is called with the engine before the
        schema is reflected.
        """
        # One pooled connection is enough because each Bridge service is single
        # threaded. Recycling connections after 60s avoids MySQL dropping them
        # after long idle periods.
        engine_options = {"pool_size": 1, "pool_recycle": 60}

        if "mysql" in uri:
            # MySQLdb's default cursor reads _all_ rows before returning any,
            # even through SQLAlchemy iterators. SSCursor streams rows, at the
            # price of allowing only one active query per Connection. That is
            # fine for a single threaded service as long as "close" is always
            # called after a query; "fetchall" does so, hence we always use it.
            from MySQLdb.cursors import SSCursor
            # A longer pool_timeout makes SQLAlchemy wait longer for a new
            # connection; not strictly necessary, but harmless.
            engine_options.update(pool_timeout=60,
                                  connect_args={"cursorclass": SSCursor})

        self.db = sa.create_engine(uri, **engine_options)

        if init_func:
            init_func(self.db)

        metadata = sa.MetaData(self.db)
        metadata.reflect()
        reflected = ("buildrequests", "builds", "sourcestamps",
                     "buildset_properties", "buildsets")
        for table_name in reflected:
            # Exposed as self.<name>_table, e.g. self.builds_table.
            setattr(self, "%s_table" % table_name, metadata.tables[table_name])
开发者ID:Callek,项目名称:buildbot-bridge,代码行数:32,代码来源:servicebase.py


示例9: __init__

    def __init__(self, dsn=None):
        """Set up the engine, create the schema and prepare the session factory.

        @param dsn: database connection string; when empty the configured
                    connection is used, and failing that an SQLite file under
                    CUCKOO_ROOT/db.
        @raise CuckooDatabaseError: if the database directory or the schema
                                    cannot be created.
        """
        cfg = Config()

        if dsn:
            connection_string = dsn
        elif cfg.database.connection:
            connection_string = cfg.database.connection
        else:
            db_file = os.path.join(CUCKOO_ROOT, "db", "cuckoo.db")
            db_dir = os.path.dirname(db_file)
            # Only a missing file inside a missing directory needs preparation.
            if not os.path.exists(db_file) and not os.path.exists(db_dir):
                try:
                    create_folder(folder=db_dir)
                except CuckooOperationalError as e:
                    raise CuckooDatabaseError("Unable to create database directory: {0}".format(e))
            connection_string = "sqlite:///{0}".format(db_file)

        self.engine = create_engine(connection_string, poolclass=NullPool)

        # SQL logging stays off; enable it when debugging.
        self.engine.echo = False
        # Connection timeout: configured value, or 60 when none is set.
        self.engine.pool_timeout = cfg.database.timeout or 60

        # Create schema.
        try:
            Base.metadata.create_all(self.engine)
        except SQLAlchemyError as e:
            raise CuckooDatabaseError("Unable to create or connect to database: {0}".format(e))

        # Session factory bound to the engine.
        self.Session = sessionmaker(bind=self.engine)
开发者ID:AmesianX,项目名称:HackingStuff,代码行数:35,代码来源:database.py


示例10: construct_maker

def construct_maker(databases, models=None, query_cls=Query, engine_params=None,
                    session_class=DBSession):
    '''
    Build a sessionmaker for one or several databases.

    databases - str with db uri, or dict mapping a reference name to a db uri.
    models - str name of the models package (module), default is 'models'
    query_cls - query class handed to the sessionmaker
    engine_params - extra keyword arguments for every create_engine call

    For a dict, the `metadata` object of `<models>.<ref>` is imported for each
    entry and every table in it is bound to the engine created for that uri.
    '''
    models = models or 'models'
    engine_params = engine_params or {}
    maker_options = dict(class_=session_class, query_cls=query_cls, autoflush=False)

    if isinstance(databases, basestring):
        single_engine = create_engine(databases, **engine_params)
        return orm.sessionmaker(bind=single_engine, **maker_options)

    db_dict = {}
    for ref, uri in databases.items():
        # An empty ref falls back to the models package itself.
        module_path = '.'.join(filter(None, [models, ref]))
        metadata = import_string(module_path, 'metadata')
        engine = create_engine(uri, **engine_params)
        #TODO: find out why this is broken since sqlalchemy 0.7
        #engine.logger.logger.name += '(%s)' % ref
        db_dict.update((table, engine) for table in metadata.sorted_tables)
    return orm.sessionmaker(binds=db_dict, **maker_options)
开发者ID:riffm,项目名称:mage,代码行数:25,代码来源:sqla.py


示例11: test_mysqldb_connection

def test_mysqldb_connection():
    """Both the bare and the explicit mysqldb URL must yield the sphinx dialect."""
    for url in ("sphinx://", "sphinx+mysqldb://"):
        sphinx_engine = create_engine(url)
        assert isinstance(sphinx_engine.dialect, SphinxDialect)
        assert isinstance(sphinx_engine.dialect, myDialect)
开发者ID:AdrielVelazquez,项目名称:sqlalchemy-sphinx,代码行数:7,代码来源:test_connections.py


示例12: create_db_connection

def create_db_connection(username, password, host, db_name, flavor):
    """Return an engine for MySQL (``flavor`` 'mysql', any case) or PostgreSQL.

    Despite the function name, the returned object is the engine itself, not
    an open connection.
    """
    # NOTE(review): the credentials are inserted into the URL verbatim.
    if flavor.lower() == 'mysql':
        url_parts = ('mysql+mysqldb://', username, ':', password, '@', host,
                     '/', db_name, '?charset=utf8mb4')
    else:
        url_parts = ('postgresql+psycopg2://', username, ':', password, '@',
                     host, '/', db_name)
    return create_engine(''.join(url_parts), pool_recycle=3600,
                         execution_options={'autocommit':True})
开发者ID:rpmaxwell,项目名称:twitter_search_utilities,代码行数:7,代码来源:connections_util.py


示例13: setup_database

  def setup_database(self):
    """Publish SQLAlchemy names for models and store engine/session in self.config.

    Sets the module-level ``column_mapping`` (type name -> SQLAlchemy type),
    adds a few SQLAlchemy names to ``globals()`` and, for the 'sqlite' and
    'postgres' db types, stores an engine under ``'db_engine'`` and a session
    under ``'db_session'``.  Any other db type leaves the config untouched.
    """
    # DB library imports
    from sqlalchemy import (create_engine, Table, MetaData, Column, Integer,
                            String, Unicode, Text, UnicodeText, Date, Numeric,
                            Time, Float, DateTime, Interval, Binary, Boolean,
                            PickleType)
    from sqlalchemy.orm import sessionmaker, mapper

    # Global name mapping used by model()
    global column_mapping
    column_mapping = {
      'string': String,
      'str': String,
      'integer': Integer,
      'int': Integer,
      'unicode': Unicode,
      'text': Text,
      'unicodetext': UnicodeText,
      'date': Date,
      'numeric': Numeric,
      'time': Time,
      'float': Float,
      'datetime': DateTime,
      'interval': Interval,
      'binary': Binary,
      'boolean': Boolean,
      'bool': Boolean,
      'pickletype': PickleType,
    }

    # Expose a few SQLAlchemy names module-wide so models can use them
    globals().update({'Column': Column, 'Table': Table, 'Integer': Integer,
                      'MetaData': MetaData, 'mapper': mapper})

    db_type = self.config['db_type']
    if db_type == 'sqlite':
      # Ensures correct slash number for sqlite
      self.config['db_location'] = '/' + self.config['db_location']
      engine = create_engine(db_type + '://' + self.config['db_location'])
    elif db_type == 'postgres':
      engine = create_engine(db_type + '://' + self.config['db_location'],
                             encoding="utf8", convert_unicode=True)
    else:
      return

    self.config['db_engine'] = engine
    self.config['db_session'] = sessionmaker(bind=engine)()
开发者ID:malaniz,项目名称:webyarara,代码行数:34,代码来源:yarara.py


示例14: getDatabaseEngine

    def getDatabaseEngine(self, section, key="database"):
        """
        Return the database engine for a configuration entry, creating and
        caching it on first use. The cache key includes the current process id.

        ========= ============
        Parameter Description
        ========= ============
        section   name of the configuration section holding the connection string.
        key       name of the option inside that section, defaults to *database*.
        ========= ============

        ``Return``: database engine

        Raises ``Exception`` when no connection string is configured.
        """
        config_key = "%s.%s" % (section, key)
        index = "%s:%s" % (os.getpid(), config_key)

        if index in self.__db:
            return self.__db[index]

        connection_string = self.config.get(config_key)
        if not connection_string:
            raise Exception("No database connection defined for '%s'!" % index)

        engine_options = {'encoding': "utf-8"}
        if connection_string.startswith("sqlite://"):
            from sqlalchemy.pool import StaticPool
            engine_options.update(connect_args={'check_same_thread': False},
                                  poolclass=StaticPool)

        #TODO: configure engine (pool_size=40, pool_recycle=120, echo=True)
        self.__db[index] = create_engine(connection_string, **engine_options)
        return self.__db[index]
开发者ID:gonicus,项目名称:gosa,代码行数:32,代码来源:env.py


示例15: __init__

    def __init__(self, config=None, sqluri=None, create_tables=True,
                 pool_size=100, pool_recycle=60, pool_timeout=30,
                 max_overflow=10, pool_reset_on_return='rollback', **kw):
        """Create the engine and bind the hash table to it.

        :param config: optional mapping; its ``'SQLURI'`` entry is used when
            ``sqluri`` is not given.
        :param sqluri: database URI; takes precedence over ``config``.
        :param create_tables: when true, create the hash table if missing.
        :param pool_size, pool_recycle, pool_timeout, max_overflow: pool
            settings, applied to MySQL URIs only.
        :param pool_reset_on_return: pool reset behaviour; ``''`` or
            ``'none'`` (any case) is translated to ``None``.
        :param kw: only ``echo`` is read (default ``False``).
        :raises KeyError: when neither ``sqluri`` nor ``config['SQLURI']`` is
            available.
        """
        self.config = config or {}
        # Look the fallback up in self.config so that config=None does not
        # blow up with a TypeError on None['SQLURI'].
        self.sqluri = sqluri or self.config['SQLURI']
        if pool_reset_on_return.lower() in ('', 'none'):
            pool_reset_on_return = None

        if self.sqluri.startswith(('mysql', 'pymysql')):
            self._engine = create_engine(
                self.sqluri,
                pool_size=pool_size,
                pool_recycle=pool_recycle,
                pool_timeout=pool_timeout,
                pool_reset_on_return=pool_reset_on_return,
                max_overflow=max_overflow,
                logging_name='addonreg')

        else:
            # Use the resolved URI: the raw ``sqluri`` argument is None when
            # the URI came from ``config``.
            self._engine = create_engine(self.sqluri, poolclass=NullPool)

        self._engine.echo = kw.get('echo', False)
        self.hashes = hash_table

        self.hashes.metadata.bind = self._engine
        if create_tables:
            self.hashes.create(checkfirst=True)
开发者ID:jasonthomas,项目名称:addon-registration,代码行数:27,代码来源:rawsql.py


示例16: __init__

    def __init__(self):
        """Build an MSSQL engine from ``app.config`` and open ``self.session``.

        The driver is chosen by ``app.config['SQL_DRIVER']``: 'pymssql' uses a
        plain URL, anything else goes through pyodbc with a FreeTDS connection
        string.  Any error while creating the session is logged and re-raised.
        """

        if app.config['SQL_DRIVER'] == 'pymssql':
          engine = create_engine(r"mssql+pymssql://{0}:{1}@{2}:{3}/{4}".format(
                                          app.config['DATABASE_USER'],
                                          app.config['DATABASE_PASSWORD'],
                                          app.config['DATABASE_HOST'],
                                          app.config['DATABASE_PORT'],
                                          app.config['DATABASE_NAME']))

        else:
          # The ODBC connection string is URL-quoted and passed via odbc_connect.
          # NOTE(review): urllib.quote_plus is the Python 2 location of this
          # function; the port is hard-coded to 1433 here while the pymssql
          # branch reads DATABASE_PORT - confirm this is intended.
          quoted = urllib.quote_plus('DRIVER={FreeTDS};Server=%s;Database=%s;UID=%s;PWD=%s;TDS_Version=8.0;CHARSET=UTF8;Port=1433;'
                                          %(app.config['DATABASE_HOST'],
                                            app.config['DATABASE_NAME'],
                                            app.config['DATABASE_USER'],
                                            app.config['DATABASE_PASSWORD']))

          engine = create_engine('mssql+pyodbc:///?odbc_connect={}'.format(quoted), connect_args={'convert_unicode': True})

        # create a Session
        Session = sessionmaker(bind=engine)

        try:
            self.session = Session()
            # NOTE(review): the third argument is the *result* of calling
            # self.session.execute(...), not a callable: the SET statement is
            # executed once right here and its result object is what gets
            # registered as the "after_transaction_create" listener. Confirm
            # whether a per-transaction listener function was intended.
            event.listen(Session, "after_transaction_create", self.session.execute('SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED'))
            
            Log.info('Connection Openned')
        except:
            # Bare except is followed by a re-raise, so nothing is swallowed.
            Log.info('Can\'t create database session')
            raise
开发者ID:culturagovbr,项目名称:salic-api,代码行数:30,代码来源:mssql_connector.py


示例17: __init__

    def __init__(self, url):
        """Define the project table, make sure the database exists and create the table.

        :param url: database URL. When it names a database, a best-effort
            ``CREATE DATABASE`` is issued first through a URL without the
            database part; SQLAlchemy errors from that attempt are ignored.
        """
        self.table = Table(self.__tablename__, MetaData(),
                           Column('name', String(64)),
                           Column('group', String(64)),
                           Column('status', String(16)),
                           Column('script', Text),
                           Column('comments', String(1024)),
                           Column('rate', Float(11)),
                           Column('burst', Float(11)),
                           Column('updatetime', Float(32)),
                           mysql_engine='InnoDB',
                           mysql_charset='utf8'
                           )

        self.url = make_url(url)
        if self.url.database:
            database = self.url.database
            # Connect without a database so that CREATE DATABASE can be issued.
            self.url.database = None
            try:
                engine = create_engine(self.url)
                try:
                    conn = engine.connect()
                    try:
                        # End the implicit transaction before issuing the DDL.
                        conn.execute("commit")
                        conn.execute("CREATE DATABASE %s" % database)
                    finally:
                        conn.close()
                finally:
                    # The bootstrap engine is not used again; release its pool.
                    engine.dispose()
            except sqlalchemy.exc.SQLAlchemyError:
                # Best effort: failures here (such as the database already
                # existing) are deliberately ignored.
                pass
            self.url.database = database
        self.engine = create_engine(url)
        self.table.create(self.engine, checkfirst=True)
开发者ID:01jiagnwei01,项目名称:pyspider,代码行数:28,代码来源:projectdb.py


示例18: test_unlock_scenario_structure

def test_unlock_scenario_structure(fresh_database_config, scenario_manager):
    """Test that unlocking a scenario removes only its own lock record.

    1. Create 3 scenarios with scenario manager.
    2. Insert ScenarioStructureLock records for the first and third scenarios.
    3. Unlock the first scenario.
    4. Check that the only remaining lock record belongs to the third scenario.
    """
    def open_session():
        # A new engine and session each time, as a fresh view of the database.
        return Session(bind=create_engine(fresh_database_config.get_connection_string()))

    specs = [NewScenarioSpec(name=title) for title in (u'First', u'Second', u'Third')]
    scenarios = scenario_manager.create_scenarios(specs)
    first = scenarios[0]
    third = scenarios[2]

    session = open_session()
    session.add_all([
        ScenarioStructureLockRecord(scenario_id=first.scenario_id),
        ScenarioStructureLockRecord(scenario_id=third.scenario_id),
        ])
    session.commit()

    first._unlock_structure()

    session = open_session()
    lock_record = session.query(ScenarioStructureLockRecord).one()
    assert lock_record.scenario_id == third.scenario_id, "Wrong scenario has been unlocked"
开发者ID:KillAChicken,项目名称:AutoStorage,代码行数:26,代码来源:test_lock_scenario_structure.py


示例19: __init__

    def __init__(self, url):
        """Define the task table template, ensure the database exists, list projects.

        :param url: database URL. When it names a database, a best-effort
            ``CREATE DATABASE IF NOT EXISTS`` is issued through a URL without
            the database part; SQLAlchemy errors from it are ignored.
        """
        columns = [
            Column('taskid', String(64), primary_key=True, nullable=False),
            Column('project', String(64)),
            Column('url', String(1024)),
            Column('status', Integer),
            Column('schedule', LargeBinary),
            Column('fetch', LargeBinary),
            Column('process', LargeBinary),
            Column('track', LargeBinary),
            Column('lastcrawltime', Float(32)),
            Column('updatetime', Float(32)),
        ]
        self.table = Table('__tablename__', MetaData(), *columns,
                           mysql_engine='InnoDB', mysql_charset='utf8')

        self.url = make_url(url)
        target_db = self.url.database
        if target_db:
            # Temporarily drop the database part so the server can be reached
            # before the database exists.
            self.url.database = None
            try:
                bootstrap = create_engine(self.url, convert_unicode=True)
                bootstrap.execute("CREATE DATABASE IF NOT EXISTS %s" % target_db)
            except sqlalchemy.exc.SQLAlchemyError:
                pass
            self.url.database = target_db
        self.engine = create_engine(self.url, convert_unicode=True)

        self._list_project()
开发者ID:bartqiao,项目名称:pyspider,代码行数:29,代码来源:taskdb.py


示例20: setup_main_database

    def setup_main_database(self, conf_parser):
        """Create the logging and dork database objects from the configuration.

        :param conf_parser: parser providing the ``main-database`` section
            (``enabled`` flag and ``connection_string``).
        :return: tuple ``(maindb, dorkdb)``; ``maindb`` is ``None`` when the
            main database is disabled, in which case dorks are stored in a
            local SQLite file.

        Exits the process with status 1 on an unsupported connection string.
        """
        if conf_parser.getboolean("main-database", "enabled"):
            connection_string = conf_parser.get("main-database", "connection_string")
            logger.info("Connecting to main database with: {0}".format(connection_string))
            if connection_string.startswith("mongodb://"):
                maindb = log_mongodb.Database(connection_string)
                dorkdb = database_mongo.Database(connection_string)
                return (maindb, dorkdb)
            elif connection_string.startswith(("sqlite", "mysql",
                                               "oracle", "postgresql")):
                sqla_engine = create_engine(connection_string)
                maindb = log_sql.Database(sqla_engine)
                dorkdb = database_sqla.Database(sqla_engine)
                return (maindb, dorkdb)
            else:
                logger.error("Invalid connection string.")
                sys.exit(1)
        else:
            # A relative SQLite file path needs three slashes; with two, "db"
            # is parsed as a host name, which the SQLite dialect rejects.
            default_db = "sqlite:///db/glastopf.db"
            logger.info("Main datbase has been disabled, dorks will be stored in: {0}".format(default_db))
            # db will only be used for dorks
            sqla_engine = create_engine(default_db)
            # NOTE(review): maindb is built but not returned; kept in case the
            # constructor has side effects that dorkdb relies on - confirm.
            maindb = log_sql.Database(sqla_engine)
            dorkdb = database_sqla.Database(sqla_engine)
            # disable usage of main logging database
            return (None, dorkdb)
开发者ID:RBoutot,项目名称:glastopf,代码行数:27,代码来源:glastopf.py



注:本文中的sqlalchemy.create_engine函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sqlalchemy.delete函数代码示例发布时间:2022-05-27
下一篇:
Python sqlalchemy.column函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap