本文整理汇总了Python中sqlalchemy.join函数的典型用法代码示例。如果您正苦于以下问题：Python join函数的具体用法？Python join怎么用？Python join使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了join函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: getSensorEquipment
def getSensorEquipment(request):
    """Return the equipment history rows for the sensor given in the route.

    The sensor id is read from ``request.matchdict['id']``. Depending on the
    sensor type name, the history comes from ``MonitoredSiteEquipment``
    (type name containing ``RFID``) or from ``IndividualEquipment``
    (``gsm``, ``satellite`` or ``vhf``); any other type yields the string
    ``'bad request'``.

    Returns a list of ``OrderedDict`` rows, newest ``StartDate`` first, with
    dates rendered as strings and a ``format`` key describing that rendering.
    """
    date_pattern = '%Y-%m-%d %H:%M:%S'
    db = request.dbsession
    sensor_id = request.matchdict['id']
    type_name = db.query(Sensor).get(sensor_id).GetType().Name

    if 'RFID' in type_name.upper():
        equipment = Base.metadata.tables['MonitoredSiteEquipment']
        joined = join(equipment, Sensor,
                      equipment.c['FK_Sensor'] == Sensor.ID)
        joined = join(joined, MonitoredSite,
                      equipment.c['FK_MonitoredSite'] == MonitoredSite.ID)
        columns = [
            equipment.c['StartDate'],
            equipment.c['EndDate'],
            Sensor.UnicIdentifier,
            MonitoredSite.Name,
            MonitoredSite.ID.label('MonitoredSiteID'),
        ]
    elif type_name.lower() in ['gsm', 'satellite', 'vhf']:
        equipment = Base.metadata.tables['IndividualEquipment']
        joined = join(equipment, Sensor,
                      equipment.c['FK_Sensor'] == Sensor.ID)
        columns = [
            equipment.c['StartDate'],
            equipment.c['EndDate'],
            equipment.c['FK_Individual'],
            Sensor.UnicIdentifier,
        ]
    else:
        return 'bad request'

    # Both branches share the same filter and ordering on the history table.
    query = select(columns).select_from(joined)
    query = query.where(equipment.c['FK_Sensor'] == sensor_id)
    query = query.order_by(desc(equipment.c['StartDate']))

    response = []
    for record in db.execute(query).fetchall():
        entry = OrderedDict(record)
        entry['StartDate'] = entry['StartDate'].strftime(date_pattern)
        end = entry['EndDate']
        # An open-ended deployment keeps EndDate as None.
        entry['EndDate'] = None if end is None else end.strftime(date_pattern)
        entry['format'] = 'YYYY-MM-DD HH:mm:ss'
        response.append(entry)
    return response
开发者ID:romfabbro,项目名称:ecoReleve-Data,代码行数:35,代码来源:sensor.py
示例2: getIndivEquipment
def getIndivEquipment(request):
    """Return the sensor equipment history of one individual.

    The individual id comes from ``request.matchdict['id']``. Rows are read
    from ``IndividualEquipment`` joined to ``Sensor`` and ``SensorType`` and
    ordered by ``StartDate`` descending.

    Returns a list of ``OrderedDict`` rows whose ``StartDate``/``EndDate``
    are formatted strings; a missing ``EndDate`` becomes ``''``.
    """
    date_pattern = '%Y-%m-%d %H:%M:%S'
    db = request.dbsession
    individual_id = request.matchdict['id']

    equipment = Base.metadata.tables['IndividualEquipment']
    joined = join(equipment, Sensor, equipment.c['FK_Sensor'] == Sensor.ID)
    joined = join(joined, SensorType, Sensor.FK_SensorType == SensorType.ID)

    columns = [
        equipment.c['StartDate'],
        equipment.c['EndDate'],
        Sensor.UnicIdentifier,
        Sensor.ID.label('SensorID'),
        equipment.c['FK_Individual'],
        SensorType.Name.label('Type'),
    ]
    query = select(columns).select_from(joined)
    query = query.where(equipment.c['FK_Individual'] == individual_id)
    query = query.order_by(desc(equipment.c['StartDate']))

    response = []
    for record in db.execute(query).fetchall():
        entry = OrderedDict(record)
        entry['StartDate'] = entry['StartDate'].strftime(date_pattern)
        end = entry['EndDate']
        # Still-mounted equipment has no end date; expose it as an empty string.
        entry['EndDate'] = '' if end is None else end.strftime(date_pattern)
        response.append(entry)
    return response
开发者ID:romfabbro,项目名称:ecoReleve-Data,代码行数:25,代码来源:individual.py
示例3: test_moving_plugin_attributes
def test_moving_plugin_attributes(self):
    """Check where plugin attributes live after the migration.

    The ``editable`` value of cluster attributes must decode to an empty
    collection, and the ``attributes`` JSON stored on ``cluster_plugins`` for
    ``test_plugin_a`` must carry the expected ``attribute`` entry without a
    ``metadata`` key.
    """
    tables = self.meta.tables
    clusters = tables['clusters']
    attributes = tables['attributes']
    plugins = tables['plugins']
    cluster_plugins = tables['cluster_plugins']

    # Cluster-level attributes joined to their cluster.
    attrs_join = sa.join(
        attributes, clusters,
        attributes.c.cluster_id == clusters.c.id)
    editable_query = sa.select([attributes.c.editable]).select_from(attrs_join)
    editable = jsonutils.loads(db.execute(editable_query).fetchone()[0])
    self.assertItemsEqual(editable, {})

    # Plugin-level attributes for the plugin under test.
    plugin_join = sa.join(
        cluster_plugins, plugins,
        cluster_plugins.c.plugin_id == plugins.c.id)
    plugin_query = (
        sa.select([cluster_plugins.c.attributes])
        .select_from(plugin_join)
        .where(plugins.c.name == 'test_plugin_a')
    )
    plugin_attrs = jsonutils.loads(db.execute(plugin_query).fetchone()[0])
    self.assertNotIn('metadata', plugin_attrs)
    expected_attribute = {
        'value': 'value',
        'type': 'text',
        'description': 'description',
        'weight': 25,
        'label': 'label'
    }
    self.assertItemsEqual(plugin_attrs['attribute'], expected_attribute)
开发者ID:ansumanbebarta,项目名称:fuel-web,代码行数:29,代码来源:test_migration_fuel_8_0.py
示例4: GetFlatDataList
def GetFlatDataList(self, searchInfo=None, getFieldWorkers=True):
    """Return the query result as a list of ``OrderedDict`` rows.

    Overrides the parent implementation to optionally attach field workers.

    Args:
        searchInfo: passed through unchanged to ``self.GetFullQuery``.
        getFieldWorkers: when true, each row whose ``ID`` has field workers
            gets a ``FK_FieldWorker_FieldWorkers`` key holding the list of
            their logins; rows without field workers are left untouched.

    Returns:
        A list with one ``OrderedDict`` per result row, in result order.
    """
    fullQueryJoinOrdered = self.GetFullQuery(searchInfo)
    result = self.ObjContext.execute(fullQueryJoinOrdered).fetchall()
    data = [OrderedDict(row) for row in result]
    if not getFieldWorkers:
        return data

    # Re-use the full query as a CTE so only field workers of the selected
    # stations are fetched.
    queryCTE = fullQueryJoinOrdered.cte()
    joinFW = join(Station_FieldWorker, User,
                  Station_FieldWorker.FK_FieldWorker == User.id)
    joinTable = join(queryCTE, joinFW,
                     queryCTE.c['ID'] == Station_FieldWorker.FK_Station)
    query = select([Station_FieldWorker.FK_Station, User.Login]
                   ).select_from(joinTable)

    logins_by_station = {}
    for station_id, login in self.ObjContext.execute(query).fetchall():
        logins_by_station.setdefault(station_id, []).append(login)

    for row in data:
        try:
            row['FK_FieldWorker_FieldWorkers'] = logins_by_station[row['ID']]
        except KeyError:
            # No field worker for this station (or the row has no 'ID'
            # column): keep the row as-is. Only KeyError is ignored so that
            # unrelated failures are no longer hidden.
            pass
    return data
开发者ID:FredericBerton,项目名称:ecoReleve-Data,代码行数:30,代码来源:List.py
示例5: monitoredSitesArea
def monitoredSitesArea(request):
    """Return the distinct areas available for monitored sites.

    When the POST data contains ``name_view``, the distinct ``Station.area``
    values of stations joined to the protocol view are returned. Otherwise
    the distinct ``Place`` values of the
    ``geo_CNTRIES_and_RENECO_MGMTAreas`` table are returned.

    Returns:
        A list with one value per result row.
    """
    req = request.POST
    if 'name_view' in req:
        print('name_view')
        # NOTE(review): proto_view_Name is not assigned in this function;
        # presumably it should come from req['name_view'] -- confirm.
        try:
            proto_view_Table = Base.metadata.tables[proto_view_Name]
            join_table = join(
                proto_view_Table, Station,
                proto_view_Table.c['TSta_PK_ID'] == Station.id)
        except KeyError:
            # Not a reflected table (or no 'TSta_PK_ID' column): fall back
            # to the mapped class registered in dict_proto.
            proto_view_Table = dict_proto[proto_view_Name]()
            join_table = join(
                proto_view_Table, Station,
                proto_view_Table.FK_TSta_ID == Station.id)
        print(proto_view_Table)
        slct = select([Station.area]).distinct().select_from(join_table)
        data = DBSession.execute(slct).fetchall()
        # The original indexed rows with `'Region' or 'Area'`, which always
        # evaluates to 'Region'. The select has a single column, so read it
        # by position, exactly like the branch below.
        return [row[0] for row in data]

    table = Base.metadata.tables['geo_CNTRIES_and_RENECO_MGMTAreas']
    slct = select([table.c['Place']]).distinct()
    data = DBSession.execute(slct).fetchall()
    return [row[0] for row in data]
开发者ID:NaturalSolutions,项目名称:ecoReleve-Server,代码行数:26,代码来源:station.py
示例6: monitoredSitesLocality
def monitoredSitesLocality(request):
    """Return the distinct localities available for monitored sites.

    When the POST data contains ``name_view``, the distinct
    ``Station.locality`` values of stations joined to the protocol view are
    returned. Otherwise all distinct localities are returned, restricted to
    ``Station.area == req['Region']`` when ``Region`` is posted.

    Returns:
        A list with one value per result row.
    """
    req = request.POST
    if 'name_view' in req:
        print('name_view')
        # NOTE(review): proto_view_Name is not assigned in this function;
        # presumably it should come from req['name_view'] -- confirm.
        try:
            proto_view_Table = Base.metadata.tables[proto_view_Name]
            join_table = join(
                proto_view_Table, Station,
                proto_view_Table.c['TSta_PK_ID'] == Station.id)
        except KeyError:
            # Not a reflected table (or no 'TSta_PK_ID' column): fall back
            # to the mapped class registered in dict_proto.
            proto_view_Table = dict_proto[proto_view_Name]()
            join_table = join(
                proto_view_Table, Station,
                proto_view_Table.FK_TSta_ID == Station.id)
        slct = select([Station.locality]).distinct().select_from(join_table)
        data = DBSession.execute(slct).fetchall()
        # The original indexed rows with `'Place' or 'Locality'`, which
        # always evaluates to 'Place'. The select has a single column, so
        # read it by position, exactly like the branch below.
        return [row[0] for row in data]

    query = select([Station.locality]).distinct()
    if 'Region' in req:
        query = query.where(Station.area == req.get('Region'))
    data = DBSession.execute(query).fetchall()
    return [row[0] for row in data]
开发者ID:NaturalSolutions,项目名称:ecoReleve-Server,代码行数:25,代码来源:station.py
示例7: determine_fetches
def determine_fetches(db_session, cred):
    """Refresh open threads and list the (thread_id, page_num) pairs to fetch.

    Every non-closed thread is first passed to ``update_thread_status`` and
    the session is flushed. The returned rows are the union of:

    * pages holding fewer than 40 posts that belong to non-closed threads,
    * page numbers after the highest stored page of each thread, up to
      ``Thread.page_count`` (generated with ``generate_series``),
    * page 1 of every thread that has no stored first page.

    Rows are ordered by ``thread_id`` then ``page_num``, both ascending.
    """
    open_threads = db_session.query(Thread).filter_by(closed=False)
    for open_thread in open_threads:
        update_thread_status(open_thread, cred)
    db_session.flush()

    # Pages that were stored but hold fewer than 40 posts.
    posts_on_page = sa.func.count(ThreadPost.id)
    short_page_ids = (
        sa.select([ThreadPost.page_id])
        .group_by(ThreadPost.page_id)
        .having(posts_on_page < 40)
        .as_scalar()
    )
    still_open = Thread.closed == sa.false()
    incomplete = sa.select(
        [ThreadPage.thread_id, ThreadPage.page_num],
        from_obj=sa.join(ThreadPage, Thread),
    ).where(sa.and_(ThreadPage.id.in_(short_page_ids), still_open))

    # Highest page number stored per thread.
    last_page = sa.func.max(ThreadPage.page_num).label("last_fetched_page")
    status = (
        sa.select([ThreadPage.thread_id.label("thread_id"), last_page])
        .group_by(ThreadPage.thread_id)
        .alias("fetch_status")
    )
    missing_range = sa.func.generate_series(
        status.c.last_fetched_page + 1, Thread.page_count
    )
    not_yet_fetched = sa.select(
        [Thread.id.label("thread_id"), missing_range.label("page_num")],
        from_obj=sa.join(Thread, status, Thread.id == status.c.thread_id),
    )

    # Threads that do not have a stored first page.
    threads_with_first_page = (
        sa.select([ThreadPage.thread_id])
        .where(ThreadPage.page_num == 1)
        .as_scalar()
    )
    first_page = sa.literal(1, sa.Integer).label("page_num")
    missing_first = sa.select(
        [Thread.id.label("thread_id"), first_page], from_obj=Thread
    ).where(Thread.id.notin_(threads_with_first_page))

    combined = sa.union(incomplete, not_yet_fetched, missing_first)
    ordered = combined.order_by(
        combined.c.thread_id.asc(), combined.c.page_num.asc()
    )
    return db_session.execute(ordered).fetchall()
开发者ID:inklesspen,项目名称:mimir,代码行数:34,代码来源:fetch.py
示例8: getEquipment
def getEquipment(self):
    """Return the sensor equipment history of this monitored site.

    Rows come from ``MonitoredSiteEquipment`` joined to ``Sensor`` and
    ``SensorType``, filtered on ``self.objectDB.ID`` and ordered by
    ``StartDate`` descending.

    Returns a list of ``OrderedDict`` rows whose ``StartDate``/``EndDate``
    are formatted strings; a missing ``EndDate`` becomes ``''``.
    """
    date_pattern = '%Y-%m-%d %H:%M:%S'
    site_id = self.objectDB.ID

    equipment = Base.metadata.tables['MonitoredSiteEquipment']
    joined = join(equipment, Sensor, equipment.c['FK_Sensor'] == Sensor.ID)
    joined = join(joined, SensorType, Sensor.FK_SensorType == SensorType.ID)

    columns = [
        equipment.c['StartDate'],
        equipment.c['EndDate'],
        Sensor.UnicIdentifier,
        equipment.c['FK_MonitoredSite'],
        SensorType.Name.label('Type'),
    ]
    query = select(columns).select_from(joined)
    query = query.where(equipment.c['FK_MonitoredSite'] == site_id)
    query = query.order_by(desc(equipment.c['StartDate']))

    response = []
    for record in self.session.execute(query).fetchall():
        entry = OrderedDict(record)
        entry['StartDate'] = entry['StartDate'].strftime(date_pattern)
        end = entry['EndDate']
        # Equipment that is still installed has no end date.
        entry['EndDate'] = '' if end is None else end.strftime(date_pattern)
        response.append(entry)
    return response
开发者ID:jvitus,项目名称:ecoReleve-Data,代码行数:28,代码来源:monitoredSite.py
示例9: _rel_child
def _rel_child(parent_acl_ids, source=True):
    """Build the select of ACL rows propagated through relationships.

    The statement starts from ACL entries attached to ``Relationship``
    objects, follows their role to its child roles, and targets the object on
    one side of the relationship: the destination when ``source`` is true,
    the source otherwise. Only ACL ids in ``parent_acl_ids`` and child roles
    whose ``object_type`` matches the targeted object type are kept.

    Returns the (unexecuted) select statement.
    """
    relationships = all_models.Relationship.__table__
    acls = all_models.AccessControlList.__table__
    roles = all_models.AccessControlRole.__table__
    parent_role = roles.alias("parent_acr_{}".format(source))
    child_role = roles.alias("child_acr_{}".format(source))

    # Pick the end of the relationship that receives the propagated role.
    if source:
        object_id = relationships.c.destination_id
        object_type = relationships.c.destination_type
    else:
        object_id = relationships.c.source_id
        object_type = relationships.c.source_type

    acl_on_relationship = sa.and_(
        acls.c.object_id == relationships.c.id,
        acls.c.object_type == all_models.Relationship.__name__,
    )

    columns = [
        acls.c.person_id.label("person_id"),
        child_role.c.id.label("ac_role_id"),
        object_id.label("object_id"),
        object_type.label("object_type"),
        sa.func.now().label("created_at"),
        sa.literal(login.get_current_user_id()).label("modified_by_id"),
        sa.func.now().label("updated_at"),
        acls.c.id.label("parent_id"),
        acls.c.id.label("parent_id_nn"),
    ]

    # relationship -> acl -> parent role -> child role
    joined = sa.join(relationships, acls, acl_on_relationship)
    joined = sa.join(joined, parent_role,
                     parent_role.c.id == acls.c.ac_role_id)
    joined = sa.join(joined, child_role,
                     child_role.c.parent_id == parent_role.c.id)

    condition = sa.and_(
        acls.c.id.in_(parent_acl_ids),
        child_role.c.object_type == object_type,
    )
    return sa.select(columns).select_from(joined).where(condition)
开发者ID:egorhm,项目名称:ggrc-core,代码行数:54,代码来源:propagation.py
示例10: user_per_month
def user_per_month(self, trans, **kwd):
    """Render the per-month job report for one user.

    The user is identified by the ``email`` request parameter. Two queries
    are run: one aggregating job counts per month, and one listing every job
    so that a per-day histogram ("trend") can be built for each month.

    Args:
        trans: transaction object used to fill the template.
        **kwd: request parameters (``email``, ``id`` and sorting options).

    Returns:
        The rendered ``jobs_user_per_month.mako`` template.
    """
    params = util.Params(kwd)
    message = ''
    email = util.restore_text(params.get('email', ''))
    specs = sorter('date', kwd)
    sort_id = specs.sort_id
    order = specs.order
    arrow = specs.arrow
    _order = specs.exc_order

    q = sa.select((self.select_month(model.Job.table.c.create_time).label('date'),
                   sa.func.count(model.Job.table.c.id).label('total_jobs')),
                  whereclause=model.User.table.c.email == email,
                  from_obj=[sa.join(model.Job.table, model.User.table)],
                  group_by=self.group_by_month(model.Job.table.c.create_time),
                  order_by=[_order])

    all_jobs_per_user = sa.select((model.Job.table.c.create_time.label('date'),
                                   model.Job.table.c.id.label('job_id')),
                                  whereclause=sa.and_(model.User.table.c.email == email),
                                  from_obj=[sa.join(model.Job.table, model.User.table)])

    # trends maps "<MonthName><Year>" to a list with one counter per day.
    trends = dict()
    for job in all_jobs_per_user.execute():
        job_date = job.date
        # Read day/month from the datetime attributes: the "%-d" / "%-m"
        # strftime directives used previously are platform extensions and
        # are not accepted everywhere.
        day_index = job_date.day - 1
        key = str(job_date.strftime("%B") + job_date.strftime("%Y"))
        if key not in trends:
            _, day_range = calendar.monthrange(job_date.year, job_date.month)
            trends[key] = [0] * day_range
        trends[key][day_index] += 1

    jobs = []
    for row in q.execute():
        jobs.append((row.date.strftime("%Y-%m"),
                     row.total_jobs,
                     row.date.strftime("%B"),
                     row.date.strftime("%Y")))
    return trans.fill_template('/webapps/reports/jobs_user_per_month.mako',
                               order=order,
                               arrow=arrow,
                               sort_id=sort_id,
                               id=kwd.get('id'),
                               trends=trends,
                               email=util.sanitize_text(email),
                               jobs=jobs, message=message)
开发者ID:ImmPortDB,项目名称:immport-galaxy,代码行数:53,代码来源:jobs.py
示例11: _propagate_to_wf_children
def _propagate_to_wf_children(new_wf_acls, child_class):
    """Propagate newly added workflow roles to child objects.

    Builds a select producing one ACL row per child object of
    ``child_class`` whose ``workflow_id`` matches a workflow ACL listed in
    ``new_wf_acls``, using the role named ``"<parent role name> Mapped"``,
    and hands it to ``acl_utils.insert_select_acls``.

    Args:
        new_wf_acls: ids of the newly created ACL entries for workflows.
        child_class: model class of the children to propagate to.

    Returns:
        The result of ``_get_child_ids(new_wf_acls, child_class)``.
    """
    children = child_class.__table__
    acls = all_models.AccessControlList.__table__
    roles = all_models.AccessControlRole.__table__
    parent_role = roles.alias("parent_acr")
    mapped_role = roles.alias("mapped")
    current_user_id = login.get_current_user_id()

    columns = [
        acls.c.person_id,
        mapped_role.c.id,
        children.c.id,
        sa.literal(child_class.__name__),
        sa.func.now(),
        sa.literal(current_user_id),
        sa.func.now(),
        acls.c.id.label("parent_id"),
        acls.c.id.label("parent_id_nn"),
    ]

    workflow_acl = sa.and_(
        acls.c.object_id == children.c.workflow_id,
        acls.c.object_type == all_models.Workflow.__name__,
    )
    joined = sa.join(children, acls, workflow_acl)
    # No explicit ON clause here: the join condition is left to SQLAlchemy.
    joined = sa.join(joined, parent_role)
    joined = sa.join(
        joined,
        mapped_role,
        mapped_role.c.name == sa.func.concat(parent_role.c.name, " Mapped"),
    )

    select_statement = sa.select(columns).select_from(joined).where(
        acls.c.id.in_(new_wf_acls)
    )
    acl_utils.insert_select_acls(select_statement)
    return _get_child_ids(new_wf_acls, child_class)
开发者ID:egorhm,项目名称:ggrc-core,代码行数:51,代码来源:workflow.py
示例12: getIndivHistory
def getIndivHistory(request):
    """Return the dynamic-property history of one individual.

    The individual id comes from ``request.matchdict['id']``. Each result
    row is reduced to a plain dict: ``None`` values are dropped, any column
    whose name contains ``Value`` is stored under ``'value'``, columns whose
    name contains ``FK`` are skipped, and ``StartDate`` is formatted as a
    string. Rows are ordered by ``StartDate`` descending.
    """
    db = request.dbsession
    individual_id = request.matchdict['id']

    joined = join(
        IndividualDynPropValue, IndividualDynProp,
        IndividualDynPropValue.FK_IndividualDynProp == IndividualDynProp.ID)
    query = select([IndividualDynPropValue, IndividualDynProp.Name])
    query = query.select_from(joined)
    query = query.where(IndividualDynPropValue.FK_Individual == individual_id)
    query = query.order_by(desc(IndividualDynPropValue.StartDate))

    history = []
    for record in db.execute(query).fetchall():
        raw = OrderedDict(record)
        entry = {}
        for name, content in raw.items():
            if content is None:
                continue
            if 'Value' in name:
                # Whichever typed value column is filled ends up under 'value'.
                entry['value'] = content
            elif 'FK' not in name:
                entry[name] = content
        entry['StartDate'] = raw['StartDate'].strftime('%Y-%m-%d %H:%M:%S')
        history.append(entry)
    return history
开发者ID:gerald13,项目名称:ecoReleve-Data,代码行数:25,代码来源:individual.py
示例13: test_cross_schema_reflection_one
def test_cross_schema_reflection_one(self):
    """Reflect two FK-linked tables from ``test_schema`` and join them.

    The tables are created through the test metadata, ``email_addresses`` is
    reflected into a fresh ``MetaData`` and ``users`` is then looked up with
    ``mustexist=True``. The implicit join condition must be
    ``users.user_id == email_addresses.remote_user_id``.
    """
    schema = 'test_schema'
    created_meta = self.metadata
    created_users = Table(
        'users', created_meta,
        Column('user_id', Integer, primary_key=True),
        Column('user_name', String(30), nullable=False),
        schema=schema)
    Table(
        'email_addresses', created_meta,
        Column('address_id', Integer, primary_key=True),
        Column('remote_user_id', Integer,
               ForeignKey(created_users.c.user_id)),
        Column('email_address', String(20)),
        schema=schema)
    created_meta.create_all()

    reflected_meta = MetaData(testing.db)
    addresses = Table('email_addresses', reflected_meta, autoload=True,
                      schema=schema)
    # mustexist=True: 'users' has to be present in the metadata already.
    users = Table('users', reflected_meta, mustexist=True, schema=schema)

    joined = join(users, addresses)
    expected_on = users.c.user_id == addresses.c.remote_user_id
    self.assert_(expected_on.compare(joined.onclause))
开发者ID:DeepakAlevoor,项目名称:sqlalchemy,代码行数:26,代码来源:test_reflection.py
示例14: apply_default_value
def apply_default_value(self, column):
    """Fill the rows where ``column`` is NULL with the column default.

    Nothing happens when the column has no default. For a plain default, one
    UPDATE sets it on every row where the column is NULL. For a callable
    default, the primary-key column names of the table are read from
    ``system_model``/``system_column``, then rows with a NULL value are
    updated one at a time so the callable is evaluated once per row.
    """
    default = column.default
    if not default:
        return

    migration = self.table.migration
    execute = migration.conn.execute
    val = default.arg
    table = migration.metadata.tables[self.table.name]
    table.append_column(column)
    cname = getattr(table.c, column.name)

    if not default.is_callable:
        statement = update(table).where(cname.is_(None)).values({cname: val})
        execute(statement)
        return

    models = migration.metadata.tables['system_model']
    model_columns = migration.metadata.tables['system_column']
    described = join(models, model_columns,
                     models.c.name == model_columns.c.model)
    pk_query = select([model_columns.c.name]).select_from(described)
    pk_query = pk_query.where(model_columns.c.primary_key.is_(True))
    pk_query = pk_query.where(models.c.table == self.table.name)
    columns = [found[0] for found in execute(pk_query).fetchall()]

    count_query = select([func.count()]).select_from(table)
    count_query = count_query.where(cname.is_(None))
    nb_row = self.table.migration.conn.execute(count_query).fetchone()[0]

    for _ in range(nb_row):
        # Take one row still lacking a value and address it by primary key.
        pending = select(columns).select_from(table)
        pending = pending.where(cname.is_(None)).limit(1)
        res = execute(pending).fetchone()
        where = and_(*[getattr(table.c, pk) == res[pk] for pk in columns])
        statement = update(table).where(where).values({cname: val(None)})
        execute(statement)
开发者ID:AnyBlok,项目名称:AnyBlok,代码行数:33,代码来源:migration.py
示例15: user_per_month
def user_per_month(self, trans, **kwd):
    """Render the per-month job count report for one user.

    The user is identified by the ``email`` request parameter; jobs are
    linked to the user through their Galaxy session. The result rows are
    turned into ``(YYYY-MM, total, month name, year)`` tuples and passed to
    the ``jobs_user_per_month.mako`` template.
    """
    params = util.Params(kwd)
    message = ''
    email = util.restore_text(params.get('email', ''))
    specs = sorter('date', kwd)
    sort_id = specs.sort_id
    order = specs.order
    arrow = specs.arrow
    _order = specs.exc_order

    job_table = model.Job.table
    session_table = model.GalaxySession.table
    user_table = model.User.table

    month_column = self.select_month(job_table.c.create_time).label('date')
    total_column = sa.func.count(job_table.c.id).label('total_jobs')
    jobs_of_user = sa.and_(job_table.c.session_id == session_table.c.id,
                           session_table.c.user_id == user_table.c.id,
                           user_table.c.email == email)
    q = sa.select((month_column, total_column),
                  whereclause=jobs_of_user,
                  from_obj=[sa.join(job_table, user_table)],
                  group_by=self.group_by_month(job_table.c.create_time),
                  order_by=[_order])

    jobs = [
        (row.date.strftime("%Y-%m"),
         row.total_jobs,
         row.date.strftime("%B"),
         row.date.strftime("%Y"))
        for row in q.execute()
    ]
    return trans.fill_template('/webapps/reports/jobs_user_per_month.mako',
                               order=order,
                               arrow=arrow,
                               sort_id=sort_id,
                               id=kwd.get('id'),
                               email=util.sanitize_text(email),
                               jobs=jobs, message=message)
开发者ID:Christian-B,项目名称:galaxy,代码行数:31,代码来源:jobs.py
示例16: indiv_details
def indiv_details(request):
    """Return the details of the individual carrying a GSM transmitter.

    ``request.matchdict['id']`` is converted to ``int`` and matched against
    ``ObjectsCaracValues.value``; the most recent matching row (by
    ``begin_date``) is used.

    Returns:
        A dict of the selected columns without ``begin_date``/``end_date``,
        plus ``duration`` (number of months between ``begin_date`` and
        ``end_date``, or now when ``end_date`` is NULL) and
        ``last_observation`` (``dd/mm/YYYY`` string, or ``None`` when the
        individual has no row in ``V_Individuals_LatLonDate``).
        ``birth_date`` is formatted as ``dd/mm/YYYY`` when present.
    """
    params = int(request.matchdict['id'])
    join_table = join(
        SatTrx, ObjectsCaracValues,
        SatTrx.ptt == cast(ObjectsCaracValues.value, Integer)
    ).join(Individual, ObjectsCaracValues.object == Individual.id)

    query = select([
        ObjectsCaracValues.value.label('id'),
        Individual.id.label('ind_id'),
        Individual.survey_type.label('survey_type'),
        Individual.status.label('status'),
        Individual.monitoring_status.label('monitoring_status'),
        Individual.birth_date.label('birth_date'),
        Individual.ptt.label('ptt'),
        ObjectsCaracValues.begin_date.label('begin_date'),
        ObjectsCaracValues.end_date.label('end_date'),
    ]).select_from(join_table).where(and_(
        SatTrx.model.like('GSM%'),
        ObjectsCaracValues.carac_type == 19,
        ObjectsCaracValues.object_type == 'Individual',
    )).where(
        ObjectsCaracValues.value == params
    ).order_by(desc(ObjectsCaracValues.begin_date))
    # NOTE(review): first() gives None when nothing matches, which makes the
    # lookups below fail; the expected response for that case is not visible
    # here, so it is left unchanged -- confirm with the caller.
    data = DBSession.execute(query).first()
    transaction.commit()

    # A transmitter that is still mounted has no end date: count up to now.
    end_date = data['end_date']
    if end_date is None:
        end_date = datetime.datetime.now()

    result = dict(data.items())
    print(result)
    begin_date = data['begin_date']
    result['duration'] = ((end_date.month - begin_date.month)
                          + (end_date.year - begin_date.year) * 12)

    query = select([V_Individuals_LatLonDate.c.date]).where(
        V_Individuals_LatLonDate.c.ind_id == result['ind_id']
    ).order_by(desc(V_Individuals_LatLonDate.c.date)).limit(1)
    lastObs = DBSession.execute(query).fetchone()
    # An individual without any recorded position has no last observation;
    # report None instead of failing on the missing row.
    result['last_observation'] = (
        lastObs['date'].strftime('%d/%m/%Y') if lastObs is not None else None)

    if result['birth_date'] is not None:
        result['birth_date'] = result['birth_date'].strftime('%d/%m/%Y')
    del result['begin_date'], result['end_date']
    print(result)
    return result
开发者ID:NaturalSolutions,项目名称:ecoReleve-Server,代码行数:34,代码来源:data_gsm.py
示例17: job
def job(request, name):
    """Render the ``job.html`` page for the job called ``name``.

    Generator-based coroutine: it creates an engine from ``DATABASE_URL``,
    loads the job together with its maintainer (first match only), loads the
    job's runs ordered by descending id, and renders the template with the
    job, its runs and a humanized ``next_run`` time.
    """
    engine = yield from aiopg.sa.create_engine(DATABASE_URL)
    with (yield from engine) as conn:
        job_table = Job.__table__
        maintainer_table = Maintainer.__table__

        # use_labels=True prefixes result columns with their table name,
        # hence the job_id / job_scheduled attributes read below.
        job_query = select(
            [job_table, maintainer_table, ],
            use_labels=True
        ).select_from(join(
            maintainer_table,
            job_table,
            Maintainer.id == Job.maintainer_id
        )).where(Job.name == name).limit(1)
        found = yield from conn.execute(job_query)
        job_row = yield from found.first()

        run_query = (
            select([Run.id, Run.failed, Run.start_time, Run.end_time])
            .where(Run.job_id == job_row.job_id)
            .order_by(Run.id.desc())
        )
        runs = yield from conn.execute(run_query)

        context = {
            "job": job_row,
            "runs": runs,
            "next_run": humanize.naturaltime(job_row.job_scheduled),
        }
        return request.render('job.html', context)
开发者ID:dmc2015,项目名称:moxie,代码行数:25,代码来源:app.py
示例18: active_traffic_groups
def active_traffic_groups(cls, when=None):
    """Build a select of the traffic groups with an active membership.

    ``TrafficGroup`` is joined to ``Membership`` and then to ``cls``; the
    rows are filtered with ``Membership.active(when)``.
    """
    memberships = join(TrafficGroup, Membership).join(cls)
    is_active = Membership.active(when)
    return select([TrafficGroup]).select_from(memberships).where(is_active)
开发者ID:agdsn,项目名称:pycroft,代码行数:7,代码来源:user.py
示例19: thd
def thd(conn):
    """Fetch the most recent buildsets, oldest first.

    Buildsets are joined through sourcestampsets, sourcestamps and
    sourcestamp_changes to changes, limited to ``count`` rows by descending
    id, optionally filtered on ``complete``, ``branch`` and ``repository``
    (all taken from the enclosing scope), and converted with
    ``self._row2dict``.
    """
    model = self.db.model
    bs_tbl = model.buildsets
    ch_tbl = model.changes

    joined = sa.join(model.buildsets, model.sourcestampsets)
    for linked in (model.sourcestamps, model.sourcestamp_changes, ch_tbl):
        joined = joined.join(linked)

    q = sa.select(columns=[bs_tbl], from_obj=[joined], distinct=True)
    q = q.order_by(sa.desc(bs_tbl.c.id)).limit(count)

    if complete:
        q = q.where(bs_tbl.c.complete != 0)
    elif complete is not None:
        # "== None" is deliberate: SQLAlchemy renders it as IS NULL.
        q = q.where((bs_tbl.c.complete == 0) |
                    (bs_tbl.c.complete == None))
    if branch:
        q = q.where(ch_tbl.c.branch == branch)
    if repository:
        q = q.where(ch_tbl.c.repository == repository)

    buildsets = [self._row2dict(row) for row in conn.execute(q).fetchall()]
    # The query returns newest first; callers get them oldest first.
    buildsets.reverse()
    return buildsets
开发者ID:jhnwsk,项目名称:buildbot,代码行数:26,代码来源:buildsets.py
示例20: test_schema_reflection
def test_schema_reflection(self):
    """Reflect FK-linked tables from a separate schema and join them.

    Note: this test requires that the 'test_schema' schema be separate and
    accessible by the test user.
    """
    schema = 'test_schema'
    created_meta = self.metadata
    created_users = Table(
        'users', created_meta,
        Column('user_id', Integer, primary_key=True),
        Column('user_name', String(30), nullable=False),
        schema=schema,
    )
    Table(
        'email_addresses', created_meta,
        Column('address_id', Integer, primary_key=True),
        Column('remote_user_id', Integer,
               ForeignKey(created_users.c.user_id)),
        Column('email_address', String(20)),
        schema=schema,
    )
    created_meta.create_all()

    reflected_meta = MetaData(testing.db)
    addresses = Table('email_addresses', reflected_meta, autoload=True,
                      schema=schema)
    # mustexist=True: 'users' has to be present in the metadata already.
    users = Table('users', reflected_meta, mustexist=True, schema=schema)

    joined = join(users, addresses)
    expected_on = users.c.user_id == addresses.c.remote_user_id
    self.assert_(expected_on.compare(joined.onclause))
开发者ID:aburan28,项目名称:sqlalchemy,代码行数:27,代码来源:test_reflection.py
注：本文中的sqlalchemy.join函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论