• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python automap.automap_base函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 sqlalchemy.ext.automap.automap_base 函数的典型用法代码示例。如果您想了解 automap_base 函数的具体用法、该怎么用,或者正在寻找它的使用示例,那么这里精选的函数代码示例或许可以为您提供帮助。



下文一共展示了 automap_base 函数的 20 个代码示例,这些示例默认按受欢迎程度排序。您可以为喜欢或者觉得有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: get_data

def get_data():
    """Load the CV entries from the PostGIS database.

    Database credentials are read from ``../vars/vars.cfg`` (relative to this
    module).  The schema is reflected, ``cv_entries`` is queried ordered by
    ``start`` descending, and each geometry is transformed to SRID 3857 and
    rendered as GeoJSON by the database.

    Returns:
        A 2-tuple ``(geojson_text, attributes)``: ``geojson_text`` is the
        serialized GeoJSON FeatureCollection (feature properties ``cv_id``,
        ``group`` and ``summary``); ``attributes`` is a list of
        ``(cv_id, group, start, end, heading, markdown)`` tuples.
    """
    # Connection to the database
    config = ConfigParser.ConfigParser()
    config.read(os.path.join(os.path.dirname(__file__), '../vars/vars.cfg'))
    username = config.get('database', 'username')
    password = config.get('database', 'password')

    engine = create_engine(
        "postgresql://{username}:{password}@localhost/PostGIS".format(
            username=username, password=password),
        client_encoding='utf8', echo=False)

    metadata = MetaData()
    metadata.reflect(engine)

    db = automap_base(metadata=metadata)
    db.prepare()
    entries = db.classes.cv_entries

    session = Session(engine)
    try:
        # Materialize the rows once: the result is needed for both the
        # GeoJSON features and the attribute list, and iterating the Query
        # object twice would run the SQL statement twice.
        rows = session.query(
            entries.cv_id,
            entries.group,
            entries.start,
            entries.end,
            entries.heading,
            entries.markdown,
            entries.summary,
            func.ST_AsGeoJSON(func.ST_Transform(entries.geom, 3857)),
        ).order_by(desc(entries.start)).all()
    finally:
        # Only plain column values were selected, so the rows stay usable
        # after the session is closed.
        session.close()

    features = [
        geojson.Feature(
            geometry=geojson.loads(row[7]),
            properties={'cv_id': row[0], 'group': row[1], 'summary': row[6]},
        )
        for row in rows
    ]

    crs = {
        "type": "name",
        "properties": {
            "name": "EPSG:3857"
        }
    }
    cv_geojson = geojson.FeatureCollection(features, crs=crs)

    attributes = [
        (row[0], row[1], row[2], row[3], row[4], row[5]) for row in rows
    ]
    return geojson.dumps(cv_geojson), attributes
开发者ID:Acurus,项目名称:Kyrdalen,代码行数:50,代码来源:cv.py


示例2: __init__

    def __init__(self, verbose=0, *args, **kwds):  # @UnusedVariable
        """Open the finance SQLite database and expose its reflected tables.

        For each of the tables ``categories``, ``seasons``, ``cheques``,
        ``transactions`` and ``trybooking`` this sets a capitalized attribute
        holding the automapped class (e.g. ``self.Categories``) and a
        ``<table>_query`` attribute holding a query on that class.

        Raises:
            RuntimeError: if ``FINANCEDB_FILE`` is not both readable and
                writable.
        """
        super(SbciFinanceDB, self).__init__(*args, **kwds)

        can_read_write = os.access(FINANCEDB_FILE, os.R_OK | os.W_OK)
        if not can_read_write:
            message = 'cannot access Finance DB file ({}) for R/W!'.format(
                FINANCEDB_FILE)
            raise RuntimeError(message)

        self.Base = automap_base()
        self.engine = create_engine('sqlite:///' + FINANCEDB_FILE)
        self.Base.prepare(self.engine, reflect=True)

        table_names = (
            'categories',
            'seasons',
            'cheques',
            'transactions',
            'trybooking',
        )

        # Mapped classes first (self.Categories, self.Seasons, ...).
        for table_name in table_names:
            mapped_class = getattr(self.Base.classes, table_name)
            setattr(self, table_name.capitalize(), mapped_class)

        self.dbsession = Session(self.engine)

        # Then one query object per mapped class (self.categories_query, ...).
        for table_name in table_names:
            mapped_class = getattr(self, table_name.capitalize())
            setattr(self, table_name + '_query',
                    self.dbsession.query(mapped_class))
开发者ID:mjjensen,项目名称:shootersbasketball,代码行数:26,代码来源:financedb.py


示例3: base_app

def base_app():
    """Create and configure the global Flask ``app`` for the backend.

    The function connects to the database named by
    ``backend.settings.db_connection``, reflects it with automap, creates the
    tables of the declarative models, exposes ``Members`` through a
    Flask-Restless API under the ``members`` collection and registers a
    ``POST /shutdown`` route.

    The engine, automap base, session, declarative base and API manager are
    stored on the app as ``engine``, ``base``, ``session``, ``Decl_Base`` and
    ``api_manager``.

    Returns:
        The configured Flask application (also bound to the module-level
        name ``app``).
    """
    global app
    app = Flask('backend')
    app.config.overrides = {}

    logging.info('Connecting to database...')
    engine = create_engine(backend.settings.db_connection)
    app.engine = engine
    base = automap_base()
    base.prepare(engine, reflect=True)

    session = Session(engine)
    app.base = base
    app.session = session
    app.Decl_Base = declarative_base()

    # Imported only after app.Decl_Base exists.
    # NOTE(review): presumably backend.models reads app.Decl_Base at import
    # time - confirm before moving this import to module level.
    from backend.models import Members
    app.Decl_Base.metadata.create_all(app.engine)

    # Create the Flask-Restless API manager.
    app.api_manager = flask.ext.restless.APIManager(app, session=app.session)
    app.api_manager.create_api(Members, methods=['GET', 'POST', 'PATCH', 'DELETE'], collection_name='members')

    # This route used to be defined after ``return app`` and was therefore
    # never registered; it has to be declared before returning.
    # NOTE(review): the endpoint is unauthenticated here - confirm it should
    # be reachable in production deployments.
    @app.route("/shutdown", methods=["POST"])   # pragma: no cover
    def shutdown():  # pragma: no cover
        logging.info('shutting down server')
        shutdown_server()
        return "server shutting down"

    return app
开发者ID:faulteh,项目名称:memberdb-ng,代码行数:30,代码来源:__init__.py


示例4: run

    def run(self):
        """Reflect the source database and prepare one export thread per table.

        Tables matching ``config.source['tables']['exclude_tables']`` are
        skipped.  When ``custom_tables`` is non-empty, only tables matching
        it are kept.  The prepared threads are handed to ``thread_control``
        together with ``self.cfg_thread_number``.

        Any exception raised along the way is caught and printed, not
        propagated.
        """
        try:
            print("Please waiting... Analyzing source database... please wait...")
            ## AutoMap
            self.metadata.reflect(self.engine)  # get columns from existing table
            Base = automap_base(bind=self.engine, metadata=self.metadata)
            Base.prepare(self.engine, reflect=True)
            reflected_tables = Base.metadata.tables

            workers = []
            ## From all tables found in database
            for table in reflected_tables.keys():
                # Dotted keys are reduced to the part after the first dot.
                if '.' in table:
                    table = str(table).split('.')[1]

                excluded = config.source['tables']['exclude_tables']
                if len(excluded) > 0 and exactyMatchList(excluded, table):
                    continue

                ## From config.py custom_tables
                wanted = config.source['tables']['custom_tables']
                if len(wanted) > 0 and not exactyMatchList(wanted, table):
                    continue

                print("Preparing thread to table %s " % table)
                worker = threading.Thread(
                    target=self.export2RedShift,
                    name='thread-' + table,
                    args=(table,),
                )
                workers.append(worker)

            thread_control(workers, self.cfg_thread_number)
            print("Finish...")

        except (SQLAlchemyError, Exception) as e:
            print("Error: %s" % e)
开发者ID:csmanioto,项目名称:power-leech,代码行数:33,代码来源:main.py


示例5: reflect_model

    def reflect_model(self, table_name, bind_key=None):
        """Return the ORM model reflected from a database table.

        The model is built on first request and cached in ``self._models``;
        later calls for the same table return the cached class.  The
        reflected ``Table`` is stored in ``self._tables``.  All of this
        happens while holding ``self._reflect_lock``.

        :param table_name: name of the table to reflect
        :param bind_key: passed to ``self.get_engine`` to select the engine
        :return: ORMClass
        """
        with self._reflect_lock:
            known_models = self._models
            if table_name not in known_models:
                reflected = MetaData(bind=self.get_engine(bind_key))
                reflected.reflect(only=[table_name])
                self._tables[table_name] = reflected.tables[table_name]

                automapped = automap_base(metadata=reflected)
                automapped.prepare()

                orm_class = getattr(automapped.classes, table_name)
                # Drop the table's reference to the per-call MetaData.
                # NOTE(review): presumably so the model is not tied to this
                # throwaway metadata object - confirm the intent.
                orm_class.__table__.metadata = None
                known_models[table_name] = orm_class

            return known_models[table_name]
开发者ID:qianka,项目名称:qianka-sqlalchemy,代码行数:25,代码来源:sqlalchemy.py


示例6: create_app

def create_app(database_uri):
    """
    Build a Flask application that serves the given database as a ReSTful API.

    The database is reflected with automap and every mapped class is turned
    into a ripozo resource registered on a Flask dispatcher.

    :param str|unicode|sqlalchemy.engine.url.URL database_uri: The database
        URI in a manner that SQLAlchemy can understand
    :return: A flask app that exposes a database as
        a ReSTful API that can be accessed using either
        the Hal or SIREN protocol
    :rtype: Flask
    """
    flask_app = Flask(__name__)

    # Reflect the whole database into automapped classes.
    db_engine = create_engine(database_uri)
    automapped = automap_base()
    automapped.prepare(db_engine, reflect=True)

    # Dispatcher with both supported response formats.
    dispatcher = FlaskDispatcher(flask_app)
    dispatcher.register_adapters(adapters.HalAdapter, adapters.SirenAdapter)
    sessions = ScopedSessionHandler(db_engine)

    # One resource per reflected model.
    # ``append_slash=True`` is needed due to a quirk in how flask handles routing.
    resources = []
    for model in automapped.classes:
        resources.append(create_resource(model, sessions, append_slash=True))

    dispatcher.register_resources(*resources)
    return flask_app
开发者ID:rmoorman,项目名称:ripozo-oasis,代码行数:30,代码来源:api_builder.py


示例7: __prepare__

 def __prepare__(self):
     """Reflect the database behind ``self.engine`` and open a session.

     Sets ``self.base`` to the prepared automap base and ``self.session`` to
     a new session bound to the same engine.
     """
     db_engine = self.engine

     # Automap base, populated by reflecting the existing schema.
     self.base = automap_base()
     self.base.prepare(db_engine, reflect=True)

     # Session kept for later use.
     self.session = Session(db_engine)
开发者ID:sgeyer-tgm,项目名称:WahlAnalyse,代码行数:7,代码来源:database.py


示例8: __init__

    def __init__(self, engine):
        """Reflect the database behind ``engine`` into ``self.base``.

        ``self._define_tables_without_primary_keys`` is called with the
        MetaData object before the automap base is created from it and
        prepared.
        """
        bound_metadata = MetaData(engine)
        self._define_tables_without_primary_keys(bound_metadata)

        automapped = automap_base(metadata=bound_metadata)
        self.base = automapped
        self.base.prepare(engine, reflect=True)
开发者ID:geometalab,项目名称:OSMNames,代码行数:7,代码来源:tables.py


示例9: test_relationship_pass_params

    def test_relationship_pass_params(self):
        """A custom ``generate_relationship`` hook receives base, direction and name.

        The hook records its ``(base, direction, attrname)`` arguments and
        delegates to the stock ``generate_relationship``; the test then checks
        that the expected relationships were among the recorded calls.
        """
        Base = automap_base(metadata=self.metadata)
        recorder = Mock()

        def recording_hook(
            base, direction, return_fn, attrname, local_cls, referred_cls, **kw
        ):
            recorder(base, direction, attrname)
            return generate_relationship(
                base, direction, return_fn, attrname, local_cls, referred_cls,
                **kw
            )

        Base.prepare(generate_relationship=recording_hook)

        # c[1] holds the positional arguments of each recorded call.
        seen = {tuple(c[1]) for c in recorder.mock_calls}
        expected = {
            (Base, interfaces.MANYTOONE, "nodes"),
            (Base, interfaces.MANYTOMANY, "keywords_collection"),
            (Base, interfaces.MANYTOMANY, "items_collection"),
            (Base, interfaces.MANYTOONE, "users"),
            (Base, interfaces.ONETOMANY, "addresses_collection"),
        }
        assert expected <= seen
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:29,代码来源:test_automap.py


示例10: init

def init():
    """Reflect the database at ``config.db_url`` and run the role/user setup.

    ``add_role`` and then ``add_user`` are called with the prepared automap
    base and a session bound to the engine.
    """
    automapped = automap_base()
    db_engine = create_engine(config.db_url)
    automapped.prepare(db_engine, reflect=True)

    db_session = Session(db_engine)
    add_role(automapped, db_session)
    add_user(automapped, db_session)
开发者ID:gobabiertoAR,项目名称:dashboard-transito,代码行数:7,代码来源:008_tabla_users_roles.py


示例11: test_naming_schemes

    def test_naming_schemes(self):
        """Custom naming callables control class and relationship names.

        Classes get a ``cls_`` prefix, scalar relationships a ``scalar_``
        prefix and collection relationships a ``coll_`` prefix.  Both
        relationship names are built from the referred class name.
        """
        Base = automap_base(metadata=self.metadata)

        def prefixed_classname(base, tablename, table):
            return str("cls_" + tablename)

        def scalar_name(base, local_cls, referred_cls, constraint):
            return "scalar_" + referred_cls.__name__

        def collection_name(base, local_cls, referred_cls, constraint):
            return "coll_" + referred_cls.__name__

        naming = dict(
            classname_for_table=prefixed_classname,
            name_for_scalar_relationship=scalar_name,
            name_for_collection_relationship=collection_name,
        )
        Base.prepare(**naming)

        User = Base.classes.cls_users
        Address = Base.classes.cls_addresses

        user = User()
        address = Address()
        user.coll_cls_addresses.append(address)
        assert address.scalar_cls_users is user
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:27,代码来源:test_automap.py


示例12: get_taxa_photo_count

def get_taxa_photo_count(session, metadata):
    """Build a query counting photos per (genus, section, species) combination.

    Result rows are 4-tuples ``(genus, section, species, photo_count)``.
    Genus and species are inner-joined; section is outer-joined.

    :param session: session used to build the query
    :param metadata: MetaData from which the automap base is prepared
    :return: the query object (not yet executed)
    """
    Base = automap_base(metadata=metadata)
    Base.prepare()
    configure_mappers()

    Photo = Base.classes.photos
    Taxon = Base.classes.taxa
    Rank = Base.classes.ranks

    def photo_taxon_subquery(rank_name):
        """Subquery of (photo id, taxon name) for one rank; the name column is labelled with the rank."""
        labelled = session.query(Photo.id, Taxon.name.label(rank_name))
        joined = labelled.join(Photo.taxa_collection, Taxon.ranks)
        return joined.filter(Rank.name == rank_name).subquery()

    genus_sq = photo_taxon_subquery('genus')
    section_sq = photo_taxon_subquery('section')
    species_sq = photo_taxon_subquery('species')

    photo_total = functions.count(Photo.id).label('photos')
    counts = session.query('genus', 'section', 'species', photo_total)
    counts = counts.select_from(Photo)
    counts = counts.join(genus_sq, genus_sq.c.id == Photo.id)
    counts = counts.outerjoin(section_sq, section_sq.c.id == Photo.id)
    counts = counts.join(species_sq, species_sq.c.id == Photo.id)

    return counts.group_by('genus', 'section', 'species')
开发者ID:xieyanfu,项目名称:nbclassify,代码行数:34,代码来源:db.py


示例13: setupConnection

 def setupConnection(self):
     """(Re)connect to ``self.sql_url`` and automap the database.

     Any existing scoped session is removed first.  If the engine cannot be
     created, the traceback is logged and the method returns without
     changing anything else.  Reflection of the whole database (views
     included) is attempted first; if that fails, only ``self.sql_table`` is
     reflected and ``self.restricted`` is set to True.

     On success this sets ``self._insp``, ``self.d_base``, ``self.name``,
     ``self.a_base`` and ``self._scoped_session``.
     """
     LOG.info('setupConnection:'+self.sql_url)
     if self._scoped_session:
         self._scoped_session.remove()
     try:
         engine = create_engine(self.sql_url, encoding='utf8', echo=False, pool_recycle=1)
     except Exception:
         # ``except Exception`` instead of a bare ``except`` so that
         # KeyboardInterrupt / SystemExit are not swallowed here.
         exc_type, exc_value, exc_traceback = sys.exc_info()
         lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
         LOG.error(''.join('!! ' + line for line in lines))
         return
     self._insp = reflection.Inspector.from_engine(engine)
     self.d_base = declarative_base(bind=engine)
     a_base = automap_base(bind=engine)
     try:
         a_base.metadata.reflect(views=True)
         self.name = unicode(self.sql_url)
     except Exception:
         # Whole-database reflection failed: log why, then fall back to
         # reflecting only the configured table.
         LOG.info('Unable to reflect the whole DB!')
         exc_type, exc_value, exc_traceback = sys.exc_info()
         lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
         for line in lines:
             LOG.info(line)
         a_base.metadata.reflect(views=True, only=[self.sql_table])
         self.restricted = True
         self.name = unicode(self.sql_url+'+'+self.sql_table)
     a_base.prepare(a_base.metadata.bind, name_for_collection_relationship=unique_collection)
     self.a_base = a_base
     self._scoped_session = scoped_session(
         sessionmaker(
             bind=self.a_base.metadata.bind,
             extension=ZopeTransactionExtension(keep_session=True),
             autocommit=True,
         )
     )
开发者ID:Martronic-SA,项目名称:collective.behavior.sql,代码行数:29,代码来源:content.py


示例14: get_photos_with_taxa

def get_photos_with_taxa(session, metadata):
    """Build a query of photos together with their genus, section and species.

    Result rows are 4-tuples ``(photo, genus, section, species)``.  Genus
    and species are inner-joined; section is outer-joined.

    :param session: session used to build the query
    :param metadata: MetaData from which the automap base is prepared
    :return: the query object (not yet executed)
    """
    Base = automap_base(metadata=metadata)
    Base.prepare()
    configure_mappers()

    Photo = Base.classes.photos
    Taxon = Base.classes.taxa
    Rank = Base.classes.ranks

    def photo_taxon_subquery(rank_name):
        """Subquery of (photo id, taxon name) for one rank; the name column is labelled with the rank."""
        labelled = session.query(Photo.id, Taxon.name.label(rank_name))
        joined = labelled.join(Photo.taxa_collection, Taxon.ranks)
        return joined.filter(Rank.name == rank_name).subquery()

    genus_sq = photo_taxon_subquery('genus')
    section_sq = photo_taxon_subquery('section')
    species_sq = photo_taxon_subquery('species')

    photos = session.query(Photo, 'genus', 'section', 'species')
    photos = photos.join(genus_sq, genus_sq.c.id == Photo.id)
    photos = photos.outerjoin(section_sq, section_sq.c.id == Photo.id)

    return photos.join(species_sq, species_sq.c.id == Photo.id)
开发者ID:xieyanfu,项目名称:nbclassify,代码行数:31,代码来源:db.py


示例15: _automap

    def _automap(self, e):
        """Reflect engine ``e`` into a fresh automap base and configure mappers."""
        automapped = automap_base()
        automapped.prepare(e, reflect=True)

        # Short pause between preparing and configuring.
        # NOTE(review): presumably widens the window for concurrent callers
        # to interleave - confirm against the test that uses this helper.
        time.sleep(0.01)
        configure_mappers()
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:7,代码来源:test_automap.py


示例16: collect_bnetza_data

def collect_bnetza_data():
    """
    Collect data of wind turbines available in the BNetzA Anlagenstammdaten.

    Only rows whose ``seelage`` column is NULL are selected; the result is
    sorted by installed capacity in ascending order.

    Parameters
    ----------

    Returns
    -------
    plants : DataFrame of all wind generators in BNetzA, with the columns
        ``capacity``, ``manufacturer``, ``type``, ``hub`` and ``rotor``
    """
    table_name = 'bnetza_eeg_anlagenstammdaten_wind_classification'
    meta.reflect(bind=conn, schema='model_draft', only=[table_name])
    Base = automap_base(metadata=meta)
    Base.prepare()

    Bnetza = getattr(Base.classes, table_name)

    selected_columns = (
        Bnetza.installierte_leistung,
        Bnetza.wea_manufacturer,
        Bnetza.wea_type,
        Bnetza.nabenhöhe,
        Bnetza.rotordurchmesser,
    )
    # ``== None`` is deliberate: on a mapped column it builds a SQL
    # expression instead of evaluating to a Python bool.
    query = session.query(*selected_columns).filter(Bnetza.seelage == None)  # noqa: E711

    # Plain tuples, ordered by the first column (installed capacity).
    records = sorted(
        ((leistung, manufacturer, wea_type, hub, rotor)
         for leistung, manufacturer, wea_type, hub, rotor in query.all()),
        key=itemgetter(0),
    )
    column_names = ['capacity', 'manufacturer', 'type', 'hub', 'rotor']

    return pd.DataFrame(records, columns=column_names)
开发者ID:openego,项目名称:data_processing,代码行数:35,代码来源:simple_feedin.py


示例17: collect_ego_turbines

def collect_ego_turbines():
    """
    Collect data of wind turbines used in the eGo database.

    Selects the electrical capacity of ``wind_onshore`` power plants with a
    capacity below 7600 and a start-up date strictly between 1998-01-01 and
    2018-01-01.

    Parameters
    ----------

    Returns
    -------
    generators : capacity of turbines used in the eGo database, as a list of
        floats sorted in ascending order
    """
    table_name = 'ego_dp_supply_res_powerplant'
    meta.reflect(bind=conn, schema='model_draft', only=[table_name])
    Base = automap_base(metadata=meta)
    Base.prepare()

    Dp = getattr(Base.classes, table_name)

    capacities = session.query(Dp.electrical_capacity)
    capacities = capacities.filter(
        and_(
            Dp.generation_subtype == 'wind_onshore',
            Dp.electrical_capacity < 7600,
            Dp.start_up_date > '1998-01-01 00:00:00',
            Dp.start_up_date < '2018-01-01 00:00:00',
        )
    )

    # Each result row holds a single column; unwrap it and convert to float.
    return sorted(float(row[0]) for row in capacities.all())
开发者ID:openego,项目名称:data_processing,代码行数:31,代码来源:simple_feedin.py


示例18: main

def main(src):
    """Reflect the database at ``src`` and dump the collected classes as JSON.

    :param src: database URL passed to ``create_engine``
    """
    automapped = automap_base()
    db_engine = create_engine(src)
    automapped.prepare(db_engine, reflect=True)

    collected = Collector(Resolver()).collect(automapped.classes)
    loading.dumpfile(collected, format="json")
开发者ID:podhmo,项目名称:individual-sandbox,代码行数:7,代码来源:01gen.py


示例19: handle

    def handle(self, *args, **options):
        """Write the reflected content schema and fixture data for one version.

        The tables of the ``content`` app (except ``channelmetadatacache``)
        are reflected and automapped, the ``content_import_test.json``
        fixture is loaded, and then:

        * the reflected MetaData is pickled to the path built from
          ``SCHEMA_PATH_TEMPLATE``;
        * every mapped table's rows are dumped as JSON to the path built
          from ``DATA_PATH_TEMPLATE``.

        Both paths are formatted with ``options['version']``.
        """
        engine = create_engine(get_default_db_string(), convert_unicode=True)
        metadata = MetaData()

        content_models = apps.get_app_config('content').models
        # Exclude channelmetadatacache in case we are reflecting an older version of Kolibri
        table_names = [
            model._meta.db_table
            for name, model in content_models.items()
            if name != 'channelmetadatacache'
        ]
        metadata.reflect(bind=engine, only=table_names)
        Base = automap_base(metadata=metadata)
        # TODO map relationship backreferences using the django names
        Base.prepare()
        session = sessionmaker(bind=engine, autoflush=False)()

        # Load fixture data into the test database with Django
        call_command('loaddata', 'content_import_test.json', interactive=False)

        def row_as_dict(instance):
            """Return the instance's attributes without SQLAlchemy's state entry."""
            return {
                attr: val
                for attr, val in vars(instance).items()
                if attr != '_sa_instance_state'
            }

        data = {
            table_name: [row_as_dict(row) for row in session.query(mapped_class).all()]
            for table_name, mapped_class in Base.classes.items()
        }

        schema_path = SCHEMA_PATH_TEMPLATE.format(name=options['version'])
        with open(schema_path, 'wb') as schema_file:
            pickle.dump(metadata, schema_file, protocol=2)

        data_path = DATA_PATH_TEMPLATE.format(name=options['version'])
        with open(data_path, 'w') as data_file:
            json.dump(data, data_file)
开发者ID:indirectlylit,项目名称:kolibri,代码行数:32,代码来源:generate_schema.py


示例20: init_conn

def init_conn():
  """Connect to the ``adna`` database and publish its mapped classes.

  Reflects the ``results`` and ``job`` tables, overrides their ``createdAt``
  and ``updatedAt`` columns so that timestamps are filled in on the Python
  side, automaps both tables and sets the module-level globals ``Results``,
  ``Job`` and ``session``.
  """
  # ``postgresql://`` is the dialect name SQLAlchemy accepts in all versions;
  # the ``postgres://`` alias is rejected by SQLAlchemy 1.4 and later.
  # NOTE(review): the credentials/host part ("[email protected]") looks like a
  # redaction placeholder - restore the real value before use.
  connection_string = 'postgresql://postgres:[email protected]:5432/adna'

  from sqlalchemy.ext.automap import automap_base
  from sqlalchemy.orm import Session
  from sqlalchemy import create_engine, Column, DateTime, MetaData, Table
  from datetime import datetime

  engine = create_engine(connection_string)

  metadata = MetaData()
  metadata.reflect(engine, only=['results', 'job'])

  # Both tables get the same timestamp columns; ``extend_existing=True``
  # applies the column definitions on top of the reflected tables.
  for table_name in ('results', 'job'):
    Table(table_name, metadata,
      Column('createdAt', DateTime, default=datetime.now),
      Column('updatedAt', DateTime, default=datetime.now,
        onupdate=datetime.now),
      extend_existing=True)

  Base = automap_base(metadata=metadata)

  Base.prepare()

  global Results, Job, session
  Results, Job = Base.classes.results, Base.classes.job

  session = Session(engine)
开发者ID:theboocock,项目名称:ancient-dna,代码行数:35,代码来源:interface.py



注:本文中的sqlalchemy.ext.automap.automap_base函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python baked.bakery函数代码示例发布时间:2022-05-27
下一篇:
Python associationproxy.association_proxy函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap