• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python engine.create_engine函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.engine.create_engine函数的典型用法代码示例。如果您正苦于以下问题:Python create_engine函数的具体用法?Python create_engine怎么用?Python create_engine使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了create_engine函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __init__

    def __init__(self, environ=None):
        """
        Establish an environ for this instance.
        """

        global ENGINE, MAPPED

        if environ is None:
            environ = {}
        self.environ = environ

        if not ENGINE:
            db_config = self._db_config()
            if 'mysql' in db_config:
                try:
                    from tiddlywebplugins.mysql2 import LookLively
                    listeners = [LookLively()]
                except ImportError:
                    listeners = []
                ENGINE = create_engine(db_config,
                        pool_recycle=3600,
                        pool_size=20,
                        max_overflow=-1,
                        pool_timeout=2,
                        listeners=listeners)
            else:
                ENGINE = create_engine(db_config)
            METADATA.bind = ENGINE
            SESSION.configure(bind=ENGINE)

        self.session = SESSION()

        if not MAPPED:
            METADATA.create_all(ENGINE)
            MAPPED = True
开发者ID:jdlrobson,项目名称:tiddlywebplugins.links,代码行数:35,代码来源:linksmanager.py


示例2: __init__

    def __init__(self, environ=None):
        """
        Establish an environ for this instance.
        """

        global ENGINE, MAPPED

        if environ is None:
            environ = {}
        self.environ = environ

        if not ENGINE:
            db_config = self._db_config()
            if 'mysql' in db_config:
                ENGINE = create_engine(db_config,
                        pool_recycle=3600,
                        pool_size=20,
                        max_overflow=-1,
                        pool_timeout=2)
                try:
                    from tiddlywebplugins.mysql3 import on_checkout
                    from sqlalchemy import event
                    event.listen(ENGINE, 'checkout', on_checkout)
                except ImportError:
                    pass
            else:
                ENGINE = create_engine(db_config)
            METADATA.bind = ENGINE
            SESSION.configure(bind=ENGINE)

        self.session = SESSION()

        if not MAPPED:
            METADATA.create_all(ENGINE)
            MAPPED = True
开发者ID:tiddlyweb,项目名称:tiddlywebplugins.links,代码行数:35,代码来源:linksmanager.py


示例3: __init__

 def __init__(self, url, pool_size=sqlalchemy_dao.POOL_DEFAULT):
     if pool_size == sqlalchemy_dao.POOL_DISABLED:
         self._engine = engine.create_engine(url, poolclass=NullPool)
     else:
         self._engine = engine.create_engine(url, pool_size=pool_size, pool_recycle=3600,
                 max_overflow=sys.maxsize)
     self._Session = session.sessionmaker(bind=self._engine, class_=self.session_class) # pylint: disable=invalid-name
开发者ID:CooledCoffee,项目名称:sqlalchemy-dao,代码行数:7,代码来源:dao.py


示例4: compute

    def compute(self):
        url = URL(drivername=self.get_input('protocol'),
                  username=self.force_get_input('user', None),
                  password=self.force_get_input('password', None),
                  host=self.force_get_input('host', None),
                  port=self.force_get_input('port', None),
                  database=self.get_input('db_name'))

        try:
            engine = create_engine(url)
        except ImportError, e:
            driver = url.drivername
            installed = False
            if driver == 'sqlite':
                raise ModuleError(self,
                                  "Python was built without sqlite3 support")
            elif (driver == 'mysql' or
                    driver == 'drizzle'): # drizzle is a variant of MySQL
                installed = install({
                        'pip': 'mysql-python',
                        'linux-debian': 'python-mysqldb',
                        'linux-ubuntu': 'python-mysqldb',
                        'linux-fedora': 'MySQL-python'})
            elif (driver == 'postgresql' or
                    driver == 'postgre'):   # deprecated alias
                installed = install({
                        'pip': 'psycopg2',
                        'linux-debian':'python-psycopg2',
                        'linux-ubuntu':'python-psycopg2',
                        'linux-fedora':'python-psycopg2'})
            elif driver == 'firebird':
                installed = install({
                        'pip': 'fdb',
                        'linux-fedora':'python-fdb'})
            elif driver == 'mssql' or driver == 'sybase':
                installed = install({
                        'pip': 'pyodbc',
                        'linux-debian':'python-pyodbc',
                        'linux-ubuntu':'python-pyodbc',
                        'linux-fedora':'pyodbc'})
            elif driver == 'oracle':
                installed = install({
                        'pip': 'cx_Oracle'})
            else:
                raise ModuleError(self,
                                  "SQLAlchemy couldn't connect: %s" %
                                  debug.format_exception(e))
            if not installed:
                raise ModuleError(self,
                                  "Failed to install required driver")
            try:
                engine = create_engine(url)
            except Exception, e:
                raise ModuleError(self,
                                  "Couldn't connect to the database: %s" %
                                  debug.format_exception(e))
开发者ID:Nikea,项目名称:VisTrails,代码行数:56,代码来源:init.py


示例5: alchemyEngine

def alchemyEngine() -> Engine:
   

    if database_url().startswith('sqlite://'):
        engine = create_engine(database_url(), pool_recycle=alchemy_pool_recycle())
        @event.listens_for(engine, 'connect')
        def setSQLiteFKs(dbapi_con, con_record):
            dbapi_con.execute('PRAGMA foreign_keys=ON')
    else:  engine = create_engine(database_url(), pool_recycle=alchemy_pool_recycle(), pool_size=30, max_overflow=60)        

    return engine
开发者ID:biddyweb,项目名称:Ally-Py,代码行数:11,代码来源:database_config.py


示例6: database_exists

def database_exists(url):
    """Check if a database exists.

    :param url: A SQLAlchemy engine URL.

    Performs backend-specific testing to quickly determine if a database
    exists on the server. ::

        database_exists('postgresql://[email protected]/name')  #=> False
        create_database('postgresql://[email protected]/name')
        database_exists('postgresql://[email protected]/name')  #=> True

    Supports checking against a constructed URL as well. ::

        engine = create_engine('postgresql://[email protected]/name')
        database_exists(engine.url)  #=> False
        create_database(engine.url)
        database_exists(engine.url)  #=> True

    """

    url = copy(make_url(url))
    database = url.database
    if url.drivername.startswith('postgres'):
        url.database = 'postgres'
    elif not url.drivername.startswith('sqlite'):
        url.database = None

    engine = create_engine(url)

    if engine.dialect.name == 'postgresql':
        text = "SELECT 1 FROM pg_database WHERE datname='%s'" % database
        return bool(engine.execute(text).scalar())

    elif engine.dialect.name == 'mysql':
        text = ("SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA "
                "WHERE SCHEMA_NAME = '%s'" % database)
        return bool(engine.execute(text).scalar())

    elif engine.dialect.name == 'sqlite':
        return database == ':memory:' or os.path.exists(database)

    else:
        text = 'SELECT 1'
        try:
            url.database = database
            engine = create_engine(url)
            engine.execute(text)
            return True

        except (ProgrammingError, OperationalError):
            return False
开发者ID:plq,项目名称:spyne,代码行数:52,代码来源:util.py


示例7: __init__

    def __init__(self, path, delete_everything=False):
        if path.startswith('sqlite:'):
            self.engine = create_engine(path, poolclass=SingletonThreadPool)
            sqlalchemy.event.listen(
                self.engine, 'connect', self._apply_pragmas_callback)
        else:
            self.engine = create_engine(path)

        Session.configure(bind=self.engine)

        if delete_everything == 'yes-really!':
            self._delete_everything()

        Base.metadata.create_all(self.engine)
开发者ID:Deewiant,项目名称:terroroftinytown,代码行数:14,代码来源:database.py


示例8: create_sa_proxies

    def create_sa_proxies(self):

        # create the table and mapper
        metadata = schema.MetaData()
        user_table = schema.Table(
            'user',
            metadata,
            schema.Column('id', types.Integer, primary_key=True),
            schema.Column('first_name', types.Unicode(25)),
            schema.Column('last_name', types.Unicode(25))
        )

        class User(object):
            pass
        orm.mapper(User, user_table)

        # create the session
        engine = create_engine('sqlite:///:memory:')
        metadata.bind = engine
        metadata.create_all()
        session = orm.sessionmaker(bind=engine)()

        # add some dummy data
        user_table.insert().execute([
            {'first_name': 'Jonathan', 'last_name': 'LaCour'},
            {'first_name': 'Yoann', 'last_name': 'Roman'}
        ])

        # get the SA objects
        self.sa_object = session.query(User).first()
        select = user_table.select()
        self.result_proxy = select.execute()
        self.row_proxy = select.execute().fetchone()
开发者ID:ryanpetrello,项目名称:pecan,代码行数:33,代码来源:test_jsonify.py


示例9: tester

 def tester():
     class FooObj(DataObject):
         partition_attribute = 'id'
     
     @schema([IntegerColumn(name='id')])
     class Foo(FooObj):
         identity_key_ = (('id', 'id'),)
         sort_key_ = ('id',)
     
     # Create a test database and table
     engine = create_engine('sqlite://')
     metadata = MetaData(bind=engine)
     foos_table = Foo.to_sqa_table(metadata, 'foos')
     metadata.create_all()
     
     # Define the mapping between tables and objects for writing
     writer_config = {Foo: SqaWriterConfig(foos_table),
                     }
     
     # Define the mapping between tables and objects for reading
     reader_config = {Foo: SqaReaderConfig(foos_table, engine_wrapper(engine), **reader_args)}
     
     # Create some objects
     foos = [Foo(id=i) for i in range(1000)]
     
     # Write the objects to the database
     writer = Foo.writer(writer_config)
     for foo in foos:
         writer.write(foo)
     
     def do_the_reading():
         return [foo for foo in Foo.reader(reader_config)]
     
     assertion(do_the_reading)
开发者ID:jcrudy,项目名称:oreader,代码行数:34,代码来源:test_readers_and_writers.py


示例10: upgrade

def upgrade():
    """ this script is made to upgrade data from 9 -> 10 """
    from sqlalchemy.engine import create_engine
    engine = create_engine('sqlite:///OSMTM.db')
    connection = engine.connect()

    from OSMTM.models import TileHistory, Tile, Job
    from OSMTM.history_meta import VersionedListener
    from sqlalchemy import orm
    from sqlalchemy.sql.expression import and_

    sm = orm.sessionmaker(bind=engine, autoflush=True, autocommit=False,
                expire_on_commit=True,
                extension=[VersionedListener()])
    session = orm.scoped_session(sm)

    jobs = session.query(Job).all()

    for job in jobs:

        print "job: %s" % job.id

        tiles = session.query(Tile).filter(and_(Tile.change==True, Tile.job_id==job.id))
        for tile in tiles:
            tile.change = False
            tile.username = None
            tile.comment = None
            session.add(tile)

        session.commit()

    pass
开发者ID:FlavioFalcao,项目名称:osm-tasking-manager,代码行数:32,代码来源:2faee55a9ed8_upgrade_after_workfl.py


示例11: main

def main():
    parser = argparse.ArgumentParser(description='Generates SQLAlchemy model code from an existing database.')
    parser.add_argument('url', nargs='?', help='SQLAlchemy url to the database')
    parser.add_argument('--version', action='store_true', help="print the version number and exit")
    parser.add_argument('--schema', help='load tables from an alternate schema')
    parser.add_argument('--tables', help='tables to process (comma-separated, default: all)')
    parser.add_argument('--noviews', action='store_true', help="ignore views")
    parser.add_argument('--noindexes', action='store_true', help='ignore indexes')
    parser.add_argument('--noconstraints', action='store_true', help='ignore constraints')
    parser.add_argument('--nojoined', action='store_true', help="don't autodetect joined table inheritance")
    parser.add_argument('--noinflect', action='store_true', help="don't try to convert tables names to singular form")
    parser.add_argument('--noclasses', action='store_true', help="don't generate classes, only tables")
    parser.add_argument('--alwaysclasses', action='store_true', help="always generate classes")
    parser.add_argument('--nosequences', action='store_true', help="don't auto-generate postgresql sequences")
    parser.add_argument('--outfile', help='file to write output to (default: stdout)')
    args = parser.parse_args()

    if args.version:
        print(sqlacodegen.version)
        return
    if not args.url:
        print('You must supply a url\n', file=sys.stderr)
        parser.print_help()
        return

    engine = create_engine(args.url)
    metadata = MetaData(engine)
    tables = args.tables.split(',') if args.tables else None
    metadata.reflect(engine, args.schema, not args.noviews, tables)
    outfile = codecs.open(args.outfile, 'w', encoding='utf-8') if args.outfile else sys.stdout
    generator = CodeGenerator(metadata, args.noindexes, args.noconstraints, args.nojoined, args.noinflect,
                              args.noclasses, args.alwaysclasses, args.nosequences)
    generator.render(outfile)
