• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python orm.sessionmaker函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.orm.sessionmaker函数的典型用法代码示例。如果您正苦于以下问题：Python sessionmaker函数的具体用法是什么？Python sessionmaker怎么用？有哪些Python sessionmaker的使用例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了sessionmaker函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: convert_all

def convert_all():
    """
    Copy every image row from the SQLite database into the MySQL database.

    Each converted image is committed on its own.  A commit that raises
    ``IntegrityError`` is rolled back and the image is reported as already
    loaded.  Afterwards the number of image rows found in MySQL is printed.
    Both sessions are closed before returning, even when the copy is
    interrupted by an unexpected error.

    :return: None
    """

    # setup engines to each database
    # NOTE(review): the MySQL URL embeds credentials and its user/host part
    # looks mangled -- confirm the real DSN and move it into configuration.
    mysql_engine = create_engine('mysql+pymysql://root:[email protected]:3306/plane_viewer')
    sqlite_engine = create_engine('sqlite:///images.sqlite3')

    # setup a session for each database
    mysql_session = sessionmaker(bind=mysql_engine)()
    sqlite_session = sessionmaker(bind=sqlite_engine)()

    try:
        # retrieve all images from SQLite
        sqlite_images = sqlite_session.query(SqlitePlaneImage).all()

        # convert all images from SQLite to MySQL and attempt to load them,
        # ignoring duplicates
        for image in sqlite_images:
            mysql_session.add(convert_image_object(image))
            try:
                # commit per image so one duplicate only discards that image
                mysql_session.commit()
                print('Loaded', image)
            except exc.IntegrityError:
                mysql_session.rollback()
                print("Image {} already loaded".format(image))

        mysql_images = mysql_session.query(MySqlPlaneImage).all()
        print(len(mysql_images))
    finally:
        # release the connections held by both sessions
        mysql_session.close()
        sqlite_session.close()
开发者ID:willwagner602,项目名称:PlaneViewer,代码行数:34,代码来源:DatabaseMigration.py


示例2: edit_category

def edit_category(category_id):
    """
    Show (GET) or apply (POST) the edit form for one category.

    GET renders ``edit_category.html`` for the category.  POST copies the
    ``name`` and ``description`` form fields onto the category, commits and
    redirects to ``get_category``.  An unknown ``category_id`` aborts with
    404 and any other HTTP method aborts with 400.  The session is closed on
    every path that opened one.
    """
    if request.method not in ("GET", "POST"):
        abort(400)

    session = sessionmaker(bind=engine)()
    try:
        # .one() raises NoResultFound when the id does not exist
        category = session.query(Category).filter_by(id=category_id).one()
        if request.method == "GET":
            return render_template("edit_category.html", category=category)

        category.name = request.form["name"]
        category.description = request.form["description"]
        session.add(category)
        session.commit()
        return redirect(url_for("get_category", category_id=category_id), code=302)
    except NoResultFound:
        abort(404)
    finally:
        session.close()
开发者ID:chenlu2015,项目名称:fullstack-nanodegree-vm,代码行数:29,代码来源:application.py


示例3: init_sqlalchemy

 def init_sqlalchemy(self, scheme, connection):
     """
     Create the SQLAlchemy engine, metadata and scoped session for a database.

     A ``checkout`` pool listener runs ``select now()`` on every connection
     that is handed out and raises ``DisconnectionError`` when the MySQL
     driver reports an ``OperationalError``.

     :param scheme: not used by this method
     :param connection: database URL passed to ``create_engine``
     :return: dict with the keys ``metadata``, ``session`` and
         ``sqlalchemy_sessions``; ``None`` when setup raised, in which case
         the exception is only printed
     """
     try:
         from sqlalchemy import create_engine, MetaData
         from sqlalchemy.orm import scoped_session, sessionmaker
         from torweb.db import CacheQuery
         import _mysql_exceptions
         from sqlalchemy import event
         from sqlalchemy.exc import DisconnectionError

         def my_on_checkout(dbapi_conn, connection_rec, connection_proxy):
             # probe the raw DBAPI connection before it is used
             try:
                 dbapi_conn.cursor().execute('select now()')
             except _mysql_exceptions.OperationalError:
                 raise DisconnectionError

         engine = create_engine(
             connection,
             convert_unicode=True,
             encoding="utf-8",
             pool_recycle=3600*7,
             echo=False,
         )
         event.listen(engine, 'checkout', my_on_checkout)
         metadata = MetaData(bind=engine)
         session = scoped_session(sessionmaker(bind=engine, query_cls=CacheQuery))
         return {
             "metadata": metadata,
             "session": session,
             "sqlalchemy_sessions": [session],
         }
     except Exception as e:
         # best-effort setup: report the failure and fall through to None
         print(e)
开发者ID:finalbattle,项目名称:torweb,代码行数:34,代码来源:config.py


示例4: test_all_htmlpopups

 def test_all_htmlpopups(self):
     """
     Request the htmlPopup of one feature per queryable prod layer.

     The layer ids come from ``LayersConfig``.  For each layer the first
     primary-key value of its first model is looked up; layers whose lookup
     raises are printed and skipped, and empty tables contribute no feature.
     Every collected feature is then requested in five languages and must
     answer with status 200.
     """
     from chsdi.models import models_from_name
     from chsdi.models.bod import LayersConfig
     from sqlalchemy import distinct
     from sqlalchemy.orm import scoped_session, sessionmaker
     val = True
     DBSession = scoped_session(sessionmaker())
     query = DBSession.query(distinct(LayersConfig.idBod)).filter(LayersConfig.queryable == val).filter(LayersConfig.staging == 'prod')
     # Get a list of all the queryable layers
     layers = [q[0] for q in query]
     DBSession.close()
     # Get a list of feature ids
     features = []
     for layer in layers:
         try:
             model = models_from_name(layer)[0]
             DBSession = scoped_session(sessionmaker())
             query = DBSession.query(model.primary_key_column()).limit(1)
             ID = [q[0] for q in query]
             # If tables are empty ID is an empty list
             if ID:
                 features.append((layer, str(ID[0])))
         except Exception as e:
             print(e)
         finally:
             # single close point; runs on success and on failure
             DBSession.close()
     for f in features:
         for lang in ('de', 'fr', 'it', 'rm', 'en'):
             self.testapp.get('/rest/services/all/MapServer/%s/%s/htmlPopup' % (f[0], f[1]), params={'callback': 'cb', 'lang': '%s' % lang}, status=200)
开发者ID:jesseeichar,项目名称:mf-chsdi3,代码行数:30,代码来源:test_mapservice.py


示例5: test_all_htmlpopups

    def test_all_htmlpopups(self):
        """
        Request the htmlPopup of one feature per queryable top-level layer.

        Layers are selected from ``LayersConfig`` (staging ``prod``,
        queryable, no parent layer).  Every layer must resolve to at least
        one model; the first primary-key value of the first model, if any,
        is collected.  Each collected feature is then requested in five
        languages and must answer with status 200.
        """
        from chsdi.models import models_from_name
        from chsdi.models.bod import LayersConfig
        from sqlalchemy import distinct
        from sqlalchemy.orm import scoped_session, sessionmaker
        val = True
        valnone = None
        DBSession = scoped_session(sessionmaker())
        query = DBSession.query(distinct(LayersConfig.layerBodId)).filter(LayersConfig.staging == 'prod').filter(LayersConfig.queryable == val).filter(LayersConfig.parentLayerId == valnone)
        features = []
        try:
            for layer in getLayers(query):
                # bind the session before the try so the finally clause
                # never refers to an unassigned name
                FeatDBSession = scoped_session(sessionmaker())
                try:
                    models = models_from_name(layer)
                    self.assertTrue(models is not None and len(models) > 0, layer)
                    model = models[0]
                    pk_query = FeatDBSession.query(model.primary_key_column()).limit(1)
                    ID = [q[0] for q in pk_query]
                    # an empty table yields no id and therefore no feature
                    if ID:
                        features.append((layer, str(ID[0])))
                finally:
                    FeatDBSession.close()
        finally:
            DBSession.close()

        for f in features:
            for lang in ('de', 'fr', 'it', 'rm', 'en'):
                link = '/rest/services/all/MapServer/' + f[0] + '/' + f[1] + '/htmlPopup?callback=cb&lang=' + lang
                resp = self.testapp.get(link)
                self.assertTrue(resp.status_int == 200, link)
开发者ID:qbns,项目名称:mf-chsdi3,代码行数:31,代码来源:test_mapservice.py


示例6: setup_database

  def setup_database(self):
    """
    Publish the model helper names and open the configured database.

    Sets the module-level ``column_mapping`` (type name -> SQLAlchemy type),
    adds a few SQLAlchemy names to the module globals, and, for the
    ``sqlite`` and ``postgres`` database types, stores an engine and a
    session in ``self.config`` under ``db_engine`` and ``db_session``.
    Any other ``db_type`` leaves ``self.config`` untouched.
    """
    # DB library imports
    from sqlalchemy import create_engine, Table, MetaData, Column
    from sqlalchemy import (Integer, String, Unicode, Text, UnicodeText,
                            Date, Numeric, Time, Float, DateTime, Interval,
                            Binary, Boolean, PickleType)
    from sqlalchemy.orm import sessionmaker, mapper

    # Create global name mappings for model()
    global column_mapping
    column_mapping = {
      'string': String,
      'str': String,
      'integer': Integer,
      'int': Integer,
      'unicode': Unicode,
      'text': Text,
      'unicodetext': UnicodeText,
      'date': Date,
      'numeric': Numeric,
      'time': Time,
      'float': Float,
      'datetime': DateTime,
      'interval': Interval,
      'binary': Binary,
      'boolean': Boolean,
      'bool': Boolean,
      'pickletype': PickleType,
    }

    # Add a few SQLAlchemy types to globals() so we can use them in models
    globals().update(dict(Column=Column, Table=Table, Integer=Integer,
                          MetaData=MetaData, mapper=mapper))

    db_type = self.config['db_type']
    if db_type == 'sqlite':
      # Ensures correct slash number for sqlite
      self.config['db_location'] = '/' + self.config['db_location']
      engine_options = {}
    elif db_type == 'postgres':
      engine_options = {'encoding': "utf8", 'convert_unicode': True}
    else:
      return

    eng_name = db_type + '://' + self.config['db_location']
    db_engine = create_engine(eng_name, **engine_options)
    self.config['db_engine'] = db_engine
    self.config['db_session'] = sessionmaker(bind=db_engine)()
开发者ID:malaniz,项目名称:webyarara,代码行数:34,代码来源:yarara.py


示例7: __init__

    def __init__(self, dbname='xraydata.db', read_only=True):
        """Connect to an existing database file and reflect its tables.

        ``dbname`` is tried as given and then next to this module.  With
        ``read_only`` the session's ``flush`` is replaced by a no-op.

        Raises IOError when the file is found in neither place and
        ValueError when ``isxrayDB`` rejects it.
        """
        if not os.path.exists(dbname):
            # fall back to a file of that name beside this module
            module_dir = os.path.split(__file__)[0]
            dbname = os.path.join(module_dir, dbname)
            if not os.path.exists(dbname):
                raise IOError("Database '%s' not found!" % dbname)

        if not isxrayDB(dbname):
            raise ValueError("'%s' is not a valid X-ray Database file!" % dbname)

        self.dbname = dbname
        self.engine = make_engine(dbname)
        self.conn = self.engine.connect()

        if read_only:
            session_options = {'autoflush': True, 'autocommit': False}
        else:
            session_options = {}
        self.session = sessionmaker(bind=self.engine, **session_options)()

        if read_only:
            def readonly_flush(*args, **kwargs):
                return
            # flushing becomes a no-op on a read-only session
            self.session.flush = readonly_flush

        self.metadata = MetaData(self.engine)
        self.metadata.reflect()
        self.tables = self.metadata.tables

        element_rows = self.tables['elements'].select().execute().fetchall()
        self.atomic_symbols = [row.element for row in element_rows]
开发者ID:xraypy,项目名称:xraylarch,代码行数:30,代码来源:xraydb.py


