• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sqlalchemy.join函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.join函数的典型用法代码示例。如果您正苦于以下问题:Python join函数的具体用法?Python join怎么用?Python join使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了join函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: getSensorEquipment

def getSensorEquipment(request):
    session = request.dbsession
    id = request.matchdict['id']
    curSensor = session.query(Sensor).get(id)
    curSensorType = curSensor.GetType().Name

    if ('RFID' in curSensorType.upper()):
        table = Base.metadata.tables['MonitoredSiteEquipment']
        joinTable = join(table, Sensor, table.c['FK_Sensor'] == Sensor.ID)
        joinTable = join(joinTable, MonitoredSite, table.c[
                         'FK_MonitoredSite'] == MonitoredSite.ID)
        query = select([table.c['StartDate'], table.c['EndDate'], Sensor.UnicIdentifier, MonitoredSite.Name, MonitoredSite.ID.label('MonitoredSiteID')]).select_from(joinTable
                                                                                                                                                                     ).where(table.c['FK_Sensor'] == id
                                                                                                                                                                             ).order_by(desc(table.c['StartDate']))

    elif (curSensorType.lower() in ['gsm', 'satellite', 'vhf']):
        table = Base.metadata.tables['IndividualEquipment']
        joinTable = join(table, Sensor, table.c['FK_Sensor'] == Sensor.ID)
        query = select([table.c['StartDate'], table.c['EndDate'], table.c['FK_Individual'], Sensor.UnicIdentifier]).select_from(joinTable
                                                                                                                                ).where(table.c['FK_Sensor'] == id
                                                                                                                                        ).order_by(desc(table.c['StartDate']))
    else:
        return 'bad request'

    result = session.execute(query).fetchall()
    response = []
    for row in result:
        curRow = OrderedDict(row)
        curRow['StartDate'] = curRow['StartDate'].strftime('%Y-%m-%d %H:%M:%S')
        curRow['EndDate'] = curRow['EndDate'].strftime(
            '%Y-%m-%d %H:%M:%S') if curRow['EndDate'] is not None else None
        curRow['format'] = 'YYYY-MM-DD HH:mm:ss'
        response.append(curRow)

    return response
开发者ID:romfabbro,项目名称:ecoReleve-Data,代码行数:35,代码来源:sensor.py


示例2: getIndivEquipment

def getIndivEquipment(request):
    session = request.dbsession
    id_indiv = request.matchdict['id']

    table = Base.metadata.tables['IndividualEquipment']
    joinTable = join(table, Sensor, table.c['FK_Sensor'] == Sensor.ID)
    joinTable = join(joinTable, SensorType,
                     Sensor.FK_SensorType == SensorType.ID)
    query = select([table.c['StartDate'], table.c['EndDate'], Sensor.UnicIdentifier, Sensor.ID.label('SensorID'), table.c['FK_Individual'], SensorType.Name.label('Type')]
                   ).select_from(joinTable
                                 ).where(table.c['FK_Individual'] == id_indiv
                                         ).order_by(desc(table.c['StartDate']))

    result = session.execute(query).fetchall()
    response = []
    for row in result:
        curRow = OrderedDict(row)
        curRow['StartDate'] = curRow['StartDate'].strftime('%Y-%m-%d %H:%M:%S')
        if curRow['EndDate'] is not None:
            curRow['EndDate'] = curRow['EndDate'].strftime('%Y-%m-%d %H:%M:%S')
        else:
            curRow['EndDate'] = ''
        response.append(curRow)

    return response
开发者ID:romfabbro,项目名称:ecoReleve-Data,代码行数:25,代码来源:individual.py


示例3: test_moving_plugin_attributes

    def test_moving_plugin_attributes(self):
        clusters = self.meta.tables['clusters']
        attributes = self.meta.tables['attributes']
        plugins = self.meta.tables['plugins']
        cluster_plugins = self.meta.tables['cluster_plugins']

        query = sa.select([attributes.c.editable])\
            .select_from(
                sa.join(
                    attributes, clusters,
                    attributes.c.cluster_id == clusters.c.id))
        result = jsonutils.loads(db.execute(query).fetchone()[0])
        self.assertItemsEqual(result, {})

        query = sa.select([cluster_plugins.c.attributes])\
            .select_from(
                sa.join(
                    cluster_plugins, plugins,
                    cluster_plugins.c.plugin_id == plugins.c.id))\
            .where(plugins.c.name == 'test_plugin_a')
        result = jsonutils.loads(db.execute(query).fetchone()[0])
        self.assertNotIn('metadata', result)
        self.assertItemsEqual(result['attribute'], {
            'value': 'value',
            'type': 'text',
            'description': 'description',
            'weight': 25,
            'label': 'label'
        })
开发者ID:ansumanbebarta,项目名称:fuel-web,代码行数:29,代码来源:test_migration_fuel_8_0.py


示例4: GetFlatDataList

    def GetFlatDataList(self,searchInfo=None,getFieldWorkers=True) :
        ''' Override parent function to include management of Observation/Protocols and fieldWorkers '''
        fullQueryJoinOrdered = self.GetFullQuery(searchInfo)
        result = self.ObjContext.execute(fullQueryJoinOrdered).fetchall()
        data = []

        if getFieldWorkers:
            # listID = list(map(lambda x: x['ID'],result))
            queryCTE = fullQueryJoinOrdered.cte()
            joinFW = join(Station_FieldWorker,User,Station_FieldWorker.FK_FieldWorker==User.id)
            joinTable = join(queryCTE,joinFW,queryCTE.c['ID']== Station_FieldWorker.FK_Station)

            query = select([Station_FieldWorker.FK_Station,User.Login]).select_from(joinTable)
            FieldWorkers = self.ObjContext.execute(query).fetchall()
        
            list_ = {}
            for x,y in FieldWorkers :
                list_.setdefault(x,[]).append(y)
            for row in result :
                row = OrderedDict(row)
                try :
                    row['FK_FieldWorker_FieldWorkers'] = list_[row['ID']]
                except:
                    pass
                data.append(row)
        else:
            for row in result :
                row = OrderedDict(row)
                data.append(row)
        return data
开发者ID:FredericBerton,项目名称:ecoReleve-Data,代码行数:30,代码来源:List.py


示例5: monitoredSitesArea

def monitoredSitesArea(request):

	req = request.POST

	if 'name_view' in req :
		print('name_view')
		try :
			proto_view_Table = Base.metadata.tables[proto_view_Name]
			join_table = join(proto_view_Table, Station, proto_view_Table.c['TSta_PK_ID'] == Station.id )
		except :
			proto_view_Table = dict_proto[proto_view_Name]()
			join_table = join(proto_view_Table, Station, proto_view_Table.FK_TSta_ID == Station.id )

		print (proto_view_Table)

	
		slct = select([Station.area]).distinct().select_from(join_table)
		data = DBSession.execute(slct).fetchall()
		return [row['Region' or 'Area'] for row in data]


	else :
		table = Base.metadata.tables['geo_CNTRIES_and_RENECO_MGMTAreas']
		slct = select([table.c['Place']]).distinct()
		data =  DBSession.execute(slct).fetchall()
		return [row[0] for row in data]
开发者ID:NaturalSolutions,项目名称:ecoReleve-Server,代码行数:26,代码来源:station.py


示例6: monitoredSitesLocality

def monitoredSitesLocality(request):

	req = request.POST

	if 'name_view' in req :
		print('name_view')
		try :
			proto_view_Table=Base.metadata.tables[proto_view_Name]
			join_table=join(proto_view_Table, Station, proto_view_Table.c['TSta_PK_ID'] == Station.id )

		except :
			
			proto_view_Table=dict_proto[proto_view_Name]()
			join_table=join(proto_view_Table, Station, proto_view_Table.FK_TSta_ID == Station.id )

		slct=select([Station.locality]).distinct().select_from(join_table)
		data = DBSession.execute(slct).fetchall()
		return [row['Place' or 'Locality'] for row in data]
	else :
		if 'Region' in req :
			query=select([Station.locality]).distinct().where(Station.area==req.get('Region'))
		else :
			query=select([Station.locality]).distinct()
		data=DBSession.execute(query).fetchall()
		return [row[0] for row in data]
开发者ID:NaturalSolutions,项目名称:ecoReleve-Server,代码行数:25,代码来源:station.py


示例7: determine_fetches

def determine_fetches(db_session, cred):
    for thread in db_session.query(Thread).filter_by(closed=False):
        update_thread_status(thread, cred)
    db_session.flush()
    incomplete_page_ids = (
        sa.select([ThreadPost.page_id])
        .group_by(ThreadPost.page_id)
        .having(sa.func.count(ThreadPost.id) < 40)
        .as_scalar()
    )
    incomplete_pages = sa.select(
        [ThreadPage.thread_id, ThreadPage.page_num], from_obj=sa.join(ThreadPage, Thread)
    ).where(sa.and_(ThreadPage.id.in_(incomplete_page_ids), Thread.closed == sa.false()))
    fetch_status = (
        sa.select(
            [ThreadPage.thread_id.label("thread_id"), sa.func.max(ThreadPage.page_num).label("last_fetched_page")]
        )
        .group_by(ThreadPage.thread_id)
        .alias("fetch_status")
    )
    unfetched_pages = sa.select(
        [
            Thread.id.label("thread_id"),
            sa.func.generate_series(fetch_status.c.last_fetched_page + 1, Thread.page_count).label("page_num"),
        ],
        from_obj=sa.join(Thread, fetch_status, Thread.id == fetch_status.c.thread_id),
    )
    fetched_first_pages = sa.select([ThreadPage.thread_id]).where(ThreadPage.page_num == 1).as_scalar()
    unfetched_first_pages = sa.select(
        [Thread.id.label("thread_id"), sa.literal(1, sa.Integer).label("page_num")], from_obj=Thread
    ).where(Thread.id.notin_(fetched_first_pages))
    q = sa.union(incomplete_pages, unfetched_pages, unfetched_first_pages)
    q = q.order_by(q.c.thread_id.asc(), q.c.page_num.asc())
    return db_session.execute(q).fetchall()
开发者ID:inklesspen,项目名称:mimir,代码行数:34,代码来源:fetch.py


示例8: getEquipment

    def getEquipment(self):
        id_site = self.objectDB.ID
        table = Base.metadata.tables['MonitoredSiteEquipment']

        joinTable = join(table, Sensor, table.c['FK_Sensor'] == Sensor.ID)
        joinTable = join(joinTable, SensorType,
                         Sensor.FK_SensorType == SensorType.ID)
        query = select([table.c['StartDate'],
                        table.c['EndDate'],
                        Sensor.UnicIdentifier,
                        table.c['FK_MonitoredSite'],
                        SensorType.Name.label('Type')]
                       ).select_from(joinTable
                                     ).where(table.c['FK_MonitoredSite'] == id_site
                                             ).order_by(desc(table.c['StartDate']))

        result = self.session.execute(query).fetchall()
        response = []
        for row in result:
            curRow = OrderedDict(row)
            curRow['StartDate'] = curRow['StartDate'].strftime('%Y-%m-%d %H:%M:%S')
            if curRow['EndDate'] is not None:
                curRow['EndDate'] = curRow['EndDate'].strftime('%Y-%m-%d %H:%M:%S')
            else:
                curRow['EndDate'] = ''
            response.append(curRow)

        return response
开发者ID:jvitus,项目名称:ecoReleve-Data,代码行数:28,代码来源:monitoredSite.py


示例9: _rel_child

def _rel_child(parent_acl_ids, source=True):
  """Get left side of relationships mappings through source."""
  rel_table = all_models.Relationship.__table__
  acl_table = all_models.AccessControlList.__table__
  parent_acr = all_models.AccessControlRole.__table__.alias(
      "parent_acr_{}".format(source)
  )
  child_acr = all_models.AccessControlRole.__table__.alias(
      "child_acr_{}".format(source)
  )

  if source:
    object_id = rel_table.c.destination_id
    object_type = rel_table.c.destination_type
  else:
    object_id = rel_table.c.source_id
    object_type = rel_table.c.source_type

  acl_link = sa.and_(
      acl_table.c.object_id == rel_table.c.id,
      acl_table.c.object_type == all_models.Relationship.__name__,
  )

  select_statement = sa.select([
      acl_table.c.person_id.label("person_id"),
      child_acr.c.id.label("ac_role_id"),
      object_id.label("object_id"),
      object_type.label("object_type"),
      sa.func.now().label("created_at"),
      sa.literal(login.get_current_user_id()).label("modified_by_id"),
      sa.func.now().label("updated_at"),
      acl_table.c.id.label("parent_id"),
      acl_table.c.id.label("parent_id_nn"),
  ]).select_from(
      sa.join(
          sa.join(
              sa.join(
                  rel_table,
                  acl_table,
                  acl_link
              ),
              parent_acr,
              parent_acr.c.id == acl_table.c.ac_role_id
          ),
          child_acr,
          child_acr.c.parent_id == parent_acr.c.id
      )
  ).where(
      sa.and_(
          acl_table.c.id.in_(parent_acl_ids),
          child_acr.c.object_type == object_type,
      )
  )
  return select_statement
开发者ID:egorhm,项目名称:ggrc-core,代码行数:54,代码来源:propagation.py


示例10: user_per_month

    def user_per_month(self, trans, **kwd):
        params = util.Params(kwd)
        message = ''

        email = util.restore_text(params.get('email', ''))
        specs = sorter('date', kwd)
        sort_id = specs.sort_id
        order = specs.order
        arrow = specs.arrow
        _order = specs.exc_order

        q = sa.select((self.select_month(model.Job.table.c.create_time).label('date'),
                       sa.func.count(model.Job.table.c.id).label('total_jobs')),
                      whereclause=model.User.table.c.email == email,
                      from_obj=[sa.join(model.Job.table, model.User.table)],
                      group_by=self.group_by_month(model.Job.table.c.create_time),
                      order_by=[_order])

        all_jobs_per_user = sa.select((model.Job.table.c.create_time.label('date'),
                                       model.Job.table.c.id.label('job_id')),
                                      whereclause=sa.and_(model.User.table.c.email == email),
                                      from_obj=[sa.join(model.Job.table, model.User.table)])

        trends = dict()
        for job in all_jobs_per_user.execute():
            job_day = int(job.date.strftime("%-d")) - 1
            job_month = int(job.date.strftime("%-m"))
            job_month_name = job.date.strftime("%B")
            job_year = job.date.strftime("%Y")
            key = str(job_month_name + job_year)

            try:
                trends[key][job_day] += 1
            except KeyError:
                job_year = int(job_year)
                wday, day_range = calendar.monthrange(job_year, job_month)
                trends[key] = [0] * day_range
                trends[key][job_day] += 1

        jobs = []
        for row in q.execute():
            jobs.append((row.date.strftime("%Y-%m"),
                         row.total_jobs,
                         row.date.strftime("%B"),
                         row.date.strftime("%Y")))
        return trans.fill_template('/webapps/reports/jobs_user_per_month.mako',
                                   order=order,
                                   arrow=arrow,
                                   sort_id=sort_id,
                                   id=kwd.get('id'),
                                   trends=trends,
                                   email=util.sanitize_text(email),
                                   jobs=jobs, message=message)
开发者ID:ImmPortDB,项目名称:immport-galaxy,代码行数:53,代码来源:jobs.py


示例11: _propagate_to_wf_children

def _propagate_to_wf_children(new_wf_acls, child_class):
  """Propagate newly added roles to workflow objects.

  Args:
    wf_new_acl: list of all newly created acl entries for workflows

  Returns:
    list of newly created acl entries for task groups.
  """

  child_table = child_class.__table__
  acl_table = all_models.AccessControlList.__table__
  acr_table = all_models.AccessControlRole.__table__.alias("parent_acr")
  acr_mapped_table = all_models.AccessControlRole.__table__.alias("mapped")

  current_user_id = login.get_current_user_id()

  select_statement = sa.select([
      acl_table.c.person_id,
      acr_mapped_table.c.id,
      child_table.c.id,
      sa.literal(child_class.__name__),
      sa.func.now(),
      sa.literal(current_user_id),
      sa.func.now(),
      acl_table.c.id.label("parent_id"),
      acl_table.c.id.label("parent_id_nn"),
  ]).select_from(
      sa.join(
          sa.join(
              sa.join(
                  child_table,
                  acl_table,
                  sa.and_(
                      acl_table.c.object_id == child_table.c.workflow_id,
                      acl_table.c.object_type == all_models.Workflow.__name__,
                  )
              ),
              acr_table,
          ),
          acr_mapped_table,
          acr_mapped_table.c.name == sa.func.concat(
              acr_table.c.name, " Mapped")
      )
  ).where(
      acl_table.c.id.in_(new_wf_acls)
  )

  acl_utils.insert_select_acls(select_statement)

  return _get_child_ids(new_wf_acls, child_class)
开发者ID:egorhm,项目名称:ggrc-core,代码行数:51,代码来源:workflow.py


示例12: getIndivHistory

def getIndivHistory(request):
    session = request.dbsession
    id = request.matchdict['id']

    tableJoin = join(IndividualDynPropValue,IndividualDynProp
        ,IndividualDynPropValue.FK_IndividualDynProp == IndividualDynProp.ID)
    query = select([IndividualDynPropValue,IndividualDynProp.Name]).select_from(tableJoin).where(
        IndividualDynPropValue.FK_Individual == id
        ).order_by(desc(IndividualDynPropValue.StartDate))

    result = session.execute(query).fetchall()
    response = []
    for row in result:
        curRow = OrderedDict(row)
        dictRow = {}
        for key in curRow :
            if curRow[key] is not None :
                if 'Value' in key :
                    dictRow['value'] = curRow[key] 
                elif 'FK' not in key :
                    dictRow[key] = curRow[key]
        dictRow['StartDate'] = curRow['StartDate'].strftime('%Y-%m-%d %H:%M:%S')
        response.append(dictRow)

    return response
开发者ID:gerald13,项目名称:ecoReleve-Data,代码行数:25,代码来源:individual.py


示例13: test_cross_schema_reflection_one

    def test_cross_schema_reflection_one(self):

        meta1 = self.metadata

        users = Table('users', meta1,
                      Column('user_id', Integer, primary_key=True),
                      Column('user_name', String(30), nullable=False),
                      schema='test_schema')
        addresses = Table(
            'email_addresses', meta1,
            Column(
                'address_id', Integer, primary_key=True),
            Column(
                'remote_user_id', Integer, ForeignKey(
                    users.c.user_id)),
                Column(
                    'email_address', String(20)), schema='test_schema')
        meta1.create_all()
        meta2 = MetaData(testing.db)
        addresses = Table('email_addresses', meta2, autoload=True,
                          schema='test_schema')
        users = Table('users', meta2, mustexist=True,
                      schema='test_schema')
        j = join(users, addresses)
        self.assert_((users.c.user_id
                      == addresses.c.remote_user_id).compare(j.onclause))
开发者ID:DeepakAlevoor,项目名称:sqlalchemy,代码行数:26,代码来源:test_reflection.py


示例14: apply_default_value

    def apply_default_value(self, column):
        if column.default:
            execute = self.table.migration.conn.execute
            val = column.default.arg
            table = self.table.migration.metadata.tables[self.table.name]
            table.append_column(column)
            cname = getattr(table.c, column.name)
            if column.default.is_callable:
                Table = self.table.migration.metadata.tables['system_model']
                Column = self.table.migration.metadata.tables['system_column']
                j1 = join(Table, Column, Table.c.name == Column.c.model)
                query = select([Column.c.name]).select_from(j1)
                query = query.where(Column.c.primary_key.is_(True))
                query = query.where(Table.c.table == self.table.name)
                columns = [x[0] for x in execute(query).fetchall()]

                query = select([func.count()]).select_from(table)
                query = query.where(cname.is_(None))
                nb_row = self.table.migration.conn.execute(query).fetchone()[0]
                for offset in range(nb_row):
                    query = select(columns).select_from(table)
                    query = query.where(cname.is_(None)).limit(1)
                    res = execute(query).fetchone()
                    where = and_(
                        *[getattr(table.c, x) == res[x] for x in columns])
                    query = update(table).where(where).values(
                        {cname: val(None)})
                    execute(query)

            else:
                query = update(table).where(cname.is_(None)).values(
                    {cname: val})
                execute(query)
开发者ID:AnyBlok,项目名称:AnyBlok,代码行数:33,代码来源:migration.py


示例15: user_per_month

    def user_per_month( self, trans, **kwd ):
        params = util.Params( kwd )
        message = ''

        email = util.restore_text( params.get( 'email', '' ) )
        specs = sorter( 'date', kwd )
        sort_id = specs.sort_id
        order = specs.order
        arrow = specs.arrow
        _order = specs.exc_order
        q = sa.select( ( self.select_month( model.Job.table.c.create_time ).label( 'date' ),
                         sa.func.count( model.Job.table.c.id ).label( 'total_jobs' ) ),
                       whereclause=sa.and_( model.Job.table.c.session_id == model.GalaxySession.table.c.id,
                                            model.GalaxySession.table.c.user_id == model.User.table.c.id,
                                            model.User.table.c.email == email ),
                       from_obj=[ sa.join( model.Job.table, model.User.table ) ],
                       group_by=self.group_by_month( model.Job.table.c.create_time ),
                       order_by=[ _order ] )
        jobs = []
        for row in q.execute():
            jobs.append( ( row.date.strftime( "%Y-%m" ),
                           row.total_jobs,
                           row.date.strftime( "%B" ),
                           row.date.strftime( "%Y" ) ) )
        return trans.fill_template( '/webapps/reports/jobs_user_per_month.mako',
                                    order=order,
                                    arrow=arrow,
                                    sort_id=sort_id,
                                    id=kwd.get('id'),
                                    email=util.sanitize_text( email ),
                                    jobs=jobs, message=message )
开发者ID:Christian-B,项目名称:galaxy,代码行数:31,代码来源:jobs.py


示例16: indiv_details

def indiv_details(request):

    params=int(request.matchdict['id'])
    join_table = join(SatTrx, ObjectsCaracValues, SatTrx.ptt == cast(ObjectsCaracValues.value, Integer)
        ).join(Individual, ObjectsCaracValues.object==Individual.id) 

    query=select([ObjectsCaracValues.value.label('id'), Individual.id.label('ind_id'),Individual.survey_type.label('survey_type'), Individual.status.label('status')
        , Individual.monitoring_status.label('monitoring_status'), Individual.birth_date.label('birth_date'), Individual.ptt.label('ptt'),ObjectsCaracValues.begin_date.label('begin_date'),ObjectsCaracValues.end_date.label('end_date')]
        ).select_from(join_table
        ).where(and_(SatTrx.model.like('GSM%'),ObjectsCaracValues.carac_type==19,ObjectsCaracValues.object_type=='Individual')
        ).where(ObjectsCaracValues.value==params).order_by(desc(ObjectsCaracValues.begin_date))
   
    data=DBSession.execute(query).first()
    transaction.commit()
    if data['end_date'] == None :
        end_date=datetime.datetime.now()
    else :
        end_date=data['end_date'] 

    result=dict([ (key[0],key[1]) for key in data.items()])
    print(result)
    result['duration']=(end_date.month-data['begin_date'].month)+(end_date.year-data['begin_date'].year)*12
    
    query = select([V_Individuals_LatLonDate.c.date]
                     ).where(V_Individuals_LatLonDate.c.ind_id == result['ind_id']
                     ).order_by(desc(V_Individuals_LatLonDate.c.date)).limit(1)
     
    lastObs=DBSession.execute(query).fetchone()
    result['last_observation']=lastObs['date'].strftime('%d/%m/%Y')
    if result['birth_date']!= None:
        result['birth_date']=result['birth_date'].strftime('%d/%m/%Y')
    del result['begin_date'], result['end_date']
    print (result)
    return result
开发者ID:NaturalSolutions,项目名称:ecoReleve-Server,代码行数:34,代码来源:data_gsm.py


示例17: job

def job(request, name):
    engine = yield from aiopg.sa.create_engine(DATABASE_URL)
    with (yield from engine) as conn:

        jobs = yield from conn.execute(select(
            [Job.__table__, Maintainer.__table__,],
            use_labels=True
        ).select_from(join(
            Maintainer.__table__,
            Job.__table__,
            Maintainer.id == Job.maintainer_id
        )).where(Job.name == name).limit(1))
        job = yield from jobs.first()

        runs = yield from conn.execute(
            select([Run.id, Run.failed, Run.start_time, Run.end_time]).
            where(Run.job_id == job.job_id).
            order_by(Run.id.desc())
        )

        return request.render('job.html', {
            "job": job,
            "runs": runs,
            "next_run": humanize.naturaltime(job.job_scheduled),
        })
开发者ID:dmc2015,项目名称:moxie,代码行数:25,代码来源:app.py


示例18: active_traffic_groups

 def active_traffic_groups(cls, when=None):
     return select([TrafficGroup]).select_from(
         join(TrafficGroup,
              Membership).join(cls)
     ).where(
         Membership.active(when)
     )
开发者ID:agdsn,项目名称:pycroft,代码行数:7,代码来源:user.py


示例19: thd

        def thd(conn):
            bs_tbl = self.db.model.buildsets
            ch_tbl = self.db.model.changes
            j = sa.join(self.db.model.buildsets,
                               self.db.model.sourcestampsets)
            j = j.join(self.db.model.sourcestamps)
            j = j.join(self.db.model.sourcestamp_changes)
            j = j.join(ch_tbl)
            q = sa.select(columns=[bs_tbl], from_obj=[j],
                                         distinct=True)
            q = q.order_by(sa.desc(bs_tbl.c.id))
            q = q.limit(count)

            if complete is not None:
                if complete:
                    q = q.where(bs_tbl.c.complete != 0)
                else:
                    q = q.where((bs_tbl.c.complete == 0) |
                                (bs_tbl.c.complete == None))
            if branch:
              q = q.where(ch_tbl.c.branch == branch)
            if repository:
              q = q.where(ch_tbl.c.repository == repository)
            res = conn.execute(q)
            return list(reversed([ self._row2dict(row)
                                  for row in res.fetchall() ]))
开发者ID:jhnwsk,项目名称:buildbot,代码行数:26,代码来源:buildsets.py


示例20: test_schema_reflection

    def test_schema_reflection(self):
        """note: this test requires that the 'test_schema' schema be
        separate and accessible by the test user"""

        meta1 = self.metadata

        users = Table('users', meta1, Column('user_id', Integer,
                      primary_key=True), Column('user_name',
                      String(30), nullable=False), schema='test_schema')
        addresses = Table(
            'email_addresses',
            meta1,
            Column('address_id', Integer, primary_key=True),
            Column('remote_user_id', Integer,
                   ForeignKey(users.c.user_id)),
            Column('email_address', String(20)),
            schema='test_schema',
            )
        meta1.create_all()
        meta2 = MetaData(testing.db)
        addresses = Table('email_addresses', meta2, autoload=True,
                          schema='test_schema')
        users = Table('users', meta2, mustexist=True,
                      schema='test_schema')
        j = join(users, addresses)
        self.assert_((users.c.user_id
                     == addresses.c.remote_user_id).compare(j.onclause))
开发者ID:aburan28,项目名称:sqlalchemy,代码行数:27,代码来源:test_reflection.py



注:本文中的sqlalchemy.join函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sqlalchemy.literal_column函数代码示例发布时间:2022-05-27
下一篇:
Python sqlalchemy.inspect函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap