本文整理汇总了Python中sqlalchemy.ext.automap.automap_base函数的典型用法代码示例。如果您正苦于以下问题:Python automap_base函数的具体用法?Python automap_base怎么用?Python automap_base使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了automap_base函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_data
def get_data():
#Connection to the database
config = ConfigParser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__),'../vars/vars.cfg'))
username = config.get('database','username')
password = config.get('database','password')
Base = automap_base()
engine = create_engine("postgresql://{username}:{password}@localhost/PostGIS".\
format(username = username,password = password),
client_encoding = 'utf8', echo = False,)
metadata = MetaData()
metadata.reflect(engine)
db = automap_base(metadata=metadata)
db.prepare()
session = Session(engine)
query = session.query(db.classes.cv_entries.cv_id,\
db.classes.cv_entries.group,\
db.classes.cv_entries.start,\
db.classes.cv_entries.end,\
db.classes.cv_entries.heading,\
db.classes.cv_entries.markdown,\
db.classes.cv_entries.summary,\
func.ST_AsGeoJSON(func.ST_Transform(db.classes.cv_entries.geom, 3857)))\
.order_by(desc(db.classes.cv_entries.start))
geojson_data = []
for i in query:
cv_id = i[0]
group = i[1]
summary = i[6]
geom = geojson.loads(i[7])
feat = geojson.Feature(geometry=geom, properties={'cv_id': cv_id,'group': group,'summary': summary})
geojson_data.append(feat)
crs = {
"type": "name",
"properties": {
"name": "EPSG:3857"
}
}
cv_geojson = geojson.FeatureCollection(geojson_data,crs=crs)
attributes = [(i[0],i[1],i[2],i[3],i[4],i[5]) for i in query]
return geojson.dumps(cv_geojson),attributes
开发者ID:Acurus,项目名称:Kyrdalen,代码行数:50,代码来源:cv.py
示例2: __init__
def __init__(self, verbose=0, *args, **kwds): # @UnusedVariable
super(SbciFinanceDB, self).__init__(*args, **kwds)
if not os.access(FINANCEDB_FILE, os.R_OK | os.W_OK):
raise RuntimeError('cannot access Finance DB file ({}) for R/W!'
.format(FINANCEDB_FILE))
self.Base = automap_base()
self.engine = create_engine('sqlite:///' + FINANCEDB_FILE)
self.Base.prepare(self.engine, reflect=True)
self.Categories = self.Base.classes.categories
self.Seasons = self.Base.classes.seasons
self.Cheques = self.Base.classes.cheques
self.Transactions = self.Base.classes.transactions
self.Trybooking = self.Base.classes.trybooking
self.dbsession = Session(self.engine)
self.categories_query = self.dbsession.query(self.Categories)
self.seasons_query = self.dbsession.query(self.Seasons)
self.cheques_query = self.dbsession.query(self.Cheques)
self.transactions_query = self.dbsession.query(self.Transactions)
self.trybooking_query = self.dbsession.query(self.Trybooking)
开发者ID:mjjensen,项目名称:shootersbasketball,代码行数:26,代码来源:financedb.py
示例3: base_app
def base_app():
global app
app = Flask('backend')
app.config.overrides = {}
logging.info('Connecting to database...')
engine = create_engine(backend.settings.db_connection)
app.engine = engine
base = automap_base()
base.prepare(engine, reflect=True)
session = Session(engine)
app.base = base
app.session = session
app.Decl_Base = declarative_base()
from backend.models import Members
app.Decl_Base.metadata.create_all(app.engine)
# Create the Flask-Restless API manager.
app.api_manager = flask.ext.restless.APIManager(app, session=app.session)
app.api_manager.create_api(Members, methods=['GET', 'POST', 'PATCH', 'DELETE'], collection_name='members')
return app
@app.route("/shutdown", methods=["POST"]) # pragma: no cover
def shutdown(): # pragma: no cover
logging.info('shutting down server')
shutdown_server()
return "server shutting down"
开发者ID:faulteh,项目名称:memberdb-ng,代码行数:30,代码来源:__init__.py
示例4: run
def run(self):
try:
print("Please waiting... Analyzing source database... please wait...")
## AutoMap
self.metadata.reflect(self.engine) # get columns from existing table
Base = automap_base(bind=self.engine, metadata=self.metadata)
Base.prepare(self.engine, reflect=True)
MetaTable = Base.metadata.tables
thread_list = []
## From all tables found in database
for table in MetaTable.keys():
if '.' in table:
table = str(table).split('.')[1]
if len(config.source['tables']['exclude_tables']) > 0:
if exactyMatchList(config.source['tables']['exclude_tables'], table):
continue
## From config.py custom_tables
if len(config.source['tables']['custom_tables']) > 0:
if not exactyMatchList(config.source['tables']['custom_tables'], table):
continue
print("Preparing thread to table %s " % table)
t = threading.Thread(target=self.export2RedShift, name='thread-' + table, args=(table,))
thread_list.append(t)
thread_control(thread_list, self.cfg_thread_number)
print("Finish...")
except (SQLAlchemyError, Exception) as e:
print("Error: %s" % e)
开发者ID:csmanioto,项目名称:power-leech,代码行数:33,代码来源:main.py
示例5: reflect_model
def reflect_model(self, table_name, bind_key=None):
""" 反向生成 ORM 的 Model
:param table_name:
:param bind_key:
:return: ORMClass
"""
with self._reflect_lock:
if table_name in self._models:
return self._models[table_name]
engine = self.get_engine(bind_key)
meta = MetaData(bind=engine)
meta.reflect(only=[table_name])
table = meta.tables[table_name]
self._tables[table_name] = table
Base = automap_base(metadata=meta)
Base.prepare()
model = getattr(Base.classes, table_name)
model.__table__.metadata = None
self._models[table_name] = model
return model
开发者ID:qianka,项目名称:qianka-sqlalchemy,代码行数:25,代码来源:sqlalchemy.py
示例6: create_app
def create_app(database_uri):
"""
Creates a new flask app that exposes the database
provided as a ReSTful Application
:param str|unicode|sqlalchemy.engine.url.URL database_uri: The database
URI in a manner that SQLAlchemy can understand
:return: A flask app that exposes a database as
a ReSTful API that can be accessed using either
the Hal or SIREN protocol
:rtype: Flask
"""
# Create the flask application
app = Flask(__name__)
# Setup SQLAlchemy to reflect the database
engine = create_engine(database_uri)
base = automap_base()
base.prepare(engine, reflect=True)
# Create the ripozo dispatcher and register the response formats
dispatcher = FlaskDispatcher(app)
dispatcher.register_adapters(adapters.HalAdapter, adapters.SirenAdapter)
session_handler = ScopedSessionHandler(engine)
# Create and register resources from the sqlalchemy models
# We need to pass ``append_slash=True`` due to a quirk in how flask handles routing
resources = [create_resource(model, session_handler, append_slash=True) for model in base.classes]
dispatcher.register_resources(*resources)
return app
开发者ID:rmoorman,项目名称:ripozo-oasis,代码行数:30,代码来源:api_builder.py
示例7: __prepare__
def __prepare__(self):
# create declarative base class
self.base = automap_base()
# create declarative classes from dbms
self.base.prepare(self.engine, reflect=True)
# create session for later use
self.session = Session(self.engine)
开发者ID:sgeyer-tgm,项目名称:WahlAnalyse,代码行数:7,代码来源:database.py
示例8: __init__
def __init__(self, engine):
metadata = MetaData(engine)
self._define_tables_without_primary_keys(metadata)
self.base = automap_base(metadata=metadata)
self.base.prepare(engine, reflect=True)
开发者ID:geometalab,项目名称:OSMNames,代码行数:7,代码来源:tables.py
示例9: test_relationship_pass_params
def test_relationship_pass_params(self):
Base = automap_base(metadata=self.metadata)
mock = Mock()
def _gen_relationship(
base, direction, return_fn, attrname, local_cls, referred_cls, **kw
):
mock(base, direction, attrname)
return generate_relationship(
base,
direction,
return_fn,
attrname,
local_cls,
referred_cls,
**kw
)
Base.prepare(generate_relationship=_gen_relationship)
assert set(tuple(c[1]) for c in mock.mock_calls).issuperset(
[
(Base, interfaces.MANYTOONE, "nodes"),
(Base, interfaces.MANYTOMANY, "keywords_collection"),
(Base, interfaces.MANYTOMANY, "items_collection"),
(Base, interfaces.MANYTOONE, "users"),
(Base, interfaces.ONETOMANY, "addresses_collection"),
]
)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:29,代码来源:test_automap.py
示例10: init
def init():
Base = automap_base()
engine = create_engine(config.db_url)
Base.prepare(engine, reflect=True)
session = Session(engine)
add_role(Base, session)
add_user(Base, session)
开发者ID:gobabiertoAR,项目名称:dashboard-transito,代码行数:7,代码来源:008_tabla_users_roles.py
示例11: test_naming_schemes
def test_naming_schemes(self):
Base = automap_base(metadata=self.metadata)
def classname_for_table(base, tablename, table):
return str("cls_" + tablename)
def name_for_scalar_relationship(
base, local_cls, referred_cls, constraint):
return "scalar_" + referred_cls.__name__
def name_for_collection_relationship(
base, local_cls, referred_cls, constraint):
return "coll_" + referred_cls.__name__
Base.prepare(
classname_for_table=classname_for_table,
name_for_scalar_relationship=name_for_scalar_relationship,
name_for_collection_relationship=name_for_collection_relationship
)
User = Base.classes.cls_users
Address = Base.classes.cls_addresses
u1 = User()
a1 = Address()
u1.coll_cls_addresses.append(a1)
assert a1.scalar_cls_users is u1
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:27,代码来源:test_automap.py
示例12: get_taxa_photo_count
def get_taxa_photo_count(session, metadata):
"""Return the photo count for each (genus, section, species) combination.
Taxa are returned as 4-tuples ``(genus, section, species, photo_count)``.
"""
Base = automap_base(metadata=metadata)
Base.prepare()
configure_mappers()
Photo = Base.classes.photos
Taxon = Base.classes.taxa
Rank = Base.classes.ranks
stmt_genus = session.query(Photo.id, Taxon.name.label('genus')).\
join(Photo.taxa_collection, Taxon.ranks).\
filter(Rank.name == 'genus').subquery()
stmt_section = session.query(Photo.id, Taxon.name.label('section')).\
join(Photo.taxa_collection, Taxon.ranks).\
filter(Rank.name == 'section').subquery()
stmt_species = session.query(Photo.id, Taxon.name.label('species')).\
join(Photo.taxa_collection, Taxon.ranks).\
filter(Rank.name == 'species').subquery()
q = session.query('genus', 'section', 'species',
functions.count(Photo.id).label('photos')).\
select_from(Photo).\
join(stmt_genus, stmt_genus.c.id == Photo.id).\
outerjoin(stmt_section, stmt_section.c.id == Photo.id).\
join(stmt_species, stmt_species.c.id == Photo.id).\
group_by('genus', 'section', 'species')
return q
开发者ID:xieyanfu,项目名称:nbclassify,代码行数:34,代码来源:db.py
示例13: setupConnection
def setupConnection(self):
LOG.info('setupConnection:'+self.sql_url)
if self._scoped_session:
self._scoped_session.remove()
try:
engine = create_engine(self.sql_url, encoding='utf8', echo=False, pool_recycle=1)
except:
exc_type, exc_value, exc_traceback = sys.exc_info()
lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
LOG.error(''.join('!! ' + line for line in lines))
return
self._insp = reflection.Inspector.from_engine(engine)
self.d_base = declarative_base(bind=engine)
a_base = automap_base(bind=engine)
try:
a_base.metadata.reflect(views=True)
self.name = unicode(self.sql_url)
except:
LOG.info('Unable to reflect the whole DB!')
exc_type, exc_value, exc_traceback = sys.exc_info()
lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
for line in lines:
LOG.info(line)
a_base.metadata.reflect(views=True, only=[self.sql_table])
self.restricted = True
self.name = unicode(self.sql_url+'+'+self.sql_table)
a_base.prepare(a_base.metadata.bind, name_for_collection_relationship=unique_collection)
self.a_base = a_base
self._scoped_session = scoped_session(sessionmaker(bind=self.a_base.metadata.bind, extension=ZopeTransactionExtension(keep_session=True), autocommit=True))
开发者ID:Martronic-SA,项目名称:collective.behavior.sql,代码行数:29,代码来源:content.py
示例14: get_photos_with_taxa
def get_photos_with_taxa(session, metadata):
"""Return photos with genus, section, and species class.
This generator returns 4-tuples ``(photo, genus, section, species)``.
"""
Base = automap_base(metadata=metadata)
Base.prepare()
configure_mappers()
Photo = Base.classes.photos
Taxon = Base.classes.taxa
Rank = Base.classes.ranks
stmt_genus = session.query(Photo.id, Taxon.name.label('genus')).\
join(Photo.taxa_collection, Taxon.ranks).\
filter(Rank.name == 'genus').subquery()
stmt_section = session.query(Photo.id, Taxon.name.label('section')).\
join(Photo.taxa_collection, Taxon.ranks).\
filter(Rank.name == 'section').subquery()
stmt_species = session.query(Photo.id, Taxon.name.label('species')).\
join(Photo.taxa_collection, Taxon.ranks).\
filter(Rank.name == 'species').subquery()
q = session.query(Photo, 'genus', 'section', 'species').\
join(stmt_genus, stmt_genus.c.id == Photo.id).\
outerjoin(stmt_section, stmt_section.c.id == Photo.id).\
join(stmt_species, stmt_species.c.id == Photo.id)
return q
开发者ID:xieyanfu,项目名称:nbclassify,代码行数:31,代码来源:db.py
示例15: _automap
def _automap(self, e):
Base = automap_base()
Base.prepare(e, reflect=True)
time.sleep(.01)
configure_mappers()
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:7,代码来源:test_automap.py
示例16: collect_bnetza_data
def collect_bnetza_data():
"""
Collects data of wind turbines available in the BNetzA Anlagenstammdaten.
Parameters
----------
Returns
-------
plants : DataFrame of all wind generators in BNetzA
"""
meta.reflect(bind=conn, schema='model_draft',
only=['bnetza_eeg_anlagenstammdaten_wind_classification'])
Base = automap_base(metadata=meta)
Base.prepare()
Bnetza = Base.classes.bnetza_eeg_anlagenstammdaten_wind_classification
query = session.query(Bnetza.installierte_leistung,
Bnetza.wea_manufacturer,
Bnetza.wea_type,
Bnetza.nabenhöhe,
Bnetza.rotordurchmesser).filter(Bnetza.seelage == None)
plants = [(installierte_leistung,
wea_manufacturer,
wea_type,
nabenhöhe,
rotordurchmesser)
for installierte_leistung, wea_manufacturer, wea_type, nabenhöhe, rotordurchmesser
in query.all()]
plants.sort(key=itemgetter(0))
columns = ['capacity', 'manufacturer', 'type', 'hub', 'rotor']
plants = pd.DataFrame(plants, columns=columns)
return plants
开发者ID:openego,项目名称:data_processing,代码行数:35,代码来源:simple_feedin.py
示例17: collect_ego_turbines
def collect_ego_turbines():
"""
Collects data of wind turbines used in the eGo database.
Parameters
----------
Returns
-------
generators : capacity of turbines used in the eGo database
"""
meta.reflect(bind=conn, schema='model_draft',
only=['ego_dp_supply_res_powerplant'])
Base = automap_base(metadata=meta)
Base.prepare()
Dp = Base.classes.ego_dp_supply_res_powerplant
query = session.query(Dp.electrical_capacity).\
filter(and_(Dp.generation_subtype == 'wind_onshore',\
Dp.electrical_capacity < 7600 ,\
Dp.start_up_date > '1998-01-01 00:00:00',\
Dp.start_up_date < '2018-01-01 00:00:00'))
Gens = [(electrical_capacity) for electrical_capacity in query.all()]
generators = []
for i in range(0, len(Gens)):
generators.append(float(Gens[i][0]))
generators.sort()
return generators
开发者ID:openego,项目名称:data_processing,代码行数:31,代码来源:simple_feedin.py
示例18: main
def main(src):
Base = automap_base()
engine = create_engine(src)
Base.prepare(engine, reflect=True)
collector = Collector(Resolver())
d = collector.collect(Base.classes)
loading.dumpfile(d, format="json")
开发者ID:podhmo,项目名称:individual-sandbox,代码行数:7,代码来源:01gen.py
示例19: handle
def handle(self, *args, **options):
engine = create_engine(get_default_db_string(), convert_unicode=True)
metadata = MetaData()
app_config = apps.get_app_config('content')
# Exclude channelmetadatacache in case we are reflecting an older version of Kolibri
table_names = [model._meta.db_table for name, model in app_config.models.items() if name != 'channelmetadatacache']
metadata.reflect(bind=engine, only=table_names)
Base = automap_base(metadata=metadata)
# TODO map relationship backreferences using the django names
Base.prepare()
session = sessionmaker(bind=engine, autoflush=False)()
# Load fixture data into the test database with Django
call_command('loaddata', 'content_import_test.json', interactive=False)
def get_dict(item):
value = {key: value for key, value in item.__dict__.items() if key != '_sa_instance_state'}
return value
data = {}
for table_name, record in Base.classes.items():
data[table_name] = [get_dict(r) for r in session.query(record).all()]
with open(SCHEMA_PATH_TEMPLATE.format(name=options['version']), 'wb') as f:
pickle.dump(metadata, f, protocol=2)
with open(DATA_PATH_TEMPLATE.format(name=options['version']), 'w') as f:
json.dump(data, f)
开发者ID:indirectlylit,项目名称:kolibri,代码行数:32,代码来源:generate_schema.py
示例20: init_conn
def init_conn():
connection_string = 'postgres://postgres:[email protected]:5432/adna'
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy.event import listens_for
from sqlalchemy.schema import Table
from sqlalchemy import create_engine, Column, DateTime, MetaData, Table
from datetime import datetime
engine = create_engine(connection_string)
metadata = MetaData()
metadata.reflect(engine, only=['results', 'job'])
Table('results', metadata,
Column('createdAt', DateTime, default=datetime.now),
Column('updatedAt', DateTime, default=datetime.now,
onupdate=datetime.now),
extend_existing=True)
Table('job', metadata,
Column('createdAt', DateTime, default=datetime.now),
Column('updatedAt', DateTime, default=datetime.now,
onupdate=datetime.now),
extend_existing=True)
Base = automap_base(metadata=metadata)
Base.prepare()
global Results, Job, session
Results, Job = Base.classes.results, Base.classes.job
session = Session(engine)
开发者ID:theboocock,项目名称:ancient-dna,代码行数:35,代码来源:interface.py
注:本文中的sqlalchemy.ext.automap.automap_base函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论