示例8: __init__

    def __init__(self, name=None):
        """Set up engine, metadata, declarative base and scoped sessions.

        The connection settings are read from ``settings.DATABASES[name]``
        (``'default'`` when no name is given).  Keys prefixed with
        ``READONLY_`` override the main settings for a second, read-only
        engine and session.

        Raises ImproperlyConfigured when no engine type is configured.
        """
        name = name or 'default'

        data = {}
        if hasattr(settings, 'DATABASES') and name in settings.DATABASES:
            data = settings.DATABASES[name]
        if data.get('ENGINE', '') == '':
            raise ImproperlyConfigured(
                'The database ORM connection requires, at minimum, '
                'an engine type.')

        # keep only the last dotted component of the engine path
        engine_name = data['ENGINE'].rsplit('.', 1)[-1]
        # django needs sqlite3 but sqlalchemy references sqlite
        if engine_name == 'sqlite3':
            engine_name = 'sqlite'
        if engine_name != data['ENGINE']:
            data['ENGINE'] = engine_name

        self.engine = self._create_engine(data)

        prefix = 'READONLY_'
        ro_values = dict((key[len(prefix):], value)
                         for key, value in data.iteritems()
                         if key.startswith(prefix))
        if ro_values:
            ro_data = dict(data)
            ro_data.update(ro_values)
            self.readonly_engine = self._create_engine(ro_data)

        self.metadata = MetaData(self.engine)
        self.Base = declarative_base(metadata=self.metadata)

        if hasattr(self, 'readonly_engine'):
            readonly_factory = sessionmaker(bind=self.readonly_engine)
            self._readonly_sessionmaker = scoped_session(readonly_factory)
        self._sessionmaker = scoped_session(sessionmaker(bind=self.engine))
开发者ID:saebyn,项目名称:baph,代码行数:30,代码来源:orm.py


示例9: __init__

 def __init__(self, dburi):
     """Create the engine for ``dburi``, open a session and create the tables.

     Keeps the engine, one open session and a session factory on the
     instance as ``engine``, ``session`` and ``session_factory``.
     """
     engine_options = dict(
         echo=False,
         pool_recycle=3600,
         echo_pool=True,
         connect_args={'check_same_thread': False},
     )
     self.engine = create_engine(dburi, **engine_options)
     self.session = sessionmaker(bind=self.engine)()
     # checkfirst=True asks create_all to look for each table first
     Base.metadata.create_all(self.engine, checkfirst=True)
     self.session_factory = sessionmaker(bind=self.engine)
开发者ID:movb,项目名称:isrecorder,代码行数:7,代码来源:db.py


示例10: create_session

def create_session():
    """
    Map the reflected tables and open one session per database.

    Tables of the ``cloud_usage_uri`` and ``cloud_uri`` databases are
    autoloaded and mapped onto their classes; the ``rbc_usage_uri``
    database only gets an engine and a session.

    :return: tuple ``(session_csu, session_cs, session)``
    """
    def reflect_and_map(metadata, table_classes):
        # autoload each table and map it onto its class, in the given order
        for table_name, mapped_class in table_classes:
            mapper(mapped_class, Table(table_name, metadata, autoload=True))

    # cloud_usage
    engine_csu = create_engine(config.get('main', 'cloud_usage_uri'))
    reflect_and_map(MetaData(engine_csu), (
        ('account', AccountCsU),
        ('usage_event', UsageEventCsU),
        ('cloud_usage', CloudUsageCsU),
    ))

    # cloud database
    engine_cs = create_engine(config.get('main', 'cloud_uri'))
    reflect_and_map(MetaData(engine_cs), (
        ('account', AccountCs),
        ('disk_offering', DiskOfferingCs),
        ('vm_template', TemplateCs),
        ('guest_os', GuestOSCs),
    ))

    session_cs = sessionmaker(bind=engine_cs)()
    session_csu = sessionmaker(bind=engine_csu)()

    # setup our dedicated model
    engine = create_engine(config.get('main', 'rbc_usage_uri'))
    session = sessionmaker(bind=engine)()
    return (session_csu, session_cs, session)
开发者ID:redbridge,项目名称:rbc-usage,代码行数:32,代码来源:cloudusage.py


示例11: copy_stations_to_sqlite

def copy_stations_to_sqlite(src_dsn, dest_dsn):
    """
    Copy the 'MoTIe' network's stations and variables between databases.

    A new ``Network`` named 'MoTIe' is created in the destination.  Each
    source station of that network is copied with its ``native_id`` and the
    ``station_name`` of each of its histories; each source variable is
    copied with its name, standard name, cell method and unit.  Both
    sessions are closed before returning, also when an error interrupts the
    copy.

    :param src_dsn: database URL of the source, passed to ``create_engine``
    :param dest_dsn: database URL of the destination
    :return: None
    """
    src_sesh = sessionmaker(bind=create_engine(src_dsn))()
    dest_sesh = sessionmaker(bind=create_engine(dest_dsn))()

    try:
        net = Network(name='MoTIe')
        dest_sesh.add(net)
        # flush before net.id is read below
        dest_sesh.flush()

        q = src_sesh.query(Station).join(History).join(
            Network).filter(Network.name == 'MoTIe')
        for stn in q.all():
            histories = [History(station_name=hist.station_name)
                         for hist in stn.histories]
            new_obj = Station(native_id=stn.native_id,
                              network_id=net.id, histories=histories)
            dest_sesh.add(new_obj)
        dest_sesh.commit()

        q = src_sesh.query(Variable).join(Network).filter(Network.name == 'MoTIe')
        for var in q.all():
            v = Variable(name=var.name, standard_name=var.standard_name,
                         cell_method=var.cell_method, network_id=net.id,
                         unit=var.unit)
            dest_sesh.add(v)
        dest_sesh.commit()
    finally:
        # release the connections held by both sessions
        src_sesh.close()
        dest_sesh.close()
开发者ID:pacificclimate,项目名称:crmprtd,代码行数:25,代码来源:moti_infill_insert.py


示例12: init_sqlalchemy

def init_sqlalchemy(settings):
    """
    Configure the master engine/session and the optional slave sessions.

    ``sqlalchemy.connect_kwargs`` may be a mapping or a JSON string; its
    entries are passed to every ``create_engine`` call.  ``meta.Session``
    and ``meta.metadata`` are bound to the master engine.
    ``meta.BaseObject.query`` is taken from a randomly chosen slave session
    when ``sqlalchemy.slaves`` lists any URLs, otherwise from the master
    session.
    """
    def make_scoped_session(bound_engine):
        # scoped session whose factory carries the Zope transaction extension
        factory = orm.sessionmaker(bind=bound_engine,
                                   extension=ZopeTransactionExtension())
        return orm.scoped_session(factory)

    # master
    master_url = settings['sqlalchemy.url']
    kwargs = {}
    connect_kwargs = settings.get('sqlalchemy.connect_kwargs')
    if connect_kwargs is not None:
        if isinstance(connect_kwargs, str):
            connect_kwargs = json.loads(connect_kwargs)
        kwargs.update(connect_kwargs.items())

    engine = create_engine(master_url, **kwargs)
    meta.Session = make_scoped_session(engine)
    meta.metadata.bind = engine

    # slaves
    slaves = [make_scoped_session(create_engine(url, **kwargs))
              for url in settings.get('sqlalchemy.slaves', [])]

    query_source = random.choice(slaves) if slaves else meta.Session
    meta.BaseObject.query = query_source.query_property(orm.Query)
开发者ID:lxneng,项目名称:easy_sqlalchemy,代码行数:29,代码来源:__init__.py


示例13: initialize_sql

def initialize_sql(settings):
    """
    Initialise the module-level database objects from ``settings``.

    Reads ``sqlalchemy.url``, ``tmp_dir`` and ``extended`` from ``settings``
    and rebinds the globals ``sqlalchemy_url``, ``DBSession``, ``Base``,
    ``engine``, ``Session``, ``session``, ``tmp_dir`` and ``extended``.
    ``extended`` is True only when the setting is the string ``'true'``.
    """
    global sqlalchemy_url, DBSession, Base, engine, Session, session
    global tmp_dir, extended

    sqlalchemy_url = settings['sqlalchemy.url']
    DBSession = scoped_session(sessionmaker())
    Base = declarative_base()
    engine = create_engine(sqlalchemy_url)
    Session = sessionmaker(bind=engine)
    session = Session()
    tmp_dir = settings['tmp_dir']

    # Extended attributes
    extended = (settings['extended'] == 'true')

    # the scoped session and the declarative metadata share the one engine
    DBSession.configure(bind=engine)
    Base.metadata.bind = engine
开发者ID:lightbase,项目名称:WSCServer,代码行数:28,代码来源:__init__.py


示例14: test_merge_load

    def test_merge_load(self):
        """Profile merging a loaded Parent into a second session.

        ``go`` is first run under the call-count profiler and then once
        more, against a session bound to ``testing.db``, to count its SQL.
        """
        Parent = self.classes.Parent

        sess = sessionmaker()()
        sess2 = sessionmaker()()
        p1 = sess.query(Parent).get(1)
        # touch the collection before profiling (see the note below)
        p1.children

        # preloading of collection took this down from 1728 to 1192
        # using sqlite3 the C extension took it back up to approx. 1257
        # (py2.6)

        # expected call counts are keyed by interpreter version
        # NOTE(review): variance=0.10 presumably is the tolerated relative
        # deviation from those counts -- confirm in the profiling helper
        @profiling.function_call_count(variance=0.10,
                                versions={'2.5':1050, '2.6':1050,
                                        '2.6+cextension':1005, 
                                        '2.7':1005,
                                        '3':1050}
                            )
        def go():
            p2 = sess2.merge(p1)
        go()

        # one more time, count the SQL

        # go() reads sess2 through its closure, so rebinding the name here
        # makes the next call merge into the session bound to testing.db
        sess2 = sessionmaker(testing.db)()
        self.assert_sql_count(testing.db, go, 2)
开发者ID:sleepsonthefloor,项目名称:sqlalchemy,代码行数:26,代码来源:test_orm.py


示例15: get_data

def get_data():
    """
    Return the NASA and FBO RFPs, optionally filtered by NAICS code.

    On POST the ``naics`` form field is the filter; an empty value, or any
    other request method, selects everything.  NASA rows come from ``RFP``
    through a session on the ``db_connect()`` engine, FBO rows from
    ``OpportunityDetail.query``.  The session is closed once the rows have
    been serialised.

    :return: ``str()`` of the list holding each row's ``to_json()`` result,
        NASA rows first
    """
    search_code = request.form['naics'] if request.method == 'POST' else None

    engine = db_connect()
    create_deals_table(engine)
    sess = sessionmaker(bind=engine)()
    try:
        if search_code:
            rfps_nasa = sess.query(RFP).filter(RFP.NAICS_code == search_code).all()
            code = NAICSChild.query.filter_by(code=search_code).first()
            rfps_fbo = OpportunityDetail.query.filter_by(naics_code=code).all()
        else:
            rfps_nasa = sess.query(RFP).all()
            rfps_fbo = OpportunityDetail.query.all()

        # serialise while the session is still open, in case to_json()
        # needs to load further attributes
        res = [r.to_json() for r in rfps_nasa]
        res.extend(r.to_json() for r in rfps_fbo)
        return str(res)
    finally:
        sess.close()
开发者ID:Ksynko,项目名称:RFP_spiders,代码行数:35,代码来源:main.py


示例16: test_taxa_to_count_and_push

def test_taxa_to_count_and_push(MergedataToUpload, engine, tbl_taxa, count, tbl_count, count_table):
    """
    Upload the count data, read the count table back and compare columns.

    Missing values in the raw ``count`` frame are replaced by ``"NA"``
    before the upload.  Afterwards the whole ``count_table`` is selected
    into a DataFrame whose ``count_observation`` and
    ``spatial_replication_level_2`` columns must equal those of
    ``tbl_count``.
    """
    def column_values(frame, column):
        # plain Python list of one DataFrame column
        return frame[column].values.tolist()

    count.fillna("NA", inplace=True)

    merge_arguments = dict(
        raw_dataframe=count,
        formated_dataframe=tbl_count,
        formated_dataframe_name="count",
        raw_data_taxa_columns=["site", "genus", "species"],
        uploaded_taxa_columns=["study_site_table_fkey", "genus", "species"],
    )
    uploader = MergedataToUpload(sessionmaker(bind=engine, autoflush=False))
    uploader.merge_for_datatype_table_upload(**merge_arguments)

    session = sessionmaker(bind=engine, autoflush=False)()
    result = session.execute(select([count_table]))
    count_query_df = pd.DataFrame(result.fetchall())
    count_query_df.columns = result.keys()
    session.close()
    engine.dispose()

    print(count_query_df)

    expected_counts = column_values(tbl_count, "count_observation")
    uploaded_counts = column_values(count_query_df, "count_observation")
    assert expected_counts == uploaded_counts

    # compare the replication levels as strings
    expected_levels = [str(x) for x in column_values(tbl_count, "spatial_replication_level_2")]
    uploaded_levels = [str(x) for x in column_values(count_query_df, "spatial_replication_level_2")]
    assert expected_levels == uploaded_levels
开发者ID:bibsian,项目名称:database-development,代码行数:29,代码来源:test_upload_popler3.py


示例17: __init__

	def __init__( self, filename ):
		"""Open the main SQLite database and one session per listed file.

		``filename`` is the main database.  Every ``FileList`` row in it names
		another SQLite file; when that path is not an existing file, the same
		base name inside the main database's directory is used instead.  The
		sessions are stored in ``self.SessionsInstances`` keyed by the row's
		``type``; the 'Source' and 'Target' entries default to the main
		session.  SQL echo is enabled when ``self.DebugLevel`` exceeds 2.
		"""
		echo = self.DebugLevel > 2

		engine = create_engine( 'sqlite:///' + filename, echo = echo )
		metadata=Base.metadata
		metadata.create_all(engine)

		self.Session=sessionmaker()
		self.Session.configure(bind=engine)
		self.SessionInstance = self.Session()

		dirname=os.path.dirname(filename)
		self.SessionsInstances={}
		for file_list in self.SessionInstance.query( FileList ).all():
			# separate name so the filename parameter is not shadowed
			list_filename=file_list.filename
			if not os.path.isfile(list_filename):
				list_filename=os.path.join(dirname,os.path.basename(file_list.filename))

			engine = create_engine( 'sqlite:///' + list_filename, echo = echo )
			metadata=Base.metadata
			metadata.create_all(engine)

			# self.Session is rebound on purpose: after the loop it is the
			# factory of the last listed file, as before
			self.Session=sessionmaker()
			self.Session.configure(bind=engine)
			self.SessionsInstances[file_list.type] = self.Session()

		# setdefault replaces dict.has_key, which Python 3 no longer has
		self.SessionsInstances.setdefault('Source', self.SessionInstance)
		self.SessionsInstances.setdefault('Target', self.SessionInstance)
开发者ID:LucaBongiorni,项目名称:DarunGrim,代码行数:33,代码来源:DarunGrimDatabase.py


示例18: test_taxa_to_biomass_and_push

def test_taxa_to_biomass_and_push(MergedataToUpload, engine, tbl_taxa, biomass, tbl_biomass, biomass_table):
    """
    Upload the biomass data, read the biomass table back and compare columns.

    Missing values in the raw ``biomass`` frame are replaced by ``"NA"``
    before the upload.  Afterwards the whole ``biomass_table`` is selected
    into a DataFrame; its ``biomass_observation`` column (as floats) and
    its ``spatial_replication_level_2`` column (as strings) must equal
    those of ``tbl_biomass``.
    """
    def column_values(frame, column):
        # plain Python list of one DataFrame column
        return frame[column].values.tolist()

    biomass.fillna("NA", inplace=True)

    merge_arguments = dict(
        raw_dataframe=biomass,
        formated_dataframe=tbl_biomass,
        formated_dataframe_name="biomass",
        raw_data_taxa_columns=["site", "phylum", "clss", "ordr", "family", "genus", "species"],
        uploaded_taxa_columns=["study_site_table_fkey", "phylum", "clss", "ordr", "family", "genus", "species"],
    )
    uploader = MergedataToUpload(sessionmaker(bind=engine, autoflush=False))
    uploader.merge_for_datatype_table_upload(**merge_arguments)

    session = sessionmaker(bind=engine, autoflush=False)()
    result = session.execute(select([biomass_table]))
    biomass_query_df = pd.DataFrame(result.fetchall())
    biomass_query_df.columns = result.keys()
    session.close()
    engine.dispose()

    biomass_true_list = [float(x) for x in column_values(tbl_biomass, "biomass_observation")]
    biomass_test_list = [float(x) for x in column_values(biomass_query_df, "biomass_observation")]
    assert biomass_true_list == biomass_test_list

    # compare the replication levels as strings
    expected_levels = [str(x) for x in column_values(tbl_biomass, "spatial_replication_level_2")]
    uploaded_levels = [str(x) for x in column_values(biomass_query_df, "spatial_replication_level_2")]
    assert expected_levels == uploaded_levels
开发者ID:bibsian,项目名称:database-development,代码行数:31,代码来源:test_upload_popler3.py


示例19: construct_maker

def construct_maker(databases, models=None, query_cls=Query, engine_params=None,
                    session_class=DBSession):
    '''
    Build a session factory for one database or for several.

    databases - str with a db uri, or dict of {reference: db uri}; every
                table of the metadata found for a reference is bound to
                that reference's engine
    models - str name of the models package (module), default is 'models'
    query_cls - query class handed to the session factory
    engine_params - dict of additional engine params
    session_class - session class handed to the session factory
    '''
    models = models or 'models'
    engine_params = engine_params or {}
    maker_options = dict(class_=session_class, query_cls=query_cls,
                         autoflush=False)

    # a single uri: bind the whole factory to one engine
    if isinstance(databases, basestring):
        single_engine = create_engine(databases, **engine_params)
        return orm.sessionmaker(bind=single_engine, **maker_options)

    table_engines = {}
    for ref, uri in databases.items():
        # an empty reference resolves to the models package itself
        module_path = '.'.join(filter(None, [models, ref]))
        metadata = import_string(module_path, 'metadata')
        engine = create_engine(uri, **engine_params)
        #TODO: find out why this is broken since sqlalchemy 0.7
        #engine.logger.logger.name += '(%s)' % ref
        table_engines.update(dict.fromkeys(metadata.sorted_tables, engine))
    return orm.sessionmaker(binds=table_engines, **maker_options)
开发者ID:riffm,项目名称:mage,代码行数:25,代码来源:sqla.py


示例20: test_taxa_to_percent_and_push

def test_taxa_to_percent_and_push(
    MergedataToUpload, engine, tbl_taxa, percent, tbl_percent, percent_cover_table, convert_types, find_types
):
    """
    Upload the percent-cover data, read the table back and compare columns.

    Missing values in the raw ``percent`` frame are replaced by ``"NA"``
    before the upload.  Afterwards the whole ``percent_cover_table`` is
    selected into a DataFrame; its ``percent_cover_observation`` column (as
    floats) and its ``spatial_replication_level_2`` column (as strings)
    must equal those of ``tbl_percent``.
    """
    def column_values(frame, column):
        # plain Python list of one DataFrame column
        return frame[column].values.tolist()

    percent.fillna("NA", inplace=True)

    merge_arguments = dict(
        raw_dataframe=percent,
        formated_dataframe=tbl_percent,
        formated_dataframe_name="percent_cover",
        raw_data_taxa_columns=["site", "code"],
        uploaded_taxa_columns=["study_site_table_fkey", "sppcode"],
    )
    uploader = MergedataToUpload(sessionmaker(bind=engine, autoflush=False))
    uploader.merge_for_datatype_table_upload(**merge_arguments)

    session = sessionmaker(bind=engine, autoflush=False)()
    result = session.execute(select([percent_cover_table]))
    percent_cover_query_df = pd.DataFrame(result.fetchall())
    percent_cover_query_df.columns = result.keys()
    session.close()
    engine.dispose()

    percent_cover_true_list = [
        float(x) for x in column_values(tbl_percent, "percent_cover_observation")
    ]
    percent_cover_test_list = [
        float(x) for x in column_values(percent_cover_query_df, "percent_cover_observation")
    ]
    assert percent_cover_true_list == percent_cover_test_list

    # compare the replication levels as strings
    expected_levels = [str(x) for x in column_values(tbl_percent, "spatial_replication_level_2")]
    uploaded_levels = [str(x) for x in column_values(percent_cover_query_df, "spatial_replication_level_2")]
    assert expected_levels == uploaded_levels
开发者ID:bibsian,项目名称:database-development,代码行数:35,代码来源:test_upload_popler3.py



注:本文中的sqlalchemy.orm.sessionmaker函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python orm.subqueryload函数代码示例发布时间:2022-05-27
下一篇:
Python orm.scoped_session函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap