本文整理汇总了 Python 中 sqlalchemy.orm.clear_mappers 函数的典型用法代码示例。如果您正苦于以下问题：Python clear_mappers 函数的具体用法是什么？Python clear_mappers 怎么用？有哪些 Python clear_mappers 的使用例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 clear_mappers 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: delData
def delData(doc):
    """Delete the replicated database row whose id matches ``doc``.

    The replication settings are read from the ``db_<parent database id>``
    property of the document; its ``value`` entry is a JSON string that
    supplies ``conn_string``, ``db_schema`` and ``db_table``.

    Returns ``-1`` (after showing a portal message) when no ``value`` entry
    is configured or when the table cannot be reflected and mapped.
    Returns ``None`` after the delete has been committed.
    """
    # Look up the replication configuration for the parent database.
    param_name = 'db_%s' % doc.getParentDatabase().id
    # list(...) keeps the lookup working where values() is a view, not a list.
    conf = list(doc.get_properties(params=(param_name, )).values())[0]
    if 'value' not in conf:
        api.portal.show_message(message='Replication not configured')
        return -1
    conf = json.loads(conf['value'])

    # Reflect the target table and map plominoData onto it.
    try:
        db = sql.create_engine(conf['conn_string'])
        metadata = sql.schema.MetaData(bind=db, reflect=True, schema=conf['db_schema'])
        table = sql.Table(conf['db_table'], metadata, autoload=True)
        orm.clear_mappers()
        orm.mapper(plominoData, table)
    except Exception:
        # Exception (not a bare except) so SystemExit/KeyboardInterrupt propagate.
        api.portal.show_message(message=u'Si sono verificati errori nella connessione al database', request=doc.REQUEST )
        return -1

    # Delete the row inside a session that is always released.
    session = orm.sessionmaker(bind=db)()
    try:
        session.query(plominoData).filter_by(id=doc.getId()).delete()
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:mamogmx,项目名称:iol.import,代码行数:25,代码来源:pgReplication.py
示例2: db_commit
def db_commit():
    """Copy the next batch of worksheet rows into the session's table.

    The uploaded workbook named in the web session is opened read-only, the
    ``sheet`` class is re-mapped onto a table built from the stored headers,
    and rows that were already handled (``handler_count``) are skipped.
    The updated ``handler_count`` is written back to the web session before
    the database commit, then the client is redirected to
    ``database_handler``.
    """
    from openpyxl import load_workbook

    workbook_path = os.path.join(app.config['UPLOAD_FOLDER'], session['filename'])
    workbook = load_workbook(filename=workbook_path, read_only=True)
    headers = session['sheet_headers']
    worksheet = workbook.active
    table_name = session['table_name']

    metadata = MetaData(bind=e)
    header_columns = (Column(header, String(8000)) for header in headers)
    table = Table(table_name, metadata, Column('id', Integer, primary_key=True), *header_columns)
    clear_mappers()
    mapper(sheet, table)

    already_handled = session['handler_count']
    batch_size = session['handle_size']
    taken = 0
    for row_index, row in enumerate(worksheet.rows):
        # Rows below the stored offset were committed by an earlier call.
        if already_handled > row_index:
            continue
        taken += 1
        record = sheet()
        for position, cell in enumerate(row):
            setattr(record, headers[position], cell.value)
        db_session.add(record)
        # Stop once the batch is full (same threshold as before: size + 1 rows).
        if taken - 1 == batch_size:
            break

    session['handler_count'] = already_handled + taken
    db_session.commit()
    return redirect(url_for('database_handler'))
开发者ID:navdeepniku,项目名称:riskadvisors,代码行数:35,代码来源:routes.py
示例3: __init__
def __init__(self, dbname='xrayref.db'):
    """Connect to an existing database file and map its tables.

    If ``dbname`` does not exist as given, it is looked up next to this
    module's file.

    Raises:
        IOError: the file is found in neither location.
        ValueError: ``isxrayDB`` rejects the file.
    """
    if not os.path.exists(dbname):
        # Fall back to the directory that contains this module.
        dbname = os.path.join(os.path.dirname(__file__), dbname)
        if not os.path.exists(dbname):
            raise IOError("Database '%s' not found!" % dbname)

    if not isxrayDB(dbname):
        raise ValueError("'%s' is not a valid X-ray Database file!" % dbname)

    self.dbname = dbname
    self.engine = make_engine(dbname)
    self.conn = self.engine.connect()
    self.session = sessionmaker(bind=self.engine)()
    self.metadata = MetaData(self.engine)
    self.metadata.reflect()
    tables = self.tables = self.metadata.tables

    try:
        clear_mappers()
    except Exception:
        # Deliberately best-effort, but no longer a bare except:
        # SystemExit and KeyboardInterrupt must not be swallowed here.
        pass

    # (mapped class, reflected table name) pairs, mapped in this order.
    table_classes = (
        (ChantlerTable, 'Chantler'),
        (WaasmaierTable, 'Waasmaier'),
        (KeskiRahkonenKrauseTable, 'KeskiRahkonen_Krause'),
        (ElementsTable, 'elements'),
        (XrayLevelsTable, 'xray_levels'),
        (XrayTransitionsTable, 'xray_transitions'),
        (CosterKronigTable, 'Coster_Kronig'),
        (PhotoAbsorptionTable, 'photoabsorption'),
        (ScatteringTable, 'scattering'),
    )
    for mapped_class, table_name in table_classes:
        mapper(mapped_class, tables[table_name])
开发者ID:newville,项目名称:xraylarch,代码行数:31,代码来源:xraydb.py
示例4: go
def go():
    """Map A/B, round-trip a small object graph, then clear the mappers."""
    mapper(A, table1, properties={
        "bs": relationship(B, order_by=table2.c.col1)
    })
    mapper(B, table2)
    mapper(A, table1, non_primary=True)

    session = create_session()
    first, second, third = A(col2="a1"), A(col2="a2"), A(col2="a3")
    # Attach children: two to the first parent, one to the third.
    for parent, child_name in ((first, "b1"), (first, "b2"), (third, "b3")):
        parent.bs.append(B(col2=child_name))
    for parent in (first, second, third):
        session.add(parent)
    session.flush()
    session.expunge_all()

    loaded = session.query(A).order_by(A.col1).all()
    expected = [
        A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
        A(col2="a2", bs=[]),
        A(col2="a3", bs=[B(col2="b3")]),
    ]
    eq_(expected, loaded)

    for parent in loaded:
        session.delete(parent)
    session.flush()
    session.close()
    clear_mappers()
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:34,代码来源:test_memusage.py
示例5: _create_tables
def _create_tables(self):
    """Define the ``messages`` and ``digests`` tables, create them, and map
    ``Message`` and ``Digest`` onto them.

    Stores the tables on ``self.messages`` / ``self.digests`` and a new
    session on ``self.session``.
    """
    metadata = MetaData()
    metadata.bind = self.engine

    message_columns = (
        Column('id', String(192), index=True, primary_key=True),
        Column('content', Text, nullable=False),
        Column('received_at', DateTime, nullable=False),
        Column('sent_at', DateTime),
    )
    self.messages = self._create_table('messages', metadata, *message_columns)

    digest_columns = (
        Column('msg_id', String(192), ForeignKey('messages.id', ondelete='cascade'),
               primary_key=True),
        Column('send_to', String(192), index=True, primary_key=True),
        Column('scheduled_at', DateTime, nullable=False),
        Column('sent_at', DateTime),
    )
    self.digests = self._create_table('digests', metadata, *digest_columns)

    # The engine is bound a second time, exactly as in the original sequence.
    metadata.bind = self.engine
    metadata.create_all(self.engine)

    clear_mappers()
    message_properties = {'digests': relationship(Digest, backref='msg')}
    mapper(Message, self.messages, properties=message_properties)
    mapper(Digest, self.digests)
    self.session = create_session()
开发者ID:sirech,项目名称:deliver,代码行数:27,代码来源:base.py
示例6: tearDownAll
def tearDownAll(self):
    """Clear all mappers, then drop every table of ``_otest_metadata``.

    The module-level ``clear_mappers`` name is resolved lazily: it is only
    imported from ``sqlalchemy.orm`` when it is still ``None``.
    """
    global clear_mappers
    if clear_mappers is None:
        from sqlalchemy import orm as _orm
        clear_mappers = _orm.clear_mappers
    clear_mappers()
    _otest_metadata.drop_all()
开发者ID:tehasdf,项目名称:sqlalchemy,代码行数:7,代码来源:testing.py
示例7: test_noninherited_warning
def test_noninherited_warning(self):
    """Setting a base-mapped attribute on a concrete subclass instance
    must raise AttributeError, for a relationship and for a column."""
    classes, tables = self.classes, self.tables
    A, B, Dest = classes.A, classes.B, classes.Dest
    a_table, b_table, dest_table = tables.a_table, tables.b_table, tables.dest_table

    def map_hierarchy(a_properties):
        # Map A with the given properties, B as a concrete subclass, and Dest.
        mapper(A, a_table, properties=a_properties)
        mapper(B, b_table, inherits=A, concrete=True)
        mapper(Dest, dest_table)

    # Relationship declared on A only.
    map_hierarchy({"some_dest": relationship(Dest)})
    b = B()
    dest = Dest()
    assert_raises(AttributeError, setattr, b, "some_dest", dest)
    clear_mappers()

    # Column property declared on A only.
    map_hierarchy({"a_id": a_table.c.id})
    b = B()
    assert_raises(AttributeError, setattr, b, "a_id", 3)
    clear_mappers()

    # Final mapping is set up again without a further assertion.
    map_hierarchy({"a_id": a_table.c.id})
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:28,代码来源:test_concrete.py
示例8: teardown
def teardown(self):
    """Clear mappers, invalidate a connection per engine, remove shard files."""
    clear_mappers()
    for engine in self.dbs:
        engine.connect().invalidate()

    # Shard database files are numbered 1 through 4.
    shard_files = [
        "shard%d_%s.db" % (number, provision.FOLLOWER_IDENT)
        for number in range(1, 5)
    ]
    for path in shard_files:
        os.remove(path)
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:7,代码来源:test_horizontal_shard.py
示例9: test_broken
def test_broken(self):
    """Loading BROKEN_GTFS raises KeyError when strict and succeeds when
    lenient, keeping only the non-broken entities."""
    # Strict mode: the load itself must raise KeyError.
    clear_mappers()
    dao = Dao("")
    with self.assertRaises(KeyError):
        dao.load_gtfs(BROKEN_GTFS, lenient=False)

    # Lenient mode: the load must succeed on a fresh DAO.
    clear_mappers()
    dao = Dao("")
    dao.load_gtfs(BROKEN_GTFS, lenient=True)

    # The following are based on BROKEN GTFS content,
    # that is the entities count minus broken ones.
    # assertEqual reports both values on failure, unlike assertTrue(a == b).
    self.assertEqual(len(dao.routes()), 4)
    self.assertEqual(len(list(dao.stops())), 12)
    self.assertEqual(len(dao.calendars()), 2)
    self.assertEqual(len(list(dao.trips())), 104)
    self.assertEqual(len(dao.stoptimes()), 500)
    self.assertEqual(len(dao.fare_attributes()), 2)
    self.assertEqual(len(dao.fare_rules()), 4)

    # This stop has missing coordinates in the broken file
    stop00 = dao.stop('FUR_CREEK_RES3')
    # assertAlmostEquals is a deprecated alias (removed in Python 3.12).
    self.assertAlmostEqual(stop00.stop_lat, 0.0, 5)
    self.assertAlmostEqual(stop00.stop_lon, 0.0, 5)
开发者ID:pailakka,项目名称:gtfslib-python,代码行数:27,代码来源:test_broken.py
示例10: saveData
def saveData(self,doc):
    """Replace the replicated database row for ``doc`` with its current values.

    Connection settings (``connString``, ``dbSchema``, ``dbTable``) are read
    from the parent database of ``doc`` and stored on ``self``. The target
    table is reflected, ``plominoData`` is re-mapped onto it, and the row
    with the document's id is deleted and re-inserted.

    The delete and the insert are committed together, so a failing insert
    no longer leaves the previous row deleted. Any error rolls the
    transaction back and is re-raised; the session is always closed.
    """
    self.plominoDoc = doc
    plominoDb = doc.getParentDatabase()
    self.conn_string = plominoDb.connString
    self.db_schema = plominoDb.dbSchema
    self.db_table = plominoDb.dbTable

    # Reflect the target table and map plominoData onto it.
    db = sql.create_engine(self.conn_string)
    metadata = sql.schema.MetaData(bind=db,reflect=True,schema=self.db_schema)
    table = sql.Table(self.db_table, metadata, autoload=True)
    orm.clear_mappers()
    orm.mapper(plominoData,table)

    # Round-trip through JSON so the values are plain serialisable types.
    data = self.getPlominoValues()
    data = json.loads(json.dumps(data, default=DateTime.DateTime.ISO,use_decimal=True ))
    data['id'] = doc.getId()
    data['plominodb'] = doc.getParentDatabase().id
    data['plominoform'] = doc.getForm().getFormName()
    data['owner'] = doc.getItem('owner','')
    data['review_state'] = doc.getItem('iol','')
    row = plominoData(data['id'],data['plominodb'],data['plominoform'],data['owner'],data['review_state'],data)

    session = orm.sessionmaker(bind = db)()
    try:
        # One transaction: drop the stale row, insert the fresh one.
        session.query(plominoData).filter_by(id=row.id).delete()
        session.add(row)
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:mamogmx,项目名称:gisweb.plomino.extensions,代码行数:31,代码来源:pgReplication.py
示例11: connect
def connect(self, dbname, server='sqlite', backup=True):
    """Connect to an existing database and map its tables.

    For ``server == 'sqlite'`` the file must exist and pass ``isMotorDB``;
    with ``backup`` true, ``save_backup`` is called on it first.

    Raises:
        IOError: the sqlite file does not exist.
        ValueError: ``isMotorDB`` rejects the file.
    """
    # NOTE(review): SOURCE lost its indentation; the validity and backup
    # checks are assumed to all belong to the sqlite branch -- confirm.
    if server == 'sqlite':
        if not os.path.exists(dbname):
            raise IOError("Database '%s' not found!" % dbname)
        if not isMotorDB(dbname):
            raise ValueError("'%s' is not an Motor file!" % dbname)
        if backup:
            save_backup(dbname)

    self.dbname = dbname
    self.engine = make_engine(dbname, server)
    self.conn = self.engine.connect()
    self.session = sessionmaker(bind=self.engine)()
    self.metadata = MetaData(self.engine)
    self.metadata.reflect()
    tables = self.tables = self.metadata.tables

    try:
        clear_mappers()
    except Exception:
        # Deliberately best-effort, but no longer a bare except:
        # SystemExit and KeyboardInterrupt must not be swallowed here.
        pass

    mapper(MotorsTable, tables['motors'])
    mapper(InfoTable, tables['info'])
开发者ID:pyepics,项目名称:epicsapps,代码行数:27,代码来源:motordb.py
示例12: connect
def connect(self, dbname, backup=True):
    """Connect to an existing sqlite database file and map its tables.

    With ``backup`` true, ``save_backup`` is called on the file before the
    engine is created.

    Raises:
        IOError: the file does not exist.
        ValueError: ``isInstrumentDB`` rejects the file.
    """
    if not os.path.exists(dbname):
        raise IOError("Database '%s' not found!" % dbname)
    if not isInstrumentDB(dbname):
        raise ValueError("'%s' is not an Instrument file!" % dbname)
    if backup:
        save_backup(dbname)

    self.dbname = dbname
    self.engine = create_engine('sqlite:///%s' % self.dbname,
                                poolclass = SingletonThreadPool)
    self.conn = self.engine.connect()
    self.session = sessionmaker(bind=self.engine)()
    self.metadata = MetaData(self.engine)
    self.metadata.reflect()
    tables = self.tables = self.metadata.tables

    try:
        clear_mappers()
    except Exception:
        # Deliberately best-effort, but no longer a bare except:
        # SystemExit and KeyboardInterrupt must not be swallowed here.
        pass

    # Plain tables without relationships.
    mapper(Info, tables['info'])
    mapper(Command, tables['command'])
    mapper(PV, tables['pv'])

    # Instrument <-> PV is many-to-many through the instrument_pv table.
    mapper(Instrument, tables['instrument'],
           properties={'pvs': relationship(PV,
                                           backref='instrument',
                                           secondary=tables['instrument_pv'])})
    mapper(PVType, tables['pvtype'],
           properties={'pv':
                       relationship(PV, backref='pvtype')})
    mapper(Position, tables['position'],
           properties={'instrument': relationship(Instrument,
                                                  backref='positions'),
                       'pvs': relationship(Position_PV) })

    # Association tables mapped as classes of their own.
    mapper(Instrument_PV, tables['instrument_pv'],
           properties={'pv':relationship(PV),
                       'instrument':relationship(Instrument)})
    mapper(Position_PV, tables['position_pv'],
           properties={'pv':relationship(PV)})

    # Commands attached to an instrument before / after a move.
    mapper(Instrument_Precommand, tables['instrument_precommand'],
           properties={'instrument': relationship(Instrument,
                                                  backref='precommands'),
                       'command': relationship(Command,
                                               backref='inst_precoms')})
    mapper(Instrument_Postcommand, tables['instrument_postcommand'],
           properties={'instrument': relationship(Instrument,
                                                  backref='postcommands'),
                       'command': relationship(Command,
                                               backref='inst_postcoms')})
开发者ID:promiseX,项目名称:epicsapps,代码行数:60,代码来源:instrument.py
示例13: _do_mapper_test
def _do_mapper_test(self, configs):
    """Run the query tests once per loader-strategy configuration.

    Each entry of ``configs`` is ``(orders option, items option, keywords
    option, count)``; the option names are translated to ``lazy=`` values.
    Mappers are cleared after every configuration, even if the tests fail.
    """
    lazy_value = {
        'lazyload':'select',
        'joinedload':'joined',
        'subqueryload':'subquery',
    }

    for orders_opt, items_opt, keywords_opt, expected_count in configs:
        orders_rel = relationship(
            Order, lazy=lazy_value[orders_opt], order_by=orders.c.id)
        mapper(User, users, properties={'orders': orders_rel})

        items_rel = relationship(
            Item, secondary=order_items, lazy=lazy_value[items_opt],
            order_by=items.c.id)
        mapper(Order, orders, properties={'items': items_rel})

        keywords_rel = relationship(
            Keyword, lazy=lazy_value[keywords_opt], secondary=item_keywords,
            order_by=keywords.c.id)
        mapper(Item, items, properties={'keywords': keywords_rel})

        mapper(Keyword, keywords)

        try:
            self._do_query_tests([], expected_count)
        finally:
            clear_mappers()
开发者ID:gaguilarmi,项目名称:sqlalchemy,代码行数:27,代码来源:test_subquery_relations.py
示例14: queryDb
def queryDb():
    """Return rows of the session's table that match one column value, as JSON.

    The request body is JSON with ``col_name`` and ``col_value``. The
    response is a list of ``{header: value}`` dicts, one per matching row,
    or a single-entry placeholder list when nothing matches. Any failure
    yields the "Enter valid Data" placeholder. The status is always 200.
    """
    try:
        metadata = MetaData(bind=e)
        t = Table(session['table_name'], metadata, Column('id', Integer, primary_key=True),*(Column(header, String(8000)) for header in session['sheet_headers']))
        clear_mappers()
        mapper(sheet, t)

        qu = request.get_json()
        # print() with one argument behaves the same on Python 2 and 3.
        print(qu)
        query_var_byUser = qu['col_value']
        query_column_byUser = qu['col_name']
        # filter_by only accepts mapped attribute names; an unknown name
        # raises and ends up in the handler below.
        qargs = {query_column_byUser: query_var_byUser}
        acc = db_session.query(sheet).filter_by(**qargs).all()

        headers = session['sheet_headers']
        result_list = [
            dict((head, getattr(item, head)) for head in headers)
            for item in acc
        ]
        print(result_list)
        if not result_list:
            return jsonify([{"No Records available for this Query":""}]), 200
        return jsonify(result_list), 200
    except Exception:
        # Exception (not a bare except) so SystemExit/KeyboardInterrupt propagate.
        return jsonify([{"Enter valid Data":""}]), 200
开发者ID:navdeepniku,项目名称:riskadvisors,代码行数:25,代码来源:routes.py
示例15: test_invocation_systemwide_loaders
def test_invocation_systemwide_loaders(self):
    """``_emit_lazyload`` is called while loaders are baked system-wide,
    and is not called once they are unbaked."""

    def load_addresses_with_patched_loader():
        # Load the first User and touch .addresses with _emit_lazyload mocked.
        User, Address = self._o2m_fixture()
        session = Session()
        query = session.query(User).options(lazyload(User.addresses))
        with mock.patch.object(BakedLazyLoader, "_emit_lazyload") as emit:
            user = query.first()
            user.addresses
        return user, emit

    baked.bake_lazy_loaders()
    try:
        user, emit = load_addresses_with_patched_loader()
        # invoked
        is_(
            emit.mock_calls[0][1][1],
            user._sa_instance_state
        )
    finally:
        baked.unbake_lazy_loaders()

    clear_mappers()
    user, emit = load_addresses_with_patched_loader()
    # not invoked
    eq_(emit.mock_calls, [])
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:28,代码来源:test_baked.py
示例16: restore
def restore(self):
    """Restore a Griffith compressed backup chosen by the user.

    Archive members are extracted by base name only: ``.jpg`` files go to
    the posters location, everything else to the home location. The config
    is then reloaded, the database re-opened (converting an old ``.gri``
    database first) and the GUI refreshed.

    Returns ``False`` when the archive cannot be opened and ``None``
    otherwise, including when no file was chosen. Exits the process with
    status 4 if the old database cannot be converted.
    """
    filename = gutils.file_chooser(_("Restore Griffith backup"),
        action=gtk.FILE_CHOOSER_ACTION_OPEN,
        buttons=(gtk.STOCK_CANCEL, gtk.RESPONSE_CANCEL,
                 gtk.STOCK_OPEN, gtk.RESPONSE_OK))
    if not filename[0]:
        return

    try:
        archive = zipfile.ZipFile(filename[0], 'r')
    except Exception:
        # Exception (not a bare except) so SystemExit/KeyboardInterrupt propagate.
        gutils.error(self, _("Can't read backup file"), self.widgets['window'])
        return False

    try:
        posters_dir = os.path.join(self.locations['posters'])
        for member in archive.namelist():
            # Only the base name is used, so archive paths are flattened.
            basename = os.path.split(member)[1]
            if basename == '' or os.path.isdir(basename):
                continue
            if basename.endswith('.jpg'):
                target = os.path.join(posters_dir, basename)
            else:
                target = os.path.join(self.locations['home'], basename)
            outfile = open(target, 'wb')
            try:
                outfile.write(archive.read(member))
            finally:
                # Release the handle even when reading or writing fails.
                outfile.close()
    finally:
        archive.close()

    # restore config file
    self.config = config.Config(file=os.path.join(self.locations['home'],'griffith.conf'))
    filename = os.path.join(self.locations['home'], self.config["default_db"])

    self.db.metadata.engine.dispose() # close DB
    from sqlalchemy.orm import clear_mappers
    clear_mappers()

    # check if file needs conversion
    if self.config['default_db'].lower().endswith('.gri'):
        self.debug.show('Old database format detected. Converting...')
        from dbupgrade import convert_from_old_db
        from initialize import location_posters
        if convert_from_old_db(self, filename, os.path.join(self.locations['home'], 'griffith.db')):
            self.config.save()
            location_posters(self.locations, self.config)
        else:
            # print() with one argument behaves the same on Python 2 and 3.
            print('Cant convert old database, exiting.')
            import sys
            sys.exit(4)

    self.db = sql.GriffithSQL(self.config, self.debug, self.locations['home'])
    from initialize import dictionaries, people_treeview
    dictionaries(self)
    people_treeview(self)

    # let's refresh the treeview
    self.clear_details()
    self.populate_treeview()
    self.go_last()
    self.treeview_clicked()
    self.count_statusbar()
    gutils.info(self, _("Backup restored"), self.widgets['window'])
开发者ID:BackupTheBerlios,项目名称:griffith-svn,代码行数:60,代码来源:backup.py
示例17: go
def go():
    """Map A/B, round-trip a small object graph, then clear the mappers."""
    primary_a = mapper(A, table1, properties={
        "bs":relation(B)
    })
    primary_b = mapper(B, table2)
    secondary_a = mapper(A, table1, non_primary=True)

    session = create_session()
    first, second, third = A(col2="a1"), A(col2="a2"), A(col2="a3")
    # Attach children: two to the first parent, one to the third.
    for parent, child_name in ((first, "b1"), (first, "b2"), (third, "b3")):
        parent.bs.append(B(col2=child_name))
    for parent in (first, second, third):
        session.save(parent)
    session.flush()
    session.clear()

    loaded = session.query(A).all()
    expected = [
        A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
        A(col2="a2", bs=[]),
        A(col2="a3", bs=[B(col2="b3")]),
    ]
    self.assertEquals(expected, loaded)

    for parent in loaded:
        session.delete(parent)
    session.flush()
    session.close()
    clear_mappers()
开发者ID:Frihet,项目名称:sqlalchemy-patches,代码行数:34,代码来源:memusage.py
示例18: setUp
def setUp(self):
    """Reset mappers, declare two models on an in-memory sqlite database,
    create their tables and add one 'Bob' row of each inside a transaction.
    """
    clear_mappers()
    pysqla.AVAILABLE_OBJECTS = pysqla._marker

    session_factory = sessionmaker(extension=ZopeTransactionExtension())
    self.session = scoped_session(session_factory)
    Base = extended_declarative_base(
        self.session,
        metadata=sa.MetaData('sqlite:///:memory:'))

    class Test1(Base):
        id = sa.Column(sa.Integer, primary_key=True)
        name = sa.Column(sa.String(50))

    class Test2(Base):
        idtest = sa.Column(sa.Integer, primary_key=True)
        name = sa.Column(sa.String(50))

    Base.metadata.create_all()
    self.Test1, self.Test2 = Test1, Test2

    with transaction.manager:
        self.value1 = Test1(name='Bob')
        self.session.add(self.value1)
        self.value2 = Test2(name='Bob')
        self.session.add(self.value2)
开发者ID:LeResKP,项目名称:pyramid_sqladmin,代码行数:25,代码来源:test_init.py
示例19: reflect
def reflect(engine, models, schema = None):
    """Reflect the database and map each table onto its model class.

    The model for table ``some_table`` is looked up as ``models.SomeTable``;
    a table without a model is reported on stderr and left unmapped.

    Args:
        engine: engine the metadata and the session factory are bound to.
        models: object whose attributes are the model classes.
        schema: optional schema to reflect; its ``"<schema>."`` prefix is
            removed from the returned table names.

    Returns:
        ``(mappers, tables, Session)``: mappers keyed by model name, tables
        keyed by unqualified table name, and a session factory.
    """
    metadata = MetaData()
    metadata.bind = engine

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category = SAWarning)
        metadata.reflect(schema = schema, views = False)

    if schema is not None:
        prefix = str(schema) + "."
        # Strip the qualifier only as a leading prefix; str.replace also
        # removed "<schema>." where it occurred inside a table name.
        tables = dict(
            (name[len(prefix):] if name.startswith(prefix) else name, table)
            for name, table in metadata.tables.items())
    else:
        tables = metadata.tables

    clear_mappers()

    mappers = {}
    # items() instead of iteritems() so this also runs on Python 3.
    for table_name, table in tables.items():
        modelname = "".join(word.capitalize() for word in table_name.split("_"))
        try:
            model = getattr(models, modelname)
        except AttributeError:
            stderr.write("Missing model for table %s\n" % table_name)
        else:
            mappers[modelname] = mapper(model, table)

    Session = sessionmaker(bind = engine, autocommit = False, autoflush = True)
    return mappers, tables, Session
开发者ID:petrushev,项目名称:ideasphere,代码行数:30,代码来源:__init__.py
示例20: setMappers
def setMappers(self):
    """Clear existing mappers, create the proprio table if it is missing,
    and map ``Proprietaire`` onto it with ``pro_pk`` as the primary key."""
    clear_mappers()
    self.proprio = getProprio(self.metadata)
    proprio_table = self.proprio
    proprio_table.create(checkfirst=True)
    mapper(
        Proprietaire,
        proprio_table,
        primary_key=[proprio_table.c.pro_pk],
    )
开发者ID:gitesdewallonie,项目名称:gites.ldapimport,代码行数:7,代码来源:pg.py
注：本文中的 sqlalchemy.orm.clear_mappers 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论