• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python orm.clear_mappers函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 sqlalchemy.orm.clear_mappers 函数的典型用法代码示例。如果您正苦于以下问题:Python clear_mappers 函数的具体用法?Python clear_mappers 怎么用?Python clear_mappers 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 clear_mappers 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: delData

def delData(doc):
    """Delete the replicated database row that mirrors a Plomino document.

    The replication settings are read from the document property named
    ``db_<parent database id>``.  Its ``value`` entry is a JSON string that
    supplies ``conn_string``, ``db_schema`` and ``db_table``.

    Returns ``-1`` (after showing a portal message) when the property has no
    ``value`` entry or when the engine/table/mapper setup fails, and ``None``
    after the row has been deleted and the transaction committed.

    Any error raised while deleting is re-raised after the transaction has
    been rolled back; the session is always closed.
    """
    # getting database configuration
    param_name = 'db_%s' % doc.getParentDatabase().id
    # list(...) keeps the "first value" lookup working on Python 2 and 3
    conf = list(doc.get_properties(params=(param_name, )).values())[0]
    if 'value' not in conf:
        api.portal.show_message(message='Replication not configured')
        return -1
    conf = json.loads(conf['value'])
    # instantiation of the SQLAlchemy objects
    try:
        db = sql.create_engine(conf['conn_string'])
        metadata = sql.schema.MetaData(bind=db, reflect=True, schema=conf['db_schema'])
        table = sql.Table(conf['db_table'], metadata, autoload=True)
        # drop any previous mapping of plominoData before mapping it again
        orm.clear_mappers()
        orm.mapper(plominoData, table)
    except Exception:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate
        api.portal.show_message(message=u'Si sono verificati errori nella connessione al database', request=doc.REQUEST)
        return -1
    # creating session
    Sess = orm.sessionmaker(bind=db)
    session = Sess()
    try:
        # deleting row from database
        docid = doc.getId()
        session.query(plominoData).filter_by(id=docid).delete()
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        # always hand the connection back, even when the delete failed
        session.close()
开发者ID:mamogmx,项目名称:iol.import,代码行数:25,代码来源:pgReplication.py


示例2: db_commit

def db_commit():
    """Load the next batch of rows from the uploaded workbook into the database.

    The workbook named by ``session['filename']`` is opened read-only, a table
    called ``session['table_name']`` is described with an integer ``id``
    primary key plus one ``String(8000)`` column per entry of
    ``session['sheet_headers']``, and ``sheet`` is (re)mapped onto it.

    Rows up to ``session['handler_count']`` are skipped; the following rows
    are turned into ``sheet`` instances and added to ``db_session`` until the
    batch limit derived from ``session['handle_size']`` is hit.  The counter
    in the session is then advanced, the batch is committed and the client is
    redirected to ``database_handler``.
    """
    from openpyxl import load_workbook

    workbook_path = os.path.join(app.config['UPLOAD_FOLDER'], session['filename'])
    wb = load_workbook(filename=workbook_path, read_only=True)
    sheet_headers = session['sheet_headers']
    ws = wb.active

    tab = session['table_name']
    metadata = MetaData(bind=e)
    t = Table(
        tab,
        metadata,
        Column('id', Integer, primary_key=True),
        *(Column(header, String(8000)) for header in sheet_headers)
    )
    # the mapped table changes per upload, so start from a clean registry
    clear_mappers()
    mapper(sheet, t)

    handler_count = session['handler_count']
    handle_size = session['handle_size']
    rows_taken = 0
    for row_number, row in enumerate(ws.rows, 1):
        # rows already handled by earlier batches are skipped
        if handler_count > row_number - 1:
            continue

        rows_taken += 1
        record = sheet()
        for col_index, cell in enumerate(row):
            setattr(record, sheet_headers[col_index], cell.value)
        db_session.add(record)

        if rows_taken - 1 == handle_size:
            break

    session['handler_count'] = handler_count + rows_taken
    db_session.commit()
    return redirect(url_for('database_handler'))
开发者ID:navdeepniku,项目名称:riskadvisors,代码行数:35,代码来源:routes.py


示例3: __init__

    def __init__(self, dbname='xrayref.db'):
        """Connect to an existing X-ray database and map its tables.

        ``dbname`` is looked up as given first and, if that path does not
        exist, in the directory containing this module.

        Raises:
            IOError: the database file cannot be found in either location.
            ValueError: ``isxrayDB`` rejects the file.
        """
        if not os.path.exists(dbname):
            # fall back to a copy shipped next to this module
            parent = os.path.split(__file__)[0]
            dbname = os.path.join(parent, dbname)
            if not os.path.exists(dbname):
                raise IOError("Database '%s' not found!" % dbname)

        if not isxrayDB(dbname):
            raise ValueError("'%s' is not a valid X-ray Database file!" % dbname)

        self.dbname = dbname
        self.engine = make_engine(dbname)
        self.conn = self.engine.connect()
        self.session = sessionmaker(bind=self.engine)()
        self.metadata = MetaData(self.engine)
        self.metadata.reflect()
        tables = self.tables = self.metadata.tables

        # Clearing earlier mappings is best-effort, but only ordinary errors
        # are ignored: a bare ``except`` would also swallow KeyboardInterrupt
        # and SystemExit.
        try:
            clear_mappers()
        except Exception:
            pass

        # (mapped class, reflected table name), mapped in this order
        table_classes = (
            (ChantlerTable, 'Chantler'),
            (WaasmaierTable, 'Waasmaier'),
            (KeskiRahkonenKrauseTable, 'KeskiRahkonen_Krause'),
            (ElementsTable, 'elements'),
            (XrayLevelsTable, 'xray_levels'),
            (XrayTransitionsTable, 'xray_transitions'),
            (CosterKronigTable, 'Coster_Kronig'),
            (PhotoAbsorptionTable, 'photoabsorption'),
            (ScatteringTable, 'scattering'),
        )
        for table_class, table_name in table_classes:
            mapper(table_class, tables[table_name])
开发者ID:newville,项目名称:xraylarch,代码行数:31,代码来源:xraydb.py


示例4: go

        def go():
            """Map A/B, round-trip three A rows with their B children, then unmap.

            Each call sets up the mappers (including a non-primary mapper for
            A), persists the objects, reloads and compares them, deletes them
            again and finally calls ``clear_mappers()``.
            """
            a_properties = {
                "bs": relationship(B, order_by=table2.c.col1),
            }
            mapper(A, table1, properties=a_properties)
            mapper(B, table2)

            mapper(A, table1, non_primary=True)

            session = create_session()
            first, second, third = A(col2="a1"), A(col2="a2"), A(col2="a3")
            for child_name in ("b1", "b2"):
                first.bs.append(B(col2=child_name))
            third.bs.append(B(col2="b3"))
            for parent in (first, second, third):
                session.add(parent)
            session.flush()
            session.expunge_all()

            loaded = session.query(A).order_by(A.col1).all()
            expected = [
                A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
                A(col2="a2", bs=[]),
                A(col2="a3", bs=[B(col2="b3")]),
            ]
            eq_(expected, loaded)

            for parent in loaded:
                session.delete(parent)
            session.flush()
            session.close()
            clear_mappers()
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:34,代码来源:test_memusage.py


示例5: _create_tables

    def _create_tables(self):
        """Define the ``messages`` and ``digests`` tables, create them and map them.

        Both tables are built through ``self._create_table`` on a fresh
        ``MetaData`` bound to ``self.engine``.  Afterwards the mapper registry
        is cleared, ``Message`` and ``Digest`` are mapped (with a
        ``digests``/``msg`` relationship pair) and a new session is stored in
        ``self.session``.
        """
        metadata = MetaData()
        metadata.bind = self.engine

        message_columns = (
            Column('id', String(192), index=True, primary_key=True),
            Column('content', Text, nullable=False),
            Column('received_at', DateTime, nullable=False),
            Column('sent_at', DateTime),
        )
        self.messages = self._create_table('messages', metadata, *message_columns)

        digest_columns = (
            # a digest row is removed together with its message
            Column('msg_id', String(192),
                   ForeignKey('messages.id', ondelete='cascade'),
                   primary_key=True),
            Column('send_to', String(192), index=True, primary_key=True),
            Column('scheduled_at', DateTime, nullable=False),
            Column('sent_at', DateTime),
        )
        self.digests = self._create_table('digests', metadata, *digest_columns)

        metadata.bind = self.engine
        metadata.create_all(self.engine)

        clear_mappers()
        message_properties = {'digests': relationship(Digest, backref='msg')}
        mapper(Message, self.messages, properties=message_properties)
        mapper(Digest, self.digests)

        self.session = create_session()
开发者ID:sirech,项目名称:deliver,代码行数:27,代码来源:base.py


示例6: tearDownAll

    def tearDownAll(self):
        """Clear all mappers and drop every table of ``_otest_metadata``.

        ``clear_mappers`` is a module-level name that may still be ``None``;
        in that case it is bound to ``sqlalchemy.orm.clear_mappers`` on first
        use.
        """
        global clear_mappers
        if clear_mappers is None:
            # deferred import: resolve the real function only when needed
            from sqlalchemy import orm as _orm
            clear_mappers = _orm.clear_mappers

        clear_mappers()
        _otest_metadata.drop_all()
开发者ID:tehasdf,项目名称:sqlalchemy,代码行数:7,代码来源:testing.py


示例7: test_noninherited_warning

    def test_noninherited_warning(self):
        """Setting an attribute mapped only on A raises AttributeError on a concrete B.

        The A/B/Dest mappers are configured three times: once with a
        relationship on A, once with a column property on A, and a final time
        with the column property but no assertion.  The mapper registry is
        cleared between rounds.
        """
        classes = self.classes
        tables = self.tables
        A, B, Dest = classes.A, classes.B, classes.Dest
        a_table, b_table, dest_table = (
            tables.a_table,
            tables.b_table,
            tables.dest_table,
        )

        def map_fixture(a_properties):
            # A gets the given properties; B is a concrete subclass of A
            mapper(A, a_table, properties=a_properties)
            mapper(B, b_table, inherits=A, concrete=True)
            mapper(Dest, dest_table)

        # round 1: relationship declared on A only
        map_fixture({"some_dest": relationship(Dest)})
        b = B()
        dest = Dest()
        assert_raises(AttributeError, setattr, b, "some_dest", dest)
        clear_mappers()

        # round 2: column property declared on A only
        map_fixture({"a_id": a_table.c.id})
        b = B()
        assert_raises(AttributeError, setattr, b, "a_id", 3)
        clear_mappers()

        # round 3: same mapping once more, without assertions
        map_fixture({"a_id": a_table.c.id})
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:28,代码来源:test_concrete.py


示例8: teardown

    def teardown(self):
        """Clear mappers, invalidate a connection per database, delete shard files.

        The four files ``shard1_<ident>.db`` .. ``shard4_<ident>.db`` (with
        ``<ident>`` taken from ``provision.FOLLOWER_IDENT``) are removed from
        the current working directory.
        """
        clear_mappers()

        for shard_db in self.dbs:
            shard_db.connect().invalidate()

        shard_files = [
            "shard%d_%s.db" % (shard_no, provision.FOLLOWER_IDENT)
            for shard_no in range(1, 5)
        ]
        for path in shard_files:
            os.remove(path)
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:7,代码来源:test_horizontal_shard.py


示例9: test_broken

    def test_broken(self):
        """Loading BROKEN_GTFS fails in strict mode and succeeds in lenient mode.

        With ``lenient=False`` the load must raise ``KeyError``.  With
        ``lenient=True`` the load completes and the entity counts match the
        feed content minus the broken entries.
        """
        # assertRaises replaces the manual flag + try/except bookkeeping
        with self.assertRaises(KeyError):
            clear_mappers()
            dao = Dao("")
            dao.load_gtfs(BROKEN_GTFS, lenient=False)

        clear_mappers()
        dao = Dao("")
        dao.load_gtfs(BROKEN_GTFS, lenient=True)

        # The following are based on BROKEN GTFS content,
        # that is the entities count minus broken ones.
        # assertEqual reports both values on failure, unlike assertTrue(a == b).
        self.assertEqual(len(dao.routes()), 4)
        self.assertEqual(len(list(dao.stops())), 12)
        self.assertEqual(len(dao.calendars()), 2)
        self.assertEqual(len(list(dao.trips())), 104)
        self.assertEqual(len(dao.stoptimes()), 500)
        self.assertEqual(len(dao.fare_attributes()), 2)
        self.assertEqual(len(dao.fare_rules()), 4)
        # This stop has missing coordinates in the broken file
        stop00 = dao.stop('FUR_CREEK_RES3')
        # assertAlmostEquals is a deprecated alias that newer Pythons removed
        self.assertAlmostEqual(stop00.stop_lat, 0.0, 5)
        self.assertAlmostEqual(stop00.stop_lon, 0.0, 5)
开发者ID:pailakka,项目名称:gtfslib-python,代码行数:27,代码来源:test_broken.py


示例10: saveData

    def saveData(self,doc):
        """Replace the replicated database row of a Plomino document.

        Connection string, schema and table name are copied from the parent
        Plomino database onto ``self``; the table is reflected and
        ``plominoData`` is (re)mapped onto it.  The values returned by
        ``self.getPlominoValues()`` are round-tripped through JSON, completed
        with id, database id, form name, owner and review state, and stored
        as a new ``plominoData`` row.

        The old row is deleted and the new one inserted in a single
        transaction, so a failing insert no longer leaves the document
        without any row.  On error the transaction is rolled back and the
        exception re-raised; the session is always closed.
        """
        self.plominoDoc = doc
        plominoDb = doc.getParentDatabase()

        self.conn_string = plominoDb.connString
        self.db_schema = plominoDb.dbSchema
        self.db_table = plominoDb.dbTable

        db = sql.create_engine(self.conn_string)
        metadata = sql.schema.MetaData(bind=db,reflect=True,schema=self.db_schema)
        table = sql.Table(self.db_table, metadata, autoload=True)
        # drop any previous mapping of plominoData before mapping it again
        orm.clear_mappers()
        orm.mapper(plominoData,table)
        Sess = orm.sessionmaker(bind = db)

        data = self.getPlominoValues()
        # JSON round-trip converts the values into plain serialisable types
        data = json.loads(json.dumps(data, default=DateTime.DateTime.ISO,use_decimal=True ))
        data['id'] = doc.getId()
        data['plominodb'] = doc.getParentDatabase().id
        data['plominoform'] = doc.getForm().getFormName()
        data['owner'] = doc.getItem('owner','')
        data['review_state'] = doc.getItem('iol','')
        row = plominoData(data['id'],data['plominodb'],data['plominoform'],data['owner'],data['review_state'],data)

        session = Sess()
        try:
            # delete + insert inside one transaction: all or nothing
            session.query(plominoData).filter_by(id=row.id).delete()
            session.add(row)
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
开发者ID:mamogmx,项目名称:gisweb.plomino.extensions,代码行数:31,代码来源:pgReplication.py


示例11: connect

    def connect(self, dbname, server='sqlite', backup=True):
        """Connect to an existing database and map the motor tables.

        For ``server == 'sqlite'`` the file must exist and be accepted by
        ``isMotorDB``; when ``backup`` is true ``save_backup`` is called on
        it first.  For any other ``server`` value these checks are skipped.

        Raises:
            IOError: sqlite file not found.
            ValueError: sqlite file rejected by ``isMotorDB``.
        """
        if server == 'sqlite':
            if not os.path.exists(dbname):
                raise IOError("Database '%s' not found!" % dbname)

            if not isMotorDB(dbname):
                raise ValueError("'%s' is not an Motor file!" % dbname)

            if backup:
                save_backup(dbname)
        self.dbname = dbname
        self.engine = make_engine(dbname, server)
        self.conn = self.engine.connect()
        self.session = sessionmaker(bind=self.engine)()

        self.metadata = MetaData(self.engine)
        self.metadata.reflect()
        tables = self.tables = self.metadata.tables

        # Clearing earlier mappings is best-effort, but only ordinary errors
        # are ignored: a bare ``except`` would also swallow KeyboardInterrupt
        # and SystemExit.
        try:
            clear_mappers()
        except Exception:
            pass

        mapper(MotorsTable, tables['motors'])
        mapper(InfoTable, tables['info'])
开发者ID:pyepics,项目名称:epicsapps,代码行数:27,代码来源:motordb.py


示例12: connect

    def connect(self, dbname, backup=True):
        """Connect to an existing instrument database and map its tables.

        The file must exist and be accepted by ``isInstrumentDB``; when
        ``backup`` is true ``save_backup`` is called on it before the engine
        is created.  All tables are reflected and the instrument classes are
        mapped with their relationships.

        Raises:
            IOError: database file not found.
            ValueError: file rejected by ``isInstrumentDB``.
        """
        if not os.path.exists(dbname):
            raise IOError("Database '%s' not found!" % dbname)

        if not isInstrumentDB(dbname):
            raise ValueError("'%s' is not an Instrument file!" % dbname)

        if backup:
            save_backup(dbname)
        self.dbname = dbname
        self.engine = create_engine('sqlite:///%s' % self.dbname,
                                    poolclass = SingletonThreadPool)
        self.conn = self.engine.connect()
        self.session = sessionmaker(bind=self.engine)()

        self.metadata = MetaData(self.engine)
        self.metadata.reflect()
        tables = self.tables = self.metadata.tables

        # Clearing earlier mappings is best-effort, but only ordinary errors
        # are ignored: a bare ``except`` would also swallow KeyboardInterrupt
        # and SystemExit.
        try:
            clear_mappers()
        except Exception:
            pass

        # plain tables without relationships of their own
        mapper(Info,     tables['info'])
        mapper(Command,  tables['command'])
        mapper(PV,       tables['pv'])

        # instrument <-> pv is many-to-many through instrument_pv
        mapper(Instrument, tables['instrument'],
               properties={'pvs': relationship(PV,
                                               backref='instrument',
                                    secondary=tables['instrument_pv'])})

        mapper(PVType,   tables['pvtype'],
               properties={'pv':
                           relationship(PV, backref='pvtype')})

        mapper(Position, tables['position'],
               properties={'instrument': relationship(Instrument,
                                                      backref='positions'),
                           'pvs': relationship(Position_PV) })

        mapper(Instrument_PV, tables['instrument_pv'],
               properties={'pv':relationship(PV),
                           'instrument':relationship(Instrument)})

        mapper(Position_PV, tables['position_pv'],
               properties={'pv':relationship(PV)})

        mapper(Instrument_Precommand,  tables['instrument_precommand'],
               properties={'instrument': relationship(Instrument,
                                                      backref='precommands'),
                           'command':   relationship(Command,
                                                     backref='inst_precoms')})
        mapper(Instrument_Postcommand,   tables['instrument_postcommand'],
               properties={'instrument': relationship(Instrument,
                                                      backref='postcommands'),
                           'command':   relationship(Command,
                                                     backref='inst_postcoms')})
开发者ID:promiseX,项目名称:epicsapps,代码行数:60,代码来源:instrument.py


示例13: _do_mapper_test

    def _do_mapper_test(self, configs):
        """Run the query tests once per loader-strategy combination.

        Each entry of ``configs`` is ``(orders_opt, items_opt, keywords_opt,
        count)``; the three option names select the ``lazy`` setting of the
        User.orders, Order.items and Item.keywords relationships.  ``count``
        is passed to ``self._do_query_tests``.  Mappers are cleared after
        every combination, even when the query tests fail.
        """
        lazy_setting = {
            'lazyload': 'select',
            'joinedload': 'joined',
            'subqueryload': 'subquery',
        }

        for orders_opt, items_opt, keywords_opt, count in configs:
            orders_rel = relationship(
                Order, lazy=lazy_setting[orders_opt], order_by=orders.c.id)
            mapper(User, users, properties={'orders': orders_rel})

            items_rel = relationship(
                Item, secondary=order_items,
                lazy=lazy_setting[items_opt], order_by=items.c.id)
            mapper(Order, orders, properties={'items': items_rel})

            keywords_rel = relationship(
                Keyword, lazy=lazy_setting[keywords_opt],
                secondary=item_keywords, order_by=keywords.c.id)
            mapper(Item, items, properties={'keywords': keywords_rel})

            mapper(Keyword, keywords)

            try:
                self._do_query_tests([], count)
            finally:
                clear_mappers()
开发者ID:gaguilarmi,项目名称:sqlalchemy,代码行数:27,代码来源:test_subquery_relations.py


示例14: queryDb

def queryDb():
    """Return the rows of the uploaded sheet table that match a JSON query.

    The request body must be a JSON object with ``col_name`` and
    ``col_value``; rows whose ``col_name`` column equals ``col_value`` are
    returned as a JSON list of ``{header: value}`` dicts, one per row, using
    the headers stored in ``session['sheet_headers']``.

    Always answers with HTTP status 200: a one-element placeholder list is
    returned when nothing matches, and another one when anything goes wrong
    (missing keys, unknown column, database error, ...).
    """
    try:
        headers = session['sheet_headers']
        metadata = MetaData(bind=e)
        t = Table(session['table_name'], metadata, Column('id', Integer, primary_key=True),*(Column(header, String(8000)) for header in headers))
        # the mapped table depends on the session, so remap on every call
        clear_mappers()
        mapper(sheet, t)
        qu = request.get_json()
        # print(...) with one argument behaves the same on Python 2 and 3
        print(qu)
        qargs = {qu['col_name']: qu['col_value']}
        acc = db_session.query(sheet).filter_by(**qargs).all()
        result_list = [
            {head: getattr(item, head) for head in headers}
            for item in acc
        ]

        print(result_list)
        if not result_list:
            return jsonify([{"No Records available for this Query":""}]), 200
        return jsonify(result_list), 200
    except Exception:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate
        return jsonify([{"Enter valid Data":""}]), 200
开发者ID:navdeepniku,项目名称:riskadvisors,代码行数:25,代码来源:routes.py


示例15: test_invocation_systemwide_loaders

    def test_invocation_systemwide_loaders(self):
        """BakedLazyLoader._emit_lazyload is used only between bake and unbake.

        After ``baked.bake_lazy_loaders()`` a lazy load of ``User.addresses``
        must go through the patched ``_emit_lazyload``; after
        ``baked.unbake_lazy_loaders()`` and a fresh fixture it must not.
        """
        baked.bake_lazy_loaders()
        try:
            User, Address = self._o2m_fixture()

            session = Session()
            query = session.query(User).options(lazyload(User.addresses))
            with mock.patch.object(BakedLazyLoader, "_emit_lazyload") as emit:
                user = query.first()
                user.addresses
                # invoked
                first_call = emit.mock_calls[0]
                is_(first_call[1][1], user._sa_instance_state)
        finally:
            baked.unbake_lazy_loaders()

        clear_mappers()
        User, Address = self._o2m_fixture()
        session = Session()
        query = session.query(User).options(lazyload(User.addresses))

        with mock.patch.object(BakedLazyLoader, "_emit_lazyload") as emit:
            user = query.first()
            user.addresses
            # not invoked
            eq_(emit.mock_calls, [])
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:28,代码来源:test_baked.py


示例16: restore

def restore(self):
	"""Restore a Griffith compressed backup chosen through a file dialog.

	Every archive member is written under its base name: ``.jpg`` files go
	to ``self.locations['posters']``, everything else to
	``self.locations['home']``.  Afterwards the configuration is reloaded,
	the current database engine is disposed, mappers are cleared, an old
	``.gri`` database is converted if necessary, and the database and UI
	are re-initialised.

	Returns ``False`` when the archive cannot be opened and ``None``
	otherwise (including when no file was chosen).  Exits the process with
	status 4 when an old-format database cannot be converted.
	"""
	filename = gutils.file_chooser(_("Restore Griffith backup"),
		action=gtk.FILE_CHOOSER_ACTION_OPEN,
		buttons=(gtk.STOCK_CANCEL, gtk.RESPONSE_CANCEL,
			gtk.STOCK_OPEN, gtk.RESPONSE_OK))
	if not filename[0]:
		# dialog cancelled: nothing to do
		return

	try:
		# named "archive" so the builtin zip() is not shadowed
		archive = zipfile.ZipFile(filename[0], 'r')
	except Exception:
		# Exception (not a bare except) so Ctrl-C / SystemExit still propagate
		gutils.error(self, _("Can't read backup file"), self.widgets['window'])
		return False

	posters_dir = os.path.join(self.locations['posters'])
	try:
		for member in archive.namelist():
			# only the base name is used, so archive sub-directories are flattened
			basename = os.path.split(member)[1]
			if not basename or os.path.isdir(basename):
				continue
			if basename.endswith('.jpg'):
				target = os.path.join(posters_dir, basename)
			else:
				target = os.path.join(self.locations['home'], basename)
			outfile = open(target, 'wb')
			try:
				outfile.write(archive.read(member))
				outfile.flush()
			finally:
				# close the file even when reading or writing fails
				outfile.close()
	finally:
		archive.close()

	# restore config file
	self.config = config.Config(file=os.path.join(self.locations['home'],'griffith.conf'))
	filename = os.path.join(self.locations['home'], self.config["default_db"])

	self.db.metadata.engine.dispose() # close DB
	from sqlalchemy.orm import clear_mappers
	clear_mappers()

	# check if file needs conversion
	if self.config['default_db'].lower().endswith('.gri'):
		self.debug.show('Old database format detected. Converting...')
		from dbupgrade import convert_from_old_db
		from initialize import location_posters
		if convert_from_old_db(self, filename, os.path.join(self.locations['home'], 'griffith.db')):
			self.config.save()
			location_posters(self.locations, self.config)
		else:
			# print(...) with one argument behaves the same on Python 2 and 3
			print('Cant convert old database, exiting.')
			import sys
			sys.exit(4)

	self.db = sql.GriffithSQL(self.config, self.debug, self.locations['home'])
	from initialize import dictionaries, people_treeview
	dictionaries(self)
	people_treeview(self)
	# let's refresh the treeview
	self.clear_details()
	self.populate_treeview()
	self.go_last()
	self.treeview_clicked()
	self.count_statusbar()
	gutils.info(self, _("Backup restored"), self.widgets['window'])
开发者ID:BackupTheBerlios,项目名称:griffith-svn,代码行数:60,代码来源:backup.py


示例17: go

        def go():
            """Map A/B, round-trip three A rows with their B children, then unmap.

            Each call sets up the mappers (including a non-primary mapper for
            A), saves the objects, reloads and compares them, deletes them
            again and finally calls ``clear_mappers()``.
            """
            primary_a = mapper(A, table1, properties={"bs": relation(B)})
            primary_b = mapper(B, table2)

            secondary_a = mapper(A, table1, non_primary=True)

            session = create_session()
            first, second, third = A(col2="a1"), A(col2="a2"), A(col2="a3")
            for child_name in ("b1", "b2"):
                first.bs.append(B(col2=child_name))
            third.bs.append(B(col2="b3"))
            for parent in (first, second, third):
                session.save(parent)
            session.flush()
            session.clear()

            loaded = session.query(A).all()
            expected = [
                A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
                A(col2="a2", bs=[]),
                A(col2="a3", bs=[B(col2="b3")]),
            ]
            self.assertEquals(expected, loaded)

            for parent in loaded:
                session.delete(parent)
            session.flush()
            session.close()
            clear_mappers()
开发者ID:Frihet,项目名称:sqlalchemy-patches,代码行数:34,代码来源:memusage.py


示例18: setUp

    def setUp(self):
        """Build two declarative test models on an in-memory SQLite database.

        Mappers are cleared and ``pysqla.AVAILABLE_OBJECTS`` is reset to
        ``pysqla._marker`` first.  The models are exposed as ``self.Test1``
        and ``self.Test2``; one instance of each, named 'Bob', is added to
        ``self.session`` inside a transaction and kept as ``self.value1`` /
        ``self.value2``.
        """
        clear_mappers()
        pysqla.AVAILABLE_OBJECTS = pysqla._marker

        session_factory = sessionmaker(extension=ZopeTransactionExtension())
        self.session = scoped_session(session_factory)
        in_memory_metadata = sa.MetaData('sqlite:///:memory:')
        Base = extended_declarative_base(self.session, metadata=in_memory_metadata)

        class Test1(Base):
            id = sa.Column(sa.Integer, primary_key=True)
            name = sa.Column(sa.String(50))

        class Test2(Base):
            idtest = sa.Column(sa.Integer, primary_key=True)
            name = sa.Column(sa.String(50))

        Base.metadata.create_all()

        self.Test1, self.Test2 = Test1, Test2
        with transaction.manager:
            for attr_name, model in (('value1', Test1), ('value2', Test2)):
                instance = model(name='Bob')
                setattr(self, attr_name, instance)
                self.session.add(instance)
开发者ID:LeResKP,项目名称:pyramid_sqladmin,代码行数:25,代码来源:test_init.py


示例19: reflect

def reflect(engine, models, schema = None):
    """Reflect the database tables and map them onto the classes of ``models``.

    Table ``foo_bar`` is mapped onto ``models.FooBar``; tables without a
    matching attribute on ``models`` are reported on ``stderr`` and skipped.
    All existing mappers are cleared first.

    Args:
        engine: engine the metadata and the session factory are bound to.
        models: object (typically a module) holding the model classes.
        schema: optional schema to reflect; the leading ``"<schema>."`` is
            removed from the table keys of the result.

    Returns:
        ``(mappers, tables, Session)``: mappers keyed by model name, tables
        keyed by table name, and a session factory bound to ``engine``.
    """
    metadata = MetaData()
    metadata.bind = engine

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category = SAWarning)
        metadata.reflect(schema = schema, views = False)

    if schema is not None:
        # Strip the schema only where it is a prefix of the key; a plain
        # str.replace would also mangle table names that merely contain
        # "<schema>." somewhere in the middle.
        prefix = str(schema) + "."
        tables = {}
        # .items() instead of .iteritems(): works on Python 2 and 3
        for table_name, table in metadata.tables.items():
            if table_name.startswith(prefix):
                table_name = table_name[len(prefix):]
            tables[table_name] = table
    else:
        tables = metadata.tables

    clear_mappers()

    mappers = {}
    for table_name, table in tables.items():
        modelname = "".join(word.capitalize() for word in table_name.split("_"))

        try:
            model = getattr(models, modelname)
        except AttributeError:
            stderr.write("Missing model for table %s\n" % table_name)
        else:
            mappers[modelname] = mapper(model, table)

    Session = sessionmaker(bind = engine, autocommit = False, autoflush = True)

    return mappers, tables, Session
开发者ID:petrushev,项目名称:ideasphere,代码行数:30,代码来源:__init__.py


示例20: setMappers

    def setMappers(self):
        """Clear all mappers, then create and map the proprietor table.

        The table comes from ``getProprio(self.metadata)``, is stored in
        ``self.proprio``, created if it does not exist yet, and mapped onto
        ``Proprietaire`` with ``pro_pk`` as primary key.
        """
        clear_mappers()

        proprio_table = getProprio(self.metadata)
        self.proprio = proprio_table
        proprio_table.create(checkfirst=True)

        mapper(
            Proprietaire,
            proprio_table,
            primary_key=[proprio_table.c.pro_pk],
        )
开发者ID:gitesdewallonie,项目名称:gites.ldapimport,代码行数:7,代码来源:pg.py



注:本文中的 sqlalchemy.orm.clear_mappers 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python orm.column_property函数代码示例发布时间:2022-05-27
下一篇:
Python orm.class_mapper函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap