本文整理汇总了Python中sqlahelper.get_engine函数的典型用法代码示例。如果您正苦于以下问题：Python get_engine函数的具体用法？Python get_engine怎么用？Python get_engine使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_engine函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_multiple_engines_without_default
def test_multiple_engines_without_default(self):
    """Register two named engines and check that no default engine results.

    Both engines are added under explicit names only, so the named lookups
    must return the very same objects while the session bind, the base
    metadata bind and the unnamed ``get_engine()`` lookup are all ``None``.
    """
    first = sa.create_engine(self.db1.url)
    second = sa.create_engine(self.db2.url)
    registered = (("db1", first), ("db2", second))
    for name, engine in registered:
        sqlahelper.add_engine(engine, name)

    # Every named engine must come back as the identical object.
    for name, engine in registered:
        self.assertIs(sqlahelper.get_engine(name), engine)

    # Nothing was added without a name, so nothing may be bound by default.
    session_bind = sqlahelper.get_session().bind
    self.assertIsNone(session_bind)
    metadata_bind = sqlahelper.get_base().metadata.bind
    self.assertIsNone(metadata_bind)
    default_engine = sqlahelper.get_engine()
    self.assertIsNone(default_engine)
开发者ID:pyramid-collective,项目名称:SQLAHelper,代码行数:12,代码来源:tests.py
示例2: test_multiple_engines
def test_multiple_engines(self):
    """Register a default engine plus a ``"stats"`` engine and verify lookups.

    The engine added without a name must be returned by the bare lookup and
    by the explicit ``"default"`` lookup, and must be the bind of both the
    session and the declarative base metadata.
    """
    primary = sa.create_engine(self.db1.url)
    secondary = sa.create_engine(self.db2.url)
    sqlahelper.add_engine(primary)
    sqlahelper.add_engine(secondary, "stats")

    # Engine retrieval, first without a name and then by name.
    self.assertIs(sqlahelper.get_engine(), primary)
    for name, expected in (("default", primary), ("stats", secondary)):
        self.assertIs(sqlahelper.get_engine(name), expected)

    # The session and the base metadata must both point at the default engine.
    session_bind = sqlahelper.get_session().bind
    self.assertIs(session_bind, primary)
    metadata_bind = sqlahelper.get_base().metadata.bind
    self.assertIs(metadata_bind, primary)
开发者ID:pyramid-collective,项目名称:SQLAHelper,代码行数:12,代码来源:tests.py
示例3: setUp
def setUp(self):
    """Store two test users and initialise dbreflection with the default engine.

    The second user additionally receives three functionalities. Both users
    are added to ``DBSession`` and committed before ``init`` is called.
    """
    import sqlahelper
    import transaction
    from c2cgeoportal.models import DBSession, Role, User, Functionality
    from c2cgeoportal.lib.dbreflection import init

    plain_user = User(
        username=u'__test_user1',
        password=u'__test_user1',
        role=Role(name=u'__test_role1'),
    )
    featured_user = User(
        username=u'__test_user2',
        password=u'__test_user2',
        role=Role(name=u'__test_role2'),
    )
    # (name, value) pairs, in the order they are attached to the user.
    functionality_specs = (
        (u'__test_s', u'db'),
        (u'__test_a', u'db1'),
        (u'__test_a', u'db2'),
    )
    featured_user.functionalities = [
        Functionality(name, value) for name, value in functionality_specs
    ]

    for account in (plain_user, featured_user):
        DBSession.add(account)
    transaction.commit()

    init(sqlahelper.get_engine())
开发者ID:CDTRH,项目名称:c2cgeoportal,代码行数:30,代码来源:test_functionalities.py
示例4: initialized
def initialized(ev):
    """Finish Ptah set-up once the settings are available.

    Installs a ``DummyMailer`` and the formatted sender address in the PTAH
    settings group, makes sure a default engine is registered with
    ``sqlahelper`` when a database URL is configured, and adds the
    ``ptah-manage`` route when management is enabled.

    :param ev: event object; ``ev.registry`` is used to look up the settings
        groups and ``ev.config`` to add the management route.
    """
    PTAH = ptah.get_settings(ptah.CFG_ID_PTAH, ev.registry)

    # mail
    PTAH['Mailer'] = DummyMailer()
    PTAH['full_email_address'] = formataddr(
        (PTAH['email_from_name'], PTAH['email_from_address']))

    # sqla
    SQLA = ptah.get_settings(ptah.CFG_ID_SQLA, ev.registry)
    url = SQLA['url']
    if url:
        engine_args = {}
        if SQLA['cache']:
            # The same dict is handed to the engine and kept in the settings.
            cache = {}
            engine_args['execution_options'] = {'compiled_cache': cache}
            SQLA['sqlalchemy_cache'] = cache
        try:
            sqlahelper.get_engine()
        except Exception:  # pragma: no cover
            # No default engine yet: create and register one. ``Exception``
            # instead of a bare ``except`` lets KeyboardInterrupt and
            # SystemExit propagate instead of being mistaken for a missing
            # engine.
            engine = sqlalchemy.engine_from_config(
                {'sqlalchemy.url': url}, 'sqlalchemy.', **engine_args)
            sqlahelper.add_engine(engine)

    # ptah manage
    if PTAH['manage']:
        ev.config.add_route(
            'ptah-manage', '/ptah-manage/*traverse',
            factory=ptah.manage.PtahManageRoute, use_global_views=True)
        ptah.manage.set_access_manager(
            ptah.manage.PtahAccessManager())
开发者ID:runyaga,项目名称:ptah,代码行数:32,代码来源:ptahsettings.py
示例5: get_info_from_mymaps
def get_info_from_mymaps(self, rows, attributes_to_remove):
    """Build one feature per distinct (map, category) pair found in ``rows``.

    For the first row of every ``map_id``/``category_id`` combination the
    ``mymaps`` engine is asked for the collected geometry and the summed
    length of that category. The row's own columns, plus a ``length``
    attribute (summed length divided by 1000, rounded to two decimals),
    become the feature attributes.

    :param rows: iterable of mapping-like rows providing ``category_id``
        and ``map_id``.
    :param attributes_to_remove: forwarded to ``self.remove_attributes``.
    :returns: list of the values returned by ``self.to_feature``.
    """
    from sqlalchemy import text

    # Bound parameters instead of string interpolation: a quote inside
    # map_id can no longer break out of the SQL literal.
    query = text(
        "SELECT ST_AsGeoJSON(ST_Collect(geometry)) AS geometry, "
        "sum(ST_Length(geometry)) AS length "
        "FROM public.feature_with_map_with_colors "
        "WHERE category_id = :category_id AND map_id = :map_id")

    features = []
    seen = set()  # (map_id, category_id) pairs that were already queried
    for row in rows:
        category_id = row['category_id']
        map_id = row['map_id']
        key = (str(map_id), str(category_id))
        if key in seen:
            continue
        seen.add(key)

        engine = sqlahelper.get_engine("mymaps")
        # int()/str() mirror the former "%d" / "'%s'" formatting of the
        # two values.
        res = engine.execute(
            query,
            {'category_id': int(category_id), 'map_id': str(map_id)})
        for feature in res.fetchall():
            if feature['geometry'] is None:
                # The aggregates are NULL when nothing matched; there is no
                # geometry to decode, so no feature is produced.
                continue
            geometry = geojson_loads(feature['geometry'])
            attributes = dict(row)
            attributes['length'] = round(feature['length'] / 1000, 2)
            self.remove_attributes(attributes,
                                   attributes_to_remove,
                                   "geometry")
            features.append(self.to_feature(geometry, attributes, ""))
    return features
开发者ID:ochriste,项目名称:geoportailv3,代码行数:27,代码来源:getfeatureinfo.py
示例6: setUp
def setUp(self):
    """Create ``child``/``parent`` tables and store two children.

    A private declarative base bound to the default engine is used. The
    ``Parent`` class exposes the ``name`` of each related child through
    ``_association_proxy``. The base metadata and the ``Parent`` class are
    kept on the test case as ``self.metadata`` and ``self.cls``.
    """
    import transaction
    import sqlahelper
    from sqlalchemy import Column, types, ForeignKey
    from sqlalchemy.orm import relationship
    from sqlalchemy.ext.declarative import declarative_base
    from c2cgeoportal.models import DBSession
    from c2cgeoportal.lib.dbreflection import _association_proxy

    Base = declarative_base(bind=sqlahelper.get_engine())

    class Child(Base):
        """Row of the ``child`` table, identified by ``id`` and a name."""
        __tablename__ = 'child'
        id = Column(types.Integer, primary_key=True)
        name = Column(types.Unicode)

        def __init__(self, name):
            self.name = name

    class Parent(Base):
        """Row of the ``parent`` table referencing two children."""
        __tablename__ = 'parent'
        id = Column(types.Integer, primary_key=True)
        child1_id = Column(types.Integer, ForeignKey('child.id'))
        child2_id = Column(types.Integer, ForeignKey('child.id'))
        # Two foreign keys target the same table, hence explicit joins.
        child1_ = relationship(Child, primaryjoin=(child1_id == Child.id))
        child1 = _association_proxy('child1_', 'name')
        child2_ = relationship(Child, primaryjoin=(child2_id == Child.id))
        child2 = _association_proxy('child2_', 'name')

    Base.metadata.create_all()

    children = [Child(name) for name in ('foo', 'bar')]
    DBSession.add_all(children)
    transaction.commit()

    self.metadata = Base.metadata
    self.cls = Parent
开发者ID:tonio,项目名称:c2cgeoportal,代码行数:30,代码来源:test_dbreflection.py
示例7: setUp
def setUp(self):  # noqa
    """Store two users with roles and initialise dbreflection.

    Three functionalities are attached to the second role. Users and roles
    are added to ``DBSession`` in one call and committed before ``init``
    receives the default engine.
    """
    import sqlahelper
    import transaction
    from c2cgeoportal.models import DBSession, Role, User, Functionality
    from c2cgeoportal.lib.dbreflection import init

    plain_role = Role(name=u"__test_role1")
    plain_user = User(
        username=u"__test_user1",
        password=u"__test_user1",
        role=plain_role,
    )
    featured_role = Role(name=u"__test_role2")
    featured_user = User(
        username=u"__test_user2",
        password=u"__test_user2",
        role=featured_role,
    )
    # (name, value) pairs, in the order they are attached to the role.
    functionality_specs = (
        (u"__test_s", u"db"),
        (u"__test_a", u"db1"),
        (u"__test_a", u"db2"),
    )
    featured_role.functionalities = [
        Functionality(name, value) for name, value in functionality_specs
    ]

    DBSession.add_all([plain_user, featured_user, plain_role, featured_role])
    transaction.commit()

    init(sqlahelper.get_engine())
开发者ID:craxxkid,项目名称:c2cgeoportal,代码行数:29,代码来源:test_functionalities.py
示例8: main
def main():  # pragma: nocover
    """Regenerate ``<package>/locale/<package>-tooltips.pot``.

    Writes a gettext header, then one ``f_<attribute>`` entry for every
    attribute name found in the first row of each feature definition that
    uses the ``default_gisgr.html`` template and is not a remote template.
    The first row comes from the definition's SQL query or, when a REST URL
    is set, from ``_get_external_data``.
    """
    env = bootstrap("development.ini")
    from geoportailv3.models import LuxGetfeatureDefinition

    package = env["registry"].settings["package"]
    directory = "%s/locale/" % package
    destination = path.join(directory, "%s-tooltips.pot" % package)

    header = (
        u'#, fuzzy\n'
        u'msgid ""\n'
        u'msgstr ""\n'
        u'"MIME-Version: 1.0\\n"\n'
        u'"Content-Type: text/plain; charset=utf-8\\n"\n'
        u'"Content-Transfer-Encoding: 8bit\\n"\n'
    )
    entry = (
        u'#: engine:%(engine)s Layer:%(layer)s Role:%(role)s\n'
        u'msgid "f_%(name)s"\n'
        u'msgstr ""\n'
    )

    # The context manager guarantees the file is flushed and closed, even
    # when a query below fails.
    with codecs.open(destination, "wt", encoding="utf-8") as w:
        w.write(header)

        dbsession = sqlahelper.get_session()
        results = dbsession.query(LuxGetfeatureDefinition).\
            filter(LuxGetfeatureDefinition.remote_template == False).\
            filter(LuxGetfeatureDefinition.template == 'default_gisgr.html').all()  # noqa

        seen_fields = set()  # attribute names that already have an entry
        for result in results:
            first_row = None
            if result.query:
                # NOTE(review): this reads ``result.engine`` while the entry
                # written below reads ``result.engine_gfi`` -- confirm both
                # attributes exist on the model.
                engine = sqlahelper.get_engine(result.engine)
                # The table expression comes from the definition row and
                # cannot be passed as a bound parameter.
                first_row = engine.execute(
                    "SELECT * FROM " + result.query).first()
            if result.rest_url:
                first_row = _get_external_data(
                    result.rest_url,
                    '96958.90059551848,61965.61097091329,' +
                    '97454.77280739773,62463.21618929457', result.layer)
            if first_row is None:
                # No sample row, hence no attribute names to emit.
                continue

            attributes = remove_attributes(
                dict(first_row),
                result.attributes_to_remove,
                result.geometry_column)
            for attribute in attributes:
                if attribute in seen_fields:
                    continue
                seen_fields.add(attribute)
                w.write(entry % {
                    "engine": result.engine_gfi,
                    "layer": result.layer,
                    "role": result.role,
                    "name": attribute,
                })

    print("tooltips Pot file updated: %s" % destination)
开发者ID:geoportallux,项目名称:geoportailv3-gisgr,代码行数:59,代码来源:tooltips2pot.py
示例9: setUp
def setUp(self):  # noqa
    """Clear ``self.metadata`` and initialise dbreflection with the default engine."""
    import sqlahelper
    from c2cgeoportal.lib.dbreflection import init

    self.metadata = None
    init(sqlahelper.get_engine())
开发者ID:camptocamp,项目名称:c2cgeoportal,代码行数:8,代码来源:test_dbreflection.py
示例10: setUp
def setUp(self):
    """Make sure a default engine is registered, then set up Pyramid.

    When ``sqlahelper`` has no default engine yet, an in-memory SQLite
    engine is created and registered.
    """
    try:
        sqlahelper.get_engine()
    except Exception:  # pragma: no cover
        # ``Exception`` instead of a bare ``except`` so that
        # KeyboardInterrupt and SystemExit are not swallowed here.
        engine = sqlalchemy.engine_from_config({"sqlalchemy.url": "sqlite://"})
        sqlahelper.add_engine(engine)
    self._setup_pyramid()
开发者ID:WouterVH,项目名称:ptah,代码行数:8,代码来源:base.py
示例11: test_ptahinit_sqla
def test_ptahinit_sqla(self):
    """After ``init_ptah`` with a ``sqla.url`` setting a default engine exists."""
    import sqlahelper

    self._settings = {'sqla.url': 'sqlite://'}
    self.init_ptah()

    default_engine = sqlahelper.get_engine()
    self.assertIsNotNone(default_engine)
开发者ID:runyaga,项目名称:ptah,代码行数:8,代码来源:test_ptahsettings.py
示例12: setUp
def setUp(self):
    """Make sure a default engine is registered, then set up Pyramid and Ptah.

    When ``sqlahelper`` has no default engine yet, an in-memory SQLite
    engine is created and registered.
    """
    try:
        sqlahelper.get_engine()
    except Exception:
        # ``Exception`` instead of a bare ``except`` so that
        # KeyboardInterrupt and SystemExit are not swallowed here.
        engine = sqlalchemy.engine_from_config(
            {'sqlalchemy.url': 'sqlite://'})
        sqlahelper.add_engine(engine)
    self._setup_pyramid()
    self._setup_ptah()
开发者ID:WouterVH,项目名称:ptah,代码行数:10,代码来源:base.py
示例13: loginchange
def loginchange(self):
    """Change the password of the logged-in user.

    Reads ``newPassword`` and ``confirmNewPassword`` from the request
    parameters, stores the new password on ``self.request.user`` and, when
    the ``auth_replication_enabled`` setting equals ``'true'``, applies the
    same change to the user with the same id in the replication database.

    :returns: ``Response('true')`` on success, or an ``HTTPBadRequest``
        when the user does not exist in the replication database.
    :raises HTTPBadRequest: when a parameter is missing or the two
        passwords differ.
    :raises HTTPUnauthorized: when no user is logged in.
    """
    new_password = self.request.params.get('newPassword', None)
    new_password_confirm = self.request.params.get('confirmNewPassword', None)
    if new_password is None or new_password_confirm is None:
        raise HTTPBadRequest(
            '"newPassword" and "confirmNewPassword" should be '
            'available in request params')

    # check if loggedin
    if not self.request.user:
        raise HTTPUnauthorized('bad credentials')
    if new_password != new_password_confirm:
        raise HTTPBadRequest(
            "the new password and the new password "
            "confirmation don't match")

    u = self.request.user
    u._set_password(new_password)
    u.is_password_changed = True
    DBSession.flush()
    log.info("password changed for user: %s", self.request.user.username)

    # handle replication
    settings = self.request.registry.settings
    if settings.get('auth_replication_enabled') == 'true':  # pragma: no cover
        try:
            log.debug("trying to find if engine set for replication exists")
            engine = sqlahelper.get_engine('replication')
        except RuntimeError:
            log.debug(
                "engine for replication doesn't exist yet, trying "
                "to create")
            engine = engine_from_config(settings, 'sqlalchemy_replication.')
            sqlahelper.add_engine(engine, 'replication')

        DBSession2 = scoped_session(sessionmaker(bind=engine))
        try:
            # Query once and reuse the rows instead of hitting the
            # replication database a second time.
            result = DBSession2.query(User).filter(
                User.id == self.request.user.id).all()
            if not result:
                msg = 'user not found in replication target database: %s' \
                    % self.request.user.username
                # No exception is being handled here, so log.exception
                # would attach an empty traceback; log a plain error.
                log.error(msg)
                return HTTPBadRequest(msg)  # pragma nocover
            u_r = result[0]
            u_r._set_password(new_password)
            u_r.is_password_changed = True
            DBSession2.commit()
            log.info(
                "password changed in replication target database "
                "for user: %s", self.request.user.username)
        finally:
            # Release the session created for this request.
            DBSession2.remove()

    return Response('true', cache_control="no-cache")
开发者ID:pgiraud,项目名称:c2cgeoportal,代码行数:53,代码来源:entry.py
示例14: _create_table
def _create_table(self, tablename):
    """Create ``public.<tablename>`` and ``public.<tablename>_child``.

    Test functions use this function to create a table object. Each test
    function should call this function only once, and there should not be
    two test functions that call it with the same ``tablename`` value.

    Both tables are appended to ``self._tables`` and the metadata they are
    attached to is stored in ``self.metadata``.
    """
    import sqlahelper
    from sqlalchemy import Table, Column, ForeignKey, types
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.ext.declarative import declarative_base
    from geoalchemy2 import Geometry, func

    engine = sqlahelper.get_engine()
    Base = declarative_base(bind=engine)  # noqa
    session = sessionmaker(bind=engine)()

    # Geometry columns are created with ``management`` switched on only when
    # the reported PostGIS version string starts with "1.".
    version = session.execute(func.postgis_version()).scalar()
    management = version.startswith("1.")

    if self._tables is None:
        self._tables = []

    child_table = Table(
        "{0!s}_child".format(tablename), Base.metadata,
        Column("id", types.Integer, primary_key=True),
        Column("name", types.Unicode),
        schema="public"
    )
    child_table.create()
    self._tables.append(child_table)

    child_key = "public.{0!s}_child.id".format(tablename)
    # (column name, geometry type) pairs, in column order.
    geometry_specs = (
        ("point", "POINT"),
        ("linestring", "LINESTRING"),
        ("polygon", "POLYGON"),
        ("multipoint", "MULTIPOINT"),
        ("multilinestring", "MULTILINESTRING"),
        ("multipolygon", "MULTIPOLYGON"),
    )
    columns = [
        Column("id", types.Integer, primary_key=True),
        Column("child1_id", types.Integer, ForeignKey(child_key)),
        Column("child2_id", types.Integer, ForeignKey(child_key)),
    ]
    for column_name, geometry_type in geometry_specs:
        columns.append(
            Column(column_name, Geometry(geometry_type, management=management))
        )

    parent_table = Table(tablename, Base.metadata, *columns, schema="public")
    parent_table.create()
    self._tables.append(parent_table)

    self.metadata = Base.metadata
开发者ID:camptocamp,项目名称:c2cgeoportal,代码行数:53,代码来源:test_dbreflection.py
示例15: sqla_initializing
def sqla_initializing(ev):
    """Register a default engine with ``sqlahelper`` when ``SQLA.url`` is set.

    When ``SQLA.cache`` is enabled the engine is created with
    ``SQL_compiled_cache`` as its compiled cache. An engine that is already
    registered is left untouched.

    :param ev: event object; not used by this handler.
    """
    url = SQLA.url
    if not url:
        return

    engine_args = {}
    if SQLA.cache:
        engine_args['execution_options'] = {
            'compiled_cache': SQL_compiled_cache}
    try:
        sqlahelper.get_engine()
    except Exception:
        # ``Exception`` instead of a bare ``except`` so that
        # KeyboardInterrupt and SystemExit are not swallowed here.
        engine = sqlalchemy.engine_from_config(
            {'sqlalchemy.url': url}, 'sqlalchemy.', **engine_args)
        sqlahelper.add_engine(engine)
开发者ID:mcdonc,项目名称:ptah,代码行数:13,代码来源:app.py
示例16: setUp
def setUp(self):
    """Prepare the database engine, Pyramid and optionally Ptah for a test.

    With ``self._init_sqla`` set, an in-memory SQLite engine is registered
    when ``sqlahelper`` has no default engine yet. Ptah is initialised only
    when ``self._init_ptah`` is set.
    """
    if self._init_sqla:
        try:
            sqlahelper.get_engine()
        except Exception:  # pragma: no cover
            # ``Exception`` instead of a bare ``except`` so that
            # KeyboardInterrupt and SystemExit are not swallowed here.
            engine = sqlalchemy.engine_from_config(
                {'sqlalchemy.url': 'sqlite://'})
            sqlahelper.add_engine(engine)

    self.init_pyramid()

    if self._init_ptah:  # pragma: no cover
        self.init_ptah()
开发者ID:runyaga,项目名称:ptah,代码行数:13,代码来源:testing.py
示例17: initializing
def initializing(ev):
    """Configure authentication, mail delivery and the database engine.

    * auth: generates ``SECURITY.secret`` when it is empty and registers the
      authentication policy selected by ``SECURITY.policy`` (and an ACL
      authorization policy when ``SECURITY.authorization`` is set).
    * mail: builds an SMTP mailer from the ``MAIL`` settings and stores the
      delivery object and the formatted sender address back on ``MAIL``.
    * sqla: registers a default engine with ``sqlahelper`` when ``SQLA.url``
      is set and no engine is registered yet.

    :param ev: event object; not read by this handler.
    """
    # NOTE(review): ``config`` and ``types`` are not defined in this
    # function; presumably module-level names -- confirm.

    # auth
    if not SECURITY.secret:
        # ``UUID.hex`` is available on Python 2 and 3, whereas the former
        # ``get_hex()`` accessor only exists on Python 2.
        SECURITY.secret = uuid.uuid4().hex

    pname = SECURITY.policy
    if pname not in ('', 'no-policy'):
        policyFactory, attrs, kw = types[pname]

        # Positional arguments come from ``attrs``, keyword ones from ``kw``.
        settings = [SECURITY.get(attr) for attr in attrs]
        kwargs = {'wild_domain': False}
        for attr in kw:
            kwargs[attr] = SECURITY.get(attr)

        policy = policyFactory(*settings, **kwargs)
        config.registry.registerUtility(policy, IAuthenticationPolicy)

    if SECURITY.authorization:
        config.registry.registerUtility(
            ACLAuthorizationPolicy(), IAuthorizationPolicy)

    # mail
    smtp_mailer = SMTPMailer(
        hostname=MAIL.host,
        port=MAIL.port,
        username=MAIL.username or None,
        password=MAIL.password or None,
        no_tls=MAIL.no_tls,
        force_tls=MAIL.force_tls,
        debug_smtp=MAIL.debug)
    MAIL.Mailer = DirectMailDelivery(smtp_mailer)
    MAIL.full_from_address = formataddr((MAIL.from_name, MAIL.from_address))

    # sqla
    url = SQLA.url
    if url:
        engine_args = {}
        if SQLA.cache:
            engine_args['execution_options'] = {
                'compiled_cache': SQL_compiled_cache}
        try:
            sqlahelper.get_engine()
        except Exception:
            # ``Exception`` instead of a bare ``except`` so that
            # KeyboardInterrupt and SystemExit are not swallowed here.
            engine = sqlalchemy.engine_from_config(
                {'sqlalchemy.url': url}, 'sqlalchemy.', **engine_args)
            sqlahelper.add_engine(engine)
开发者ID:blaflamme,项目名称:ptah,代码行数:50,代码来源:srvinit.py
示例18: db_chooser_tween
def db_chooser_tween(request):
    """Run ``handler`` with the session bound to the slave or master engine.

    The ``"<method> <path>"`` string of the request is matched against
    ``master_paths`` and ``slave_paths``. Unless a master pattern matches,
    GET and OPTIONS requests and requests matching a slave pattern get the
    ``"slave"`` engine. The previous bind is always restored afterwards.

    :param request: the incoming request (``method`` and ``path`` are read).
    :returns: whatever ``handler(request)`` returns.
    """
    session = DBSession()
    previous_bind = session.bind
    method_path = "{0!s} {1!s}".format(request.method, request.path)

    def matches_any(patterns):
        # True as soon as one compiled pattern matches the method/path pair.
        return any(pattern.match(method_path) for pattern in patterns)

    if matches_any(master_paths):
        use_slave = False
    else:
        use_slave = (
            request.method in ("GET", "OPTIONS") or matches_any(slave_paths)
        )

    if use_slave:
        log.debug("Using slave database for: " + method_path)
        session.bind = sqlahelper.get_engine("slave")
    else:
        log.debug("Using master database for: " + method_path)

    try:
        return handler(request)
    finally:
        session.bind = previous_bind
开发者ID:camptocamp,项目名称:c2cgeoportal,代码行数:16,代码来源:models.py
示例19: setUp
def setUp(self):
    """Store one test user with its role and initialise dbreflection.

    Also resets ``self.metadata`` and ``self.layer_ids`` on the test case.
    """
    import sqlahelper
    import transaction
    from c2cgeoportal.models import DBSession, Role, User
    from c2cgeoportal.lib.dbreflection import init

    self.metadata = None
    self.layer_ids = []

    self.role = Role(name=u'__test_role')
    account = {
        'username': u'__test_user',
        'password': u'__test_user',
        'role': self.role,
    }
    self.user = User(**account)

    DBSession.add(self.user)
    transaction.commit()

    init(sqlahelper.get_engine())
开发者ID:bbinet,项目名称:c2cgeoportal,代码行数:20,代码来源:test_layers.py
示例20: get_poi_template
def get_poi_template(self):
    """Return the Angular tooltip template of the requested POI layer.

    The ``layer`` request parameter selects a feature definition; its
    default tooltip HTML is read from ``public.tooltips`` and every
    ``${field}`` placeholder of a known POI field is rewritten into a
    ``{{feature['attributes']['field']}}`` expression.

    :returns: a ``Response`` with the template, or ``HTTPBadRequest`` when
        the parameter is missing, the definition is not unique, it has no
        POI collection, or no default tooltip exists.
    """
    layer = self.request.params.get('layer', None)
    if layer is None:
        return HTTPBadRequest()

    self.dbsession = sqlahelper.get_session()
    luxgetfeaturedefinitions = self.get_lux_feature_definition(layer)
    # Compare by value: ``is not 1`` tested object identity, which only
    # worked by accident of small-integer caching.
    if len(luxgetfeaturedefinitions) != 1:
        return HTTPBadRequest()
    definition = luxgetfeaturedefinitions[0]
    if definition.poi_id_collection is None:
        return HTTPBadRequest()

    engine = sqlahelper.get_engine(definition.engine)
    poi_fields = ["id", "id_collection", "line_num", "easting", "northing",
                  "zip", "town", "street", "poi_name", "description",
                  "type", "url", "active", "core_data", "master_id",
                  "data_type", "synchro_date", "creation_date", "num",
                  "matching_address", "accuracy", "ratio",
                  "master_collection_id", "file_id", "x", "y",
                  "user_field1", "user_field2", "user_field3",
                  "user_field4", "user_field5", "phone", "mail",
                  "id_poi", "country", "category"]

    # "%d" only accepts a number, so nothing but an integer can reach the
    # SQL text.
    query = ("SELECT mapv3_html FROM public.tooltips WHERE "
             "id_collection=%d AND is_default=true"
             % definition.poi_id_collection)
    res = engine.execute(query)

    # Only the first matching tooltip is used.
    for row in res.fetchall():
        content = row['mapv3_html'].replace("$%7B", "${").replace("%7D", "}")
        for field in poi_fields:
            content = content.replace(
                "${%s}" % field,
                "{{feature['attributes']['%s']}}" % field)
        # The space before ng-repeat keeps the two attributes apart; the
        # backslash-continued literal fused them together.
        response = (
            "<h1>{{layers['layerLabel'] | translate}}</h1>"
            "<div class=\"poi-feature\" "
            "ng-repeat=\"feature in layers['features']\">%s"
            "</div>" % content)
        return Response(response)
    return HTTPBadRequest()
开发者ID:ochriste,项目名称:geoportailv3,代码行数:41,代码来源:getfeatureinfo.py
注：本文中的sqlahelper.get_engine函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论