开发者ID:rflynn,项目名称:sqlacodegen,代码行数:33,代码来源:main.py


示例12: __init__

    def __init__(self, db_connection, base_model, session_options=None, **kwargs):

        self.engine = engine.create_engine(db_connection, convert_unicode=True, **kwargs)

        self.Query = orm.Query
        self.session = self.create_scoped_session(session_options)
        self.Model = self.extend_base_model(base_model)
开发者ID:EclecticIQ,项目名称:OpenTAXII,代码行数:7,代码来源:sqldb_helper.py


示例13: setup_module

def setup_module():
    global connection, engine

    engine = create_engine("sqlite://")
    connection = engine.connect()
    Base.metadata.create_all(connection)
    session_factory.configure(bind=engine)
开发者ID:bencpeters,项目名称:home-controls,代码行数:7,代码来源:__init__.py


示例14: load_connection

def load_connection(connection_string, echo=False):
    """Create and return a connection to the database given in the
    connection string.

    Parameters
    ----------
    connection_string : str
        A string that points to the database conenction.  The
        connection string is in the following form:
        dialect+driver://username:[email protected]:port/database
    echo : bool
        Show all SQL produced.

    Returns
    -------
    session : sesson object
        Provides a holding zone for all objects loaded or associated
        with the database.
    engine : engine object
        Provides a source of database connectivity and behavior.
    """

    engine = create_engine(connection_string, echo=echo)
    Session = sessionmaker(bind=engine)

    return Session, engine
开发者ID:jotaylor,项目名称:cos_monitoring,代码行数:26,代码来源:db_tables.py


示例15: _init_engine

def _init_engine(database, backend, user=None, password=None, host=None, debug=False):
    if backend=='mysql':
        # Use oursql because other backends do not properly support MySQL cursors,
        # i.e. if you want to stream a large table through memory, other backends
        # will pull every row in the table into memory. The SQLAlchemy docs for yield_per
        # say that only psycopg2 is supported, but OurSQL also streams by default.
        if password:
            url = ('mysql+oursql://%s:%[email protected]%s/%s?charset=utf8&use_unicode=1' % 
                   (user, password, host, database))
        else:
            url = ('mysql+oursql://%[email protected]%s/%s?charset=utf8&use_unicode=1' % 
                   (user, host, database))
        
    elif backend=='sqlite_memory':
        url = 'sqlite:///:memory:'
    else:
        raise ValueError('Unknown database backend %s' % backend)

    engine = create_engine(url, echo=debug)
    
    # Enforce referential integrity on sqlite_memory for more rigorous tests.
    # http://stackoverflow.com/a/7831210/281469
    if backend=='sqlite_memory':
        event.listen(engine, 'connect', _fk_pragma_on_connect)
    return engine
开发者ID:insight-unlp,项目名称:domainmodeller,代码行数:25,代码来源:Storage.py


示例16: engine

def engine(request, sqlalchemy_connect_url, app_config):
    """Engine configuration.
    See http://docs.sqlalchemy.org/en/latest/core/engines.html
    for more details.

    :sqlalchemy_connect_url: Connection URL to the database. E.g
    postgresql://scott:[email protected]:5432/mydatabase 
    :app_config: Path to a ini config file containing the sqlalchemy.url
    config variable in the DEFAULT section.
    :returns: Engine instance

    """
    pass
    if app_config:
        from sqlalchemy import engine_from_config
        engine = engine_from_config(app_config)
    elif sqlalchemy_connect_url:
        from sqlalchemy.engine import create_engine
        engine = create_engine(sqlalchemy_connect_url)
    else:
        raise RuntimeError("Can not establish a connection to the database")

    def fin():
        print ("Disposing engine")
        engine.dispose()

    request.addfinalizer(fin)
    return engine
开发者ID:jayvdb,项目名称:pytest-sqlalchemy,代码行数:28,代码来源:pytest_sqlalchemy.py


示例17: test_url_default

 def test_url_default(self):
     engine = create_engine('presto://localhost:8080/hive')
     try:
         with contextlib.closing(engine.connect()) as connection:
             self.assertEqual(connection.execute('SELECT 1 AS foobar FROM one_row').scalar(), 1)
     finally:
         engine.dispose()
开发者ID:Uberi,项目名称:PyHive,代码行数:7,代码来源:test_sqlalchemy_presto.py


示例18: __init__

 def __init__(self, config):
     logger.info("Marcotti-MLS v{0}: Python {1} on {2}".format(__version__, sys.version, sys.platform))
     logger.info("Opened connection to {0}".format(self._public_db_uri(config.database_uri)))
     self.engine = create_engine(config.database_uri)
     self.connection = self.engine.connect()
     self.start_year = config.START_YEAR
     self.end_year = config.END_YEAR
开发者ID:soccermetrics,项目名称:marcotti-mls,代码行数:7,代码来源:base.py


示例19: engine

def engine(request):
    """Engine configuration."""
    url = request.config.getoption("--sqlalchemy-connect-url")
    from sqlalchemy.engine import create_engine
    engine = create_engine(url)
    yield engine
    engine.dispose()
开发者ID:elanmart,项目名称:sacred,代码行数:7,代码来源:test_sql_observer.py


示例20: run_with_taskmanager

def run_with_taskmanager(dsn=None):
    """Example for running PCSE/WOFOST with the task manager.

    Runs PyWOFOST for 6 crop types for one location in water-limited mode
    using an ensemble of 50 members. Each member is initialized with different
    values for the initial soil moisture and each member receives different
    values for rainfall as forcing variable. Executing PyWOFOST runs is done
    through the task manager. Output is writted to the database

    Parameters:
    dsn - SQLAlchemy data source name pointing to the database to be used.
    """

    # Open database connection and empty output table
    db_engine = sa_engine.create_engine(run_settings.connstr)

    # Initialise task manager
    taskmanager = TaskManager(db_engine, dbtype="MySQL")
    # Loop until no tasks are left
    task = taskmanager.get_task()
    while task is not None:
        try:
            task_id = task["task_id"]
            print "Running task: %i" % task_id
            task_runner(db_engine, task)

            # Set status of current task to 'Finished'
            taskmanager.set_task_finished(task)

        except SQLAlchemyError as inst:
            msg = "Database error on task_id %i." % task_id
            print msg
            logging.exception(msg)
            # Break because of error in the database connection
            break

        except PCSEError as inst:
            msg = "Error in PCSE on task_id %i." % task_id
            print msg
            logging.exception(msg)
            # Set status of current task to 'Error'
            taskmanager.set_task_error(task)

        except Exception as inst:
            msg = "General error on task_id %i" % task_id
            print msg
            logging.exception(msg)
            # Set status of current task to 'Error'
            taskmanager.set_task_error(task)

        except KeyboardInterrupt:
            msg = "Terminating on user request!"
            print msg
            logging.error(msg)
            taskmanager.set_task_error(task)
            sys.exit()

        finally:
            #Get new task
            task = taskmanager.get_task()
开发者ID:ajwdewit,项目名称:ggcmi,代码行数:60,代码来源:task_picker.py



注:本文中的sqlalchemy.engine.create_engine函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python engine.Connection类代码示例发布时间:2022-05-27
下一篇:
Python base.DATETIME类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap