本文整理汇总了Python中sqlalchemy.orm.relation函数的典型用法代码示例。如果您正苦于以下问题:Python relation函数的具体用法?Python relation怎么用?Python relation使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了relation函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setup_orm
def setup_orm():
    """Register the classical mappers for the i18n model classes.

    Tables are looked up by name in ``meta.metadata.tables``; each model
    class is then mapped onto its table together with its relations.
    """
    registry = meta.metadata.tables

    orm.mapper(Language, registry['languages'])

    # Versions of a text are ordered by the language they belong to.
    versions_rel = relation(
        I18nTextVersion,
        order_by=registry['i18n_texts_versions'].c.language_id.asc())
    orm.mapper(I18nText, registry['i18n_texts'],
               properties={'versions': versions_rel})

    orm.mapper(I18nTextVersion, registry['i18n_texts_versions'],
               properties={'language': relation(Language)})

    # LanguageText is deprecated until it uses I18nText
    language_texts = registry['language_texts']
    texts_backref = backref('texts',
                            order_by=language_texts.c.id.asc(),
                            cascade='all, delete-orphan')
    orm.mapper(LanguageText, language_texts,
               properties={
                   'language': relation(Language, backref=texts_backref)})

    countries = registry['countries']
    countries_backref = backref('countries',
                                cascade='all, delete-orphan',
                                order_by=countries.c.id.asc())
    orm.mapper(Country, countries,
               properties={
                   'language': relation(Language,
                                        backref=countries_backref)})
开发者ID:nous-consulting,项目名称:ututi,代码行数:29,代码来源:i18n.py
示例2: testone
def testone(self):
    """Eager load of a many-to-one attached to a one-to-many.

    This test case illustrated the bug, which is that when the single
    Company is loaded, no further processing of the rows occurred in order
    to load the Company's second Address object.
    """
    mapper(Address, addresses)
    mapper(Company, companies,
           properties=dict(addresses=relation(Address, lazy=False)))
    mapper(Invoice, invoices,
           properties=dict(company=relation(Company, lazy=False)))

    first_address = Address(address="a1 address")
    second_address = Address(address="a2 address")
    company = Company(company_name="company 1",
                      addresses=[first_address, second_address])
    invoice = Invoice(date=datetime.datetime.now(), company=company)

    session = create_session()
    session.add(invoice)
    session.flush()

    # Remember the primary keys before the objects are detached.
    company_id = company.company_id
    invoice_id = invoice.invoice_id

    session.expunge_all()
    loaded_company = session.query(Company).get(company_id)

    session.expunge_all()
    loaded_invoice = session.query(Invoice).get(invoice_id)

    eq_(loaded_company, loaded_invoice.company)
开发者ID:obeattie,项目名称:sqlalchemy,代码行数:33,代码来源:test_assorted_eager.py
示例3: initialize_mapper
def initialize_mapper():
    """Set up the mappers for ``Content``, ``Relation`` and ``File``.

    ``Content`` and ``File`` are mapped polymorphically; ``Content`` gets a
    self-referential ``children``/``parent`` pair and a ``relations``
    collection whose backref on ``Relation`` is ``source``.
    """
    # Self-referential tree: the backref points at the parent row.
    parent_backref = orm.backref('parent',
                                 remote_side=[content.c.content_id])
    children_rel = orm.relation(Content, backref=parent_backref)

    # Outgoing relations are joined on the source side of the link table.
    outgoing_rel = orm.relation(
        Relation,
        cascade="all, delete-orphan",
        primaryjoin=content.c.content_id == relations.c.source_id,
        backref=orm.backref("source"))

    orm.mapper(Content, content,
               polymorphic_on=content.c.object_type,
               polymorphic_identity='content',
               properties={'children': children_rel,
                           'relations': outgoing_rel})

    target_rel = orm.relation(
        Content,
        uselist=False,
        primaryjoin=content.c.content_id == relations.c.target_id)
    orm.mapper(Relation, relations, properties={'target': target_rel})

    orm.mapper(File, files,
               polymorphic_on=files.c.type,
               polymorphic_identity='db-file')
开发者ID:acsr,项目名称:collective.dexteritycontentmirror,代码行数:25,代码来源:schema.py
示例4: initModels
def initModels():
    """Initialize timebot DB tables and mappers"""
    # The engine and session are fetched but not used below; the calls are
    # kept because they may have side effects -- TODO confirm.
    engine = db.getEngine()
    meta = db.getMeta()
    session = db.getSession()

    user_table = Table(
        'tb_user', meta,
        Column('id', Integer, primary_key=True, autoincrement=True),
        Column('company_id', None, ForeignKey('tb_company.id'),
               nullable=True),
        Column('jid', Unicode(50), unique=True, nullable=False),
        Column('name', Unicode(50)),
        Column('rate', Integer))

    worktime_table = Table(
        'tb_time', meta,
        Column('id', Integer, primary_key=True),
        Column('user_id', None, ForeignKey('tb_user.id'), nullable=False),
        Column('start', DateTime, default=datetime.now),
        Column('stop', DateTime, nullable=True))

    company_table = Table(
        'tb_company', meta,
        Column('id', Integer, primary_key=True),
        Column('name', Unicode(50), nullable=True))

    meta.create_all()

    mapper(TUser, user_table, properties={
        'worktime': relation(TWorktime),
        'company': relation(TCompany),
    })
    mapper(TWorktime, worktime_table, properties={
        'user': relation(TUser),
    })
    mapper(TCompany, company_table, properties={
        'users': relation(TUser),
    })
开发者ID:hdg700,项目名称:gTimebot,代码行数:35,代码来源:models_old.py
示例5: versions_properties
def versions_properties(item_class, change_class, versions_table):
    """Build the mapper ``properties`` dict for a versions table.

    The result always holds scalar ``change`` and ``head`` relations; an
    ``attached_files`` relation is added unless ``item_class`` is
    ``domain.AttachedFile``.
    """
    properties = dict(
        change=relation(change_class, uselist=False),
        head=relation(item_class, uselist=False),
    )

    # Notes:
    # - domain.AttachedFile is the only versionable type that is
    #   not a ParliamentaryItem.
    # - !+IVersionable(mr, jul-2011) an AttachedFile does not have
    #   attached_files; but, this violates the meaning of IVersionable?
    #   Or, the ability to have attached_files should be independent of
    #   being versionable? IMayAttachFiles
    if item_class is domain.AttachedFile:
        return properties

    file_versions = schema.attached_file_versions.c
    join_condition = rdb.and_(
        versions_table.c.content_id == file_versions.item_id,
        versions_table.c.version_id == file_versions.file_version_id,
    )
    properties["attached_files"] = relation(
        domain.AttachedFileVersion,
        primaryjoin=join_condition,
        foreign_keys=[file_versions.item_id,
                      file_versions.file_version_id],
    )
    return properties
开发者ID:BenoitTalbot,项目名称:bungeni-portal,代码行数:26,代码来源:orm.py
示例6: InitMapper
def InitMapper(cls, metadata, ObjectType):
    """Create the table, position index and polymorphic mapper for ``cls``.

    The table is named after ``cls.__tablename__`` and stored on
    ``cls.__table__``; the mapper discriminates on the ``type_id`` column.
    """
    table_name = cls.__tablename__

    cls.__table__ = Table(
        table_name, metadata,
        Column('id', Integer, index=True, primary_key=True),
        Column('type_id', Integer, ForeignKey(ObjectType.id),
               nullable=False),
        # Self-referential key used by the parent/children hierarchy.
        Column('parent_id', Integer, ForeignKey("%s.id" % table_name),
               nullable=True),
        Column('name', Text, nullable=False),
        Column('size', Integer(64), nullable=False, default=0),
        Column('pos_x', Integer(64), nullable=False, default=0),
        Column('pos_y', Integer(64), nullable=False, default=0),
        Column('pos_z', Integer(64), nullable=False, default=0),
        Column('mtime', DateTime, nullable=False,
               onupdate=func.current_timestamp(),
               default=func.current_timestamp()))

    columns = cls.__table__.c

    Index('ix_%s_position' % table_name,
          columns.pos_x, columns.pos_y, columns.pos_z)

    type_rel = relation(ObjectType,
                        uselist=False,
                        backref=backref('objects'))
    # Tree like hierarchy for objects ie. Universe => Solar systems => Planets => etc.
    children_rel = relation(
        cls,
        backref=backref('parent', remote_side=[columns.id]))
    # Object position in 3D space
    position = composite(Vector3D,
                         columns.pos_x, columns.pos_y, columns.pos_z)

    mapper(cls, cls.__table__,
           polymorphic_on=columns.type_id,
           properties={
               'type': type_rel,
               'children': children_rel,
               'position': position,
           })
开发者ID:cahirwpz,项目名称:tpserver-py,代码行数:27,代码来源:Object.py
示例7: create_tables
def create_tables():
    """Define the test tables and mappers and create them in the database.

    Declares the ``occupations``, ``users`` and ``addresses`` tables on the
    module-level ``metadata``, maps ``Occupation``, ``User`` and ``Address``
    onto them, and then emits the CREATE statements.

    Raises:
        Exception: any error from ``metadata.create_all()`` other than the
            known "closed cursor" problem is re-raised unchanged.
    """
    occupations_table = Table('occupations', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(20)))
    users_table = Table('users', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(20)),
        Column('occupation_id', Integer, ForeignKey("occupations.id")))
    addresses_table = Table('addresses', metadata,
        Column('id', Integer, primary_key=True),
        Column('user_id', Integer, ForeignKey("users.id")),
        Column('street', String(50)),
        Column('city', String(40)))

    mapper(Occupation, occupations_table)
    mapper(User, users_table, properties={
        'occupation': relation(Occupation, lazy=False),
        'addresses': relation(Address, backref='user', lazy=False)})
    mapper(Address, addresses_table)

    try:
        metadata.create_all()
    # "except X as name" is valid on Python 2.6+ and 3.x; the former
    # "except X, name" spelling is a SyntaxError on Python 3.
    except Exception as error:
        # workaround for a problem with PySqlite < 2.6 and SQLAlchemy < 0.5
        if 'Cannot operate on a closed cursor' in str(error):
            metadata.create_all(checkfirst=False)
        else:
            raise
开发者ID:OnShift,项目名称:turbogears,代码行数:30,代码来源:test_paginate.py
示例8: init_model
def init_model(engine):
    """Call me before using any of the tables or classes in the model"""
    global prefix_table, users_table, users_prefix_access

    metadata = meta.metadata

    prefix_table = schema.Table(
        'prefix', metadata,
        schema.Column('id', types.Integer,
                      schema.Sequence('prefix_id_seq'), primary_key=True),
        schema.Column('prefix', types.Text()),
    )

    users_table = schema.Table(
        'users', metadata,
        schema.Column('id', types.Integer,
                      schema.Sequence('users_id_seq'), primary_key=True),
        schema.Column('name', types.Text()),
        schema.Column('password', types.Text()),
        schema.Column('real_name', types.Text()),
    )

    # Association table linking users to prefixes, with an extra
    # write_access column.
    users_prefix_access = schema.Table(
        'user_prefix_access', metadata,
        schema.Column('user_id', types.Integer,
                      schema.ForeignKey('users.id'), primary_key=True),
        schema.Column('prefix_id', types.Integer,
                      schema.ForeignKey('prefix.id'), primary_key=True),
        schema.Column('write_access', types.Integer),
    )

    orm.mapper(Users, users_table,
               properties=dict(base=orm.relation(UsersPrefixAssociation)))
    orm.mapper(UsersPrefixAssociation, users_prefix_access,
               properties=dict(prefix=orm.relation(Prefix)))
    orm.mapper(Prefix, prefix_table)

    # Bind the session to the engine and expose the engine on ``meta``.
    meta.Session.configure(bind=engine)
    meta.engine = engine
开发者ID:marta09,项目名称:szarp,代码行数:31,代码来源:__init__.py
示例9: reload_mapper
def reload_mapper(metadata, now):
    """<comment-en>
    Reload the mapper for Machine2Jobgroup (Model).
    @param metadata: MetaData to reload
    @type metadata: sqlalchemy.schema.MetaData
    @param now: now (passed through to get_machine2jobgroup_table)
    @type now: Datetime
    </comment-en>
    """
    m2j_table = get_machine2jobgroup_table(metadata, now)
    machine_table = metadata.tables["machine"]
    user_table = metadata.tables["user"]

    # Two relations target the user table, so each one spells out its own
    # join condition.
    created_by = relation(
        karesansui.db.model.user.User,
        primaryjoin=m2j_table.c.created_user_id == user_table.c.id)
    modified_by = relation(
        karesansui.db.model.user.User,
        primaryjoin=m2j_table.c.modified_user_id == user_table.c.id)
    machine_rel = relation(
        karesansui.db.model.machine.Machine,
        primaryjoin=m2j_table.c.machine_id == machine_table.c.id)

    mapper(Machine2Jobgroup, m2j_table, properties={
        'created_user': created_by,
        'modified_user': modified_by,
        'machine': machine_rel,
    })
开发者ID:AdUser,项目名称:karesansui,代码行数:26,代码来源:machine2jobgroup.py
示例10: test_double
def test_double(self):
    """test that a mapper can have two eager relations to the same table, via
    two different association tables.  aliases are required."""
    thingy_mapper = mapper(PlaceThingy, place_thingy)
    Place.mapper = mapper(Place, place, properties={
        'thingies': relation(thingy_mapper, lazy=False),
    })

    Transition.mapper = mapper(Transition, transition, properties={
        'inputs': relation(Place.mapper, place_output, lazy=False),
        'outputs': relation(Place.mapper, place_input, lazy=False),
    })

    transition_obj = Transition('transition1')
    transition_obj.inputs.append(Place('place1'))
    for place_name in ('place2', 'place3'):
        transition_obj.outputs.append(Place(place_name))

    session = create_session()
    session.add(transition_obj)
    session.flush()
    session.expunge_all()

    # Query again after emptying the session so the rows are loaded fresh.
    loaded = session.query(Transition).all()
    expected = {
        'name': 'transition1',
        'inputs': (Place, [{'name': 'place1'}]),
        'outputs': (Place, [{'name': 'place2'}, {'name': 'place3'}]),
    }
    self.assert_unordered_result(loaded, Transition, expected)
开发者ID:clones,项目名称:sqlalchemy,代码行数:29,代码来源:test_manytomany.py
示例11: reload_mapper
def reload_mapper(metadata, now):
    """<comment-en>
    Reload the mapper for Machine (Model).
    @param metadata: MetaData to reload
    @type metadata: sqlalchemy.schema.MetaData
    @param now: now (passed through to get_machine_table)
    @type now: Datetime
    </comment-en>
    """
    machine_table = get_machine_table(metadata, now)
    machine_tag_table = metadata.tables['machine2tag']
    user_table = metadata.tables['user']

    # Self-referential hierarchy: the backref points at the parent machine.
    children_rel = relation(
        Machine,
        backref=backref('parent', remote_side=[machine_table.c.id]))
    notebook_rel = relation(karesansui.db.model.notebook.Notebook)
    # Two relations target the user table, so each one spells out its own
    # join condition.
    created_by = relation(
        karesansui.db.model.user.User,
        primaryjoin=machine_table.c.created_user_id == user_table.c.id)
    modified_by = relation(
        karesansui.db.model.user.User,
        primaryjoin=machine_table.c.modified_user_id == user_table.c.id)
    # Tags are linked through the machine2tag association table.
    tags_rel = relation(
        karesansui.db.model.tag.Tag,
        secondary=machine_tag_table,
        backref="machine")

    mapper(Machine, machine_table, properties={
        'children': children_rel,
        'notebook': notebook_rel,
        'created_user': created_by,
        'modified_user': modified_by,
        'tags': tags_rel,
    })
开发者ID:AdUser,项目名称:karesansui,代码行数:29,代码来源:machine.py
示例12: orm_load_baseordereddataset
def orm_load_baseordereddataset(man):
    """Define, create and map the ordered-dataset tables on ``man``.

    Stores the two tables on ``man`` as ``baseordereddataset_table`` and
    ``ordereditems_table``, creates them via ``man.metadata``, then maps
    ``BaseOrderedDataSet`` and ``OrderedDataSetItem`` onto them.
    """
    dataset_table = Table(
        'baseordereddataset', man.metadata,
        Column('id', Integer, primary_key=True),
        Column('created', DateTime),
        Column('label', String(50), nullable=False, unique=True),
        Column('type', String(30), nullable=False))
    man.baseordereddataset_table = dataset_table

    items_table = Table(
        'ordereddata_items', man.metadata,
        Column('dataset_id', Integer, ForeignKey('baseordereddataset.id'),
               primary_key=True),
        Column('item_id', Integer, ForeignKey('basedata.basedata_id'),
               primary_key=True),
        Column('index', Integer, nullable=False))
    man.ordereditems_table = items_table

    man.metadata.create_all()

    # The collection is keyed by the ``index`` column of the items table.
    data_items_rel = relation(
        OrderedDataSetItem,
        backref='ordered_datasets_items',
        cascade='all, delete-orphan',
        collection_class=column_mapped_collection(
            man.ordereditems_table.c.index))
    mapper(BaseOrderedDataSet, man.baseordereddataset_table,
           polymorphic_on=man.baseordereddataset_table.c.type,
           polymorphic_identity='base_ordered_dataset',
           properties={'data_items': data_items_rel})

    mapper(OrderedDataSetItem, man.ordereditems_table,
           properties=dict(
               item=relation(BaseData, lazy='joined', backref='dataitem')))
开发者ID:dpretty,项目名称:pyfusion,代码行数:28,代码来源:base.py
示例13: setup_orm
def setup_orm():
    """Map ``GroupMailingListMessage`` as a polymorphic ``ContentItem``.

    Besides the ``author`` and ``group`` relations, the mapper gets two
    self-referential relations (``reply_to`` and ``thread``) and exposes
    ``attachments`` as a synonym for ``files``.
    """
    registry = meta.metadata.tables
    messages = registry['group_mailing_list_messages']
    cols = messages.c

    # A reply points at the message it answers; the target side is ``id``.
    reply_to_rel = relation(
        GroupMailingListMessage,
        backref=backref('replies'),
        foreign_keys=cols.reply_to_message_machine_id,
        primaryjoin=cols.id == cols.reply_to_message_machine_id,
        remote_side=cols.id)

    # Every post points at the message that started its thread.
    thread_rel = relation(
        GroupMailingListMessage,
        post_update=True,
        order_by=[asc(cols.sent)],
        backref=backref('posts'),
        foreign_keys=cols.thread_message_machine_id,
        primaryjoin=cols.id == cols.thread_message_machine_id,
        remote_side=cols.id)

    author_rel = relation(User, backref=backref('messages'))
    group_rel = relation(
        Group,
        primaryjoin=(cols.group_id == registry['groups'].c.id))

    orm.mapper(GroupMailingListMessage,
               messages,
               inherits=ContentItem,
               polymorphic_identity='mailing_list_message',
               polymorphic_on=registry['content_items'].c.content_type,
               properties={
                   'reply_to': reply_to_rel,
                   'thread': thread_rel,
                   'author': author_rel,
                   'group': group_rel,
                   'attachments': synonym("files"),
               })
开发者ID:nous-consulting,项目名称:ututi,代码行数:27,代码来源:mailing.py
示例14: setUp
def setUp(self):
    """Build an in-memory SQLite schema, map the test classes and register
    them with pyamf.

    The test is skipped when ``sqlalchemy`` is not available.
    """
    if not sqlalchemy:
        self.skipTest("'sqlalchemy' is not available")

    # Create DB and map objects
    self.metadata = MetaData()
    self.engine = create_engine("sqlite:///:memory:", echo=False)

    session_factory = sessionmaker(bind=self.engine)
    self.session = session_factory()

    self.tables = {}
    self.tables["users"] = Table(
        "users",
        self.metadata,
        Column("id", Integer, primary_key=True),
        Column("name", String(64)),
    )
    self.tables["addresses"] = Table(
        "addresses",
        self.metadata,
        Column("id", Integer, primary_key=True),
        Column("user_id", Integer, ForeignKey("users.id")),
        Column("email_address", String(128)),
    )
    # The two lazy-loaded tables share the same shape: a primary key plus
    # a foreign key back to users.
    for table_name in ("lazy_loaded", "another_lazy_loaded"):
        self.tables[table_name] = Table(
            table_name,
            self.metadata,
            Column("id", Integer, primary_key=True),
            Column("user_id", Integer, ForeignKey("users.id")),
        )

    user_properties = {
        "addresses": relation(Address, backref="user", lazy=False),
        "lazy_loaded": relation(LazyLoaded, lazy=True),
        "another_lazy_loaded": relation(AnotherLazyLoaded, lazy=True),
    }

    self.mappers = {}
    self.mappers["user"] = mapper(
        User, self.tables["users"], properties=user_properties)
    self.mappers["addresses"] = mapper(Address, self.tables["addresses"])
    self.mappers["lazy_loaded"] = mapper(
        LazyLoaded, self.tables["lazy_loaded"])
    self.mappers["another_lazy_loaded"] = mapper(
        AnotherLazyLoaded, self.tables["another_lazy_loaded"])

    self.metadata.create_all(self.engine)

    pyamf.register_class(User, "server.User")
    pyamf.register_class(Address, "server.Address")
    pyamf.register_class(LazyLoaded, "server.LazyLoaded")
开发者ID:nervatura,项目名称:nerva2py,代码行数:60,代码来源:test_sqlalchemy.py
示例15: test_bidirectional
def test_bidirectional(self):
    """tests a many-to-many backrefs"""
    Place.mapper = mapper(Place, place)
    Transition.mapper = mapper(Transition, transition, properties={
        'inputs': relation(Place.mapper, place_output, lazy=True,
                           backref='inputs'),
        'outputs': relation(Place.mapper, place_input, lazy=True,
                            backref='outputs'),
    })

    t1 = Transition('transition1')
    t2 = Transition('transition2')
    t3 = Transition('transition3')
    p1 = Place('place1')
    p2 = Place('place2')
    p3 = Place('place3')

    # Populate the collections from the Transition side ...
    t1.inputs.append(p1)
    t1.inputs.append(p2)
    t1.outputs.append(p3)
    t2.inputs.append(p1)
    # ... and from the Place side, going through the backrefs.
    p2.inputs.append(t2)
    p3.inputs.append(t2)
    p1.outputs.append(t1)

    session = create_session()
    session.add_all((t1, t2, t3, p1, p2, p3))
    session.flush()

    expected_outputs = (Place, [{'name': 'place3'}, {'name': 'place1'}])
    self.assert_result([t1], Transition, {'outputs': expected_outputs})

    expected_inputs = (Transition, [{'name': 'transition1'},
                                    {'name': 'transition2'}])
    self.assert_result([p2], Place, {'inputs': expected_inputs})
开发者ID:clones,项目名称:sqlalchemy,代码行数:29,代码来源:test_manytomany.py
示例16: initialize_sql
def initialize_sql(engine):
    """Called by the app on startup to setup bindings to the DB"""
    DBSession.configure(bind=engine)
    Base.metadata.bind = engine

    # only if we are on sqlite do we have this relation
    if 'sqlite' not in str(DBSession.bind):
        return

    # Cascade setting shared by all three fulltext relations.
    fulltext_cascade = "all, delete, delete-orphan"

    # Each relation is attached only once: the backref attribute on the
    # fulltext class is used as the "already set up" marker.
    if not hasattr(SqliteBmarkFT, 'bmark'):
        Bmark.fulltext = relation(
            SqliteBmarkFT,
            backref='bmark',
            uselist=False,
            cascade=fulltext_cascade,
        )

    if not hasattr(SqliteContentFT, 'readable'):
        Readable.fulltext = relation(
            SqliteContentFT,
            backref='readable',
            uselist=False,
            cascade=fulltext_cascade,
        )

    # this is purely to make queries easier. If I've searched the content,
    # I want to get back to the hashed->bmark as quickly as possible. Since
    # there's only one fulltext result for each hashed anyway, it's ok to
    # join it directly without going through the Readable table object
    if not hasattr(SqliteContentFT, 'hashed'):
        Hashed.fulltext = relation(
            SqliteContentFT,
            backref='hashed',
            uselist=False,
            primaryjoin=Hashed.hash_id == SqliteContentFT.hash_id,
            foreign_keys=[SqliteContentFT.hash_id],
            cascade=fulltext_cascade,
        )
开发者ID:lmorchard,项目名称:Bookie,代码行数:34,代码来源:__init__.py
示例17: make_tables
def make_tables(me):
    """Create two mutually referencing tables and map ``A`` and ``B``.

    ``table1`` and ``table2`` each hold a foreign key to the other; both
    classes get a ``link`` relation with an explicit join condition.
    """
    table1 = Table(
        'table1', me.meta,
        Column('name', Text),
        Column('id', Integer, primary_key=True),
        Column('t2_id', Integer, ForeignKey('table2.id')),
    )
    # The key back to table1 is a named constraint created with
    # use_alter=True.
    back_reference = ForeignKey('table1.id',
                                use_alter=True,
                                name='zt2id_fk')
    table2 = Table(
        'table2', me.meta,
        Column('name', Text),
        Column('id', Integer, primary_key=True),
        Column('t1_id', Integer, back_reference),
    )

    me.meta.create_all()

    mapper(A, table1, properties=dict(
        link=relation(B, primaryjoin=table1.c.t2_id == table2.c.id),
    ))
    mapper(B, table2, properties=dict(
        link=relation(A,
                      primaryjoin=table2.c.t1_id == table1.c.id,
                      post_update=True),
    ))
开发者ID:hstanev,项目名称:dbcook,代码行数:33,代码来源:ref_AxA.py
示例18: test_pending_expunge
def test_pending_expunge(self):
    """Removing a pending item from its parent drops it, and its own
    children, from the session before anything is flushed."""
    class Order(_fixtures.Base):
        pass

    class Item(_fixtures.Base):
        pass

    class Attribute(_fixtures.Base):
        pass

    orphan_cascade = "all,delete-orphan"

    mapper(Attribute, attributes)
    mapper(Item, items, properties={
        'attributes': relation(Attribute, cascade=orphan_cascade,
                               backref="item"),
    })
    mapper(Order, orders, properties={
        'items': relation(Item, cascade=orphan_cascade, backref="order"),
    })

    session = create_session()
    order = Order(name="order1")
    session.add(order)

    attribute = Attribute(name="attr1")
    item = Item(name="item1", attributes=[attribute])

    # Attach the item and detach it again straight away.
    order.items.append(item)
    order.items.remove(item)

    assert item not in session
    assert attribute not in session

    session.flush()

    # Only the order row is expected after the flush.
    assert orders.count().scalar() == 1
    assert items.count().scalar() == 0
    assert attributes.count().scalar() == 0
开发者ID:gajop,项目名称:springgrid,代码行数:33,代码来源:test_cascade.py
示例19: test_one
def test_one(self):
    """Map Part/InheritedPart/Design/DesignType with eager relations added
    after mapper creation, then load a Design and touch ``inheritedParts``.
    """
    part_mapper = mapper(Part, parts)

    mapper(InheritedPart, inherited_part, properties={
        'part': relation(Part, lazy=False),
    })

    design_mapper = mapper(Design, design, properties={
        'inheritedParts': relation(InheritedPart,
                                   cascade="all, delete-orphan",
                                   backref="design"),
    })

    mapper(DesignType, design_types)

    # These two relations are added after the mappers already exist.
    type_rel = relation(DesignType, lazy=False, backref="designs")
    design_mapper.add_property("type", type_rel)

    parts_backref = backref("parts", cascade="all, delete-orphan")
    design_rel = relation(Design, lazy=False, backref=parts_backref)
    part_mapper.add_property("design", design_rel)

    new_design = Design()
    session = create_session()
    session.add(new_design)
    session.flush()
    session.expunge_all()

    loaded = session.query(Design).get(1)
    # There is no assertion: the test consists of this attribute access.
    loaded.inheritedParts
开发者ID:gajop,项目名称:springgrid,代码行数:29,代码来源:test_assorted_eager.py
示例20: setup_mappers
def setup_mappers(cls):
    """Map Extra, Pref and User for the cascade tests.

    ``Pref.extra`` cascades "all, delete"; ``User.pref`` is an eagerly
    loaded, single-parent relation cascading "all, delete-orphan".
    """
    mapper(Extra, extra)

    extra_rel = relation(Extra, cascade="all, delete")
    mapper(Pref, prefs, properties={'extra': extra_rel})

    pref_rel = relation(Pref,
                        lazy=False,
                        cascade="all, delete-orphan",
                        single_parent=True)
    mapper(User, users, properties={'pref': pref_rel})
开发者ID:gajop,项目名称:springgrid,代码行数:8,代码来源:test_cascade.py
注:本文中的sqlalchemy.orm.relation函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论