本文整理汇总了 Python 中 sqlalchemy.sql.text 函数的典型用法代码示例。如果您正苦于以下问题:Python text 函数的具体用法?Python text 怎么用?Python text 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 text 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: upgrade
def upgrade():
    """Give ``queues`` a sequence-backed ``id`` key and move user emails out.

    Steps, in order: replace the ``queues`` primary key with a new ``id``
    column fed by ``queues_id_seq``; create the ``emails`` and
    ``queue_notifications`` tables; add ``users.email_id``; copy every
    ``users.email`` value into ``emails`` and link the user to the new row;
    finally drop the ``users.email`` column.
    """
    # Swap the primary key of ``queues`` for a sequence-backed integer id.
    op.drop_constraint("queues_pkey", "queues")
    op.execute(CreateSequence(Sequence("queues_id_seq")))
    id_default = text("nextval('queues_id_seq'::regclass)")
    op.add_column("queues", Column("id", Integer, server_default=id_default))
    op.create_primary_key("queues_pkey", "queues", ["id"])

    # New tables and the column that will reference ``emails``.
    Email.__table__.create(bind=engine)
    op.create_table(
        "queue_notifications",
        Column("queue_id", Integer, ForeignKey("queues.id")),
        Column("email_id", Integer, ForeignKey("emails.id")),
    )
    op.add_column("users", Column("email_id", Integer, ForeignKey("emails.id")))

    # Data migration: one ``emails`` row per existing ``users.email`` value.
    connection = op.get_bind()
    existing_emails = connection.execute(
        select([text("users.email")]).select_from(text("users"))
    )
    user_table = User.__table__
    for user_row in existing_emails:
        inserted = connection.execute(
            Email.__table__.insert().values(address=user_row["email"])
        )
        link_user = (
            user_table.update()
            .values(email_id=inserted.inserted_primary_key[0])
            .where(text("users.email = :email"))
        )
        connection.execute(link_user, email=user_row["email"])

    op.drop_column("users", "email")
开发者ID:yaoweizhen,项目名称:pulseguardian,代码行数:29,代码来源:2d19bced283e_move_email_and_associate_to_queue.py
示例2: update_view_count_to_db
def update_view_count_to_db(url):
    """Flush the Redis view counter of ``url`` into ``users`` and ``posts``.

    The function is currently a no-op: the ``return`` at the top of the
    ``try`` block exits before any work is done, so every call returns
    ``None``.  The code after it shows what would run without that return:
    the CDN URL is mapped back to its S3 form, quality suffixes are
    stripped, the counter is added to the matching rows, and the Redis key
    is deleted after the commit.

    :param url: media URL used as the key in ``redis_views``.
    :returns: always ``None``.
    """
    try:
        # NOTE(review): this unconditional return makes everything below
        # unreachable; presumably the sync was switched off on purpose --
        # confirm before removing it.
        return
        original_url = url
        count = redis_views.get(url)
        url = url.replace('http://d35wlof4jnjr70.cloudfront.net/', 'https://s3.amazonaws.com/franklymestorage/')
        types = ['_ultralow', '_low', '_medium', '_opt', '_promo']
        for suffix in types:
            url = url.replace(suffix, '')
        if count:
            db.session.execute(text("""UPDATE users
SET view_count=view_count+:count, total_view_count=total_view_count+:count
WHERE profile_video=:url
"""),
                params = {"url":url, "count":int(count)}
            )
            db.session.execute(text("""UPDATE posts
SET view_count=view_count+:count
WHERE media_url=:url
"""),
                params = {"url":url, "count":int(count)}
            )
            db.session.commit()
            redis_views.delete(original_url)
    except Exception:
        # Best effort: undo the partial transaction and carry on.  Only
        # ``Exception`` is caught so KeyboardInterrupt/SystemExit still
        # propagate instead of being swallowed.
        db.session.rollback()
开发者ID:Aranjedeath,项目名称:fbackend,代码行数:29,代码来源:video_db.py
示例3: get_candidate_tasks
def get_candidate_tasks(app_id, user_id=None, user_ip=None, n_answers=30, offset=0):
    """Return the open tasks of a project that the caller has not answered.

    The caller is identified by ``user_id`` when it is given without a
    ``user_ip``; otherwise by ``user_ip``, which falls back to
    ``'127.0.0.1'`` when empty.  At most 10 tasks are returned, ordered by
    ``priority_0`` descending and then ``id`` ascending.  ``n_answers`` and
    ``offset`` are accepted but not used by this function.
    """
    if user_id and not user_ip:
        statement = text('''
SELECT id FROM task WHERE NOT EXISTS
(SELECT task_id FROM task_run WHERE
app_id=:app_id AND user_id=:user_id AND task_id=task.id)
AND app_id=:app_id AND state !='completed'
ORDER BY priority_0 DESC, id ASC LIMIT 10''')
        bind_values = dict(app_id=app_id, user_id=user_id)
    else:
        if not user_ip:
            user_ip = '127.0.0.1'
        statement = text('''
SELECT id FROM task WHERE NOT EXISTS
(SELECT task_id FROM task_run WHERE
app_id=:app_id AND user_ip=:user_ip AND task_id=task.id)
AND app_id=:app_id AND state !='completed'
ORDER BY priority_0 DESC, id ASC LIMIT 10''')
        bind_values = dict(app_id=app_id, user_ip=user_ip)

    candidate_rows = session.execute(statement, bind_values)
    # Load the full Task object for every candidate id.
    return [session.query(Task).get(row.id) for row in candidate_rows]
开发者ID:alisheikh,项目名称:pybossa,代码行数:26,代码来源:sched.py
示例4: downgrade
def downgrade():
    """Add the foreign keys on ``contact``, ``phonenumber`` and
    ``messagecontactassociation`` back, one ALTER TABLE per constraint."""
    ddl_statements = (
        "ALTER TABLE contact"
        " ADD CONSTRAINT contact_ibfk_1 FOREIGN KEY"
        " (namespace_id) REFERENCES namespace(id)",
        "ALTER TABLE phonenumber"
        " ADD CONSTRAINT phonenumber_ibfk_1 FOREIGN KEY"
        " (contact_id) REFERENCES contact(id)",
        "ALTER TABLE messagecontactassociation"
        " ADD CONSTRAINT messagecontactassociation_ibfk_1"
        " FOREIGN KEY (contact_id) REFERENCES contact(id)",
    )
    connection = op.get_bind()
    for ddl in ddl_statements:
        connection.execute(text(ddl))
开发者ID:nylas,项目名称:sync-engine,代码行数:25,代码来源:231_drop_contact_foreign_keys.py
示例5: upgrade
def upgrade():
    """Add ``product.create_date``, backfill it, then make it NOT NULL.

    Backfill order:

    1. the earliest purchase order date of the product;
    2. for products still without a date, the earliest sales order date;
    3. for anything left, the current timestamp.

    All values are passed as bound parameters rather than being formatted
    into the SQL text, so dates are adapted by the driver and a NULL date
    is written as NULL instead of the literal string ``'None'``.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('product', sa.Column('create_date', sa.DateTime(), nullable=True))
    bind = op.get_bind()

    # 1. Earliest purchase order date per product.
    purchase_dates = bind.execute(text("""
select prd.id, min(po.order_date) from purchase_order po, product prd, purchase_order_line pol
where pol.product_id = prd.id and po.id = pol.purchase_order_id
group by prd.id
""")).fetchall()
    set_date = text(
        "update product set create_date = :create_date where id = :product_id")
    for r in purchase_dates:
        bind.execute(set_date, {"create_date": r[1], "product_id": r[0]})

    # 2. Earliest sales order date, only where step 1 left no value.
    sales_dates = bind.execute(text("""
select p.id, min(so.order_date) from sales_order so, sales_order_line sol,
product p where so.id = sol.sales_order_id and
sol.product_id = p.id group by p.id;
""")).fetchall()
    set_date_if_missing = text(
        "update product set create_date = :create_date"
        " where id = :product_id and create_date is null")
    for r in sales_dates:
        bind.execute(set_date_if_missing, {"create_date": r[1], "product_id": r[0]})

    # 3. Everything still empty gets the migration time.
    bind.execute(
        text("update product set create_date = :create_date where create_date is null"),
        {"create_date": datetime.now()})
    op.alter_column('product', 'create_date', existing_type=sa.DateTime(), nullable=False)
开发者ID:betterlife,项目名称:psi,代码行数:28,代码来源:42_67607ed6ab04_.py
示例6: get_table_names
def get_table_names(self, connection, schema=None, **kw):
    """Return the ``TABLE_NAME`` values listed in ``DB..TABLES``.

    :param connection: connection used to run the lookup.
    :param schema: ``"catalog.schema"`` qualifier.  ``None`` falls back to
        ``self.default_schema_name``; if that is ``None`` too, every table
        is returned.  A value without a dot is treated as a catalog name
        only; an empty catalog part (``".schema"``) filters by schema only.
    :returns: list of table names.
    """
    if schema is None:
        schema = self.default_schema_name
    if schema is None:
        result = connection.execute(
            text("SELECT TABLE_NAME FROM DB..TABLES"))
        return [r[0] for r in result]
    if '.' not in schema:
        # No dot: the split below yields (schema, ''), i.e. catalog only.
        schema += '.'
    catalog, schema = schema.split('.', 1)
    # The literals below are concatenated by the parser, so "WHERE" needs
    # its own trailing space; without it the statement reads
    # "...WHERETABLE_CATALOG=..." and is not valid SQL.
    if catalog:
        if schema:
            result = connection.execute(
                text("SELECT TABLE_NAME FROM DB..TABLES WHERE "
                     "TABLE_CATALOG=:catalog AND TABLE_SCHEMA = :schema"),
                catalog=catalog, schema=schema)
        else:
            result = connection.execute(
                text("SELECT TABLE_NAME FROM DB..TABLES WHERE "
                     "TABLE_CATALOG=:catalog"), catalog=catalog)
    else:
        result = connection.execute(
            text("SELECT TABLE_NAME FROM DB..TABLES WHERE "
                 "TABLE_SCHEMA=:schema"), schema=schema)
    return [r[0] for r in result]
开发者ID:philippeluickx,项目名称:virtuoso-python,代码行数:25,代码来源:alchemy.py
示例7: test_filtered_counting
def test_filtered_counting( self ):
    """Check ``contents_count`` with ``deleted`` and ``visible`` filters."""
    owner = self.user_manager.create( **user2_data )
    history = self.history_manager.create( name='history', user=owner )

    # Build the history: three HDAs, a list collection over contents[:3],
    # two more HDAs, and a list collection over contents[4:6].
    contents = []
    for x in xrange( 3 ):
        contents.append( self.add_hda_to_history( history, name=( 'hda-' + str( x ) ) ) )
    contents.append( self.add_list_collection_to_history( history, contents[:3] ) )
    for x in xrange( 4, 6 ):
        contents.append( self.add_hda_to_history( history, name=( 'hda-' + str( x ) ) ) )
    contents.append( self.add_list_collection_to_history( history, contents[4:6] ) )

    self.log( "should show correct count with filters" )
    # Mark items 1, 4 and 6 as deleted.
    for index in ( 1, 4 ):
        self.hda_manager.delete( contents[ index ] )
    contents[6].deleted = True
    self.app.model.context.flush()

    # Hide items 2, 5 and 6.
    for index in ( 2, 5, 6 ):
        contents[ index ].visible = False
    self.app.model.context.flush()

    HDA = self.hda_manager.model_class
    deleted_count = self.contents_manager.contents_count( history, filters=[ HDA.deleted == true() ] )
    self.assertEqual( deleted_count, 3 )

    hidden_filters = [ text( 'visible = 0' ) ]
    self.assertEqual( self.contents_manager.contents_count( history, filters=hidden_filters ), 3 )

    deleted_and_hidden = [ text( 'deleted = 1' ), text( 'visible = 0' ) ]
    self.assertEqual( self.contents_manager.contents_count( history, filters=deleted_and_hidden ), 1 )
开发者ID:BenjaminHCCarr,项目名称:galaxy,代码行数:27,代码来源:test_HistoryContentsManager.py
示例8: test_mysql_innodb
def test_mysql_innodb(self):
    """Test that table creation on mysql only builds InnoDB tables."""
    with mock.patch('manila.db.sqlalchemy.api.get_engine',
                    return_value=self.engine):
        self._walk_versions(snake_walk=False, downgrade=False)

    database = self.engine.url.database

    # sanity check: the schema must contain at least one table
    sanity_check = """SELECT count(*)
FROM information_schema.tables
WHERE table_schema = :database;"""
    table_count = self.engine.execute(
        text(sanity_check), database=database).scalar()
    self.assertTrue(table_count > 0, "No tables found. Wrong schema?")

    # every table except alembic's bookkeeping table must be InnoDB
    noninnodb_query = """
SELECT count(*)
FROM information_schema.TABLES
WHERE table_schema = :database
AND engine != 'InnoDB'
AND table_name != 'alembic_version';"""
    count = self.engine.execute(
        text(noninnodb_query), database=database).scalar()
    self.assertEqual(count, 0, "%d non InnoDB tables created" % count)
开发者ID:jcsp,项目名称:manila,代码行数:28,代码来源:test_migration.py
示例9: has_table
def has_table(self, connection, table_name, schema=None):
    """Return True if ``pg_class`` lists a relation named ``table_name``.

    The name is compared lower-cased.  Without ``schema`` the lookup is
    restricted to ``current_schema()``; otherwise to the given namespace.
    """
    # seems like case gets folded in pg_class...
    name_param = sql.bindparam('name', unicode(table_name.lower()),
                               type_=sqltypes.Unicode)
    if schema is None:
        statement = sql.text(
            "select relname from pg_class c join pg_namespace n on "
            "n.oid=c.relnamespace where n.nspname=current_schema() and "
            "lower(relname)=:name",
            bindparams=[name_param]
        )
    else:
        schema_param = sql.bindparam('schema', unicode(schema),
                                     type_=sqltypes.Unicode)
        statement = sql.text(
            "select relname from pg_class c join pg_namespace n on "
            "n.oid=c.relnamespace where n.nspname=:schema and "
            "lower(relname)=:name",
            bindparams=[name_param, schema_param]
        )
    matches = connection.execute(statement)
    return bool(matches.first())
开发者ID:advatar,项目名称:thevault,代码行数:27,代码来源:base.py
示例10: createServiceRequest
def createServiceRequest():
    """Show the service-request form (GET) or store a new request (POST).

    The login check runs before anything else, so an anonymous POST is
    redirected to ``/`` with a flash message instead of failing on
    ``session["user"]``.

    POST: inserts the request, looks up its ``service_id``, stores one
    ``service_schedule`` row per selected day/hour and redirects to
    ``/requests/<id>/workers``.
    GET: workers are redirected to ``/``; clients get the form rendered
    with every tag from ``job_tag``.
    """
    if not session.get('user'):
        flash('Please login before creating requests.')
        return redirect('/')

    if request.method == 'POST':
        client = session["user"]
        # Read each form field once; the same values feed both statements.
        address = request.form.get('address')
        title = request.form.get('title')
        description = request.form.get('description')

        insert_request = text('''INSERT INTO service_request(client_username, address, title, description, tag)
VALUES(:user, :address, :title, :description, :tag);''')
        db.engine.execute(insert_request, user=client, address=address, title=title,
                          description=description, tag=request.form.get('tag'))

        # NOTE(review): the new row is found again by its field values, so
        # an older request with identical values could match first --
        # confirm whether the insert can return the id directly.
        find_request = text('''SELECT service_id FROM service_request WHERE client_username=:user AND address=:address AND
title=:title AND description=:description;''')
        created = db.engine.execute(find_request, user=client, address=address, title=title,
                                    description=description).fetchone()
        service_id = created[0]

        # One schedule row per checked hour of each weekday.
        insert_slot = text('''INSERT INTO service_schedule(service_id,day,hour) VALUES(:id,:day,:hour);''')
        for day in ('mo', 'tu', 'we', 'th', 'fr', 'sa', 'su'):
            for hour in request.form.getlist(day):
                db.engine.execute(insert_slot, id=service_id, day=day, hour=hour)

        flash('Service request created.')
        return redirect('/requests/'+str(service_id)+'/workers')

    if session['type'] == 'worker':
        flash('You must be a client to create service requests.')
        return redirect('/')

    results = db.engine.execute(text('''SELECT tag FROM job_tag'''))
    tags = [res[0] for res in results]
    return render_template('createRequest.jade', tags=tags)
开发者ID:mibewh,项目名称:helpingHand,代码行数:30,代码来源:request.py
示例11: getAltitudesChilds
def getAltitudesChilds(connection, cd_ref):
    """Sum every altitude column of ``atlas.vm_altitudes`` for a taxon and
    the taxa returned by ``atlas.find_all_taxons_childs``.

    Returns a list of ``{"altitude": ..., "value": ...}`` dicts, one per
    altitude label for each result row.  The altitude text is the label
    with ``_`` replaced by ``-`` and its first character dropped.
    """
    # Build the SELECT list of the query from the labels stored in the table.
    labels_sql = """
SELECT label_altitude
FROM atlas.bib_altitudes
ORDER BY altitude_min
"""
    labels = [row[0] for row in connection.execute(text(labels_sql))]
    sum_columns = ', '.join(
        "SUM({}) AS {}".format(label, label) for label in labels
    )

    totals_sql = """
SELECT {sumSelect}
FROM atlas.vm_altitudes alt
WHERE
alt.cd_ref IN (
SELECT * FROM atlas.find_all_taxons_childs(:thiscdref)
) OR alt.cd_ref = :thiscdref
""".format(sumSelect=sum_columns)
    totals = connection.execute(text(totals_sql), thiscdref=cd_ref)

    return [
        {"altitude": label.replace('_', '-')[1:], "value": getattr(row, label)}
        for row in totals
        for label in labels
    ]
开发者ID:PnEcrins,项目名称:GeoNature-atlas,代码行数:31,代码来源:vmAltitudesRepository.py
示例12: get_unique_constraints
def get_unique_constraints(self, connection, table_name, schema=None, **kw):
    """Return ``{"name", "column_names"}`` dicts for the unique constraints
    of a table (indexes with ``index_category = 3`` and ``"unique" = 2``)."""
    # Same as get_indexes except only for "unique"=2
    table_id = self.get_table_id(connection, table_name, schema,
                                 info_cache=kw.get("info_cache"))

    # unique=2 -> unique constraint
    constraint_query = text("""
SELECT i.index_id as index_id, i.index_name AS name
FROM sys.sysidx i join sys.systab t on i.table_id=t.table_id
WHERE t.table_id = :table_id and i.index_category = 3 and i."unique"=2
""")
    # Columns of one index, in index order.
    column_query = text("""
select tc.column_name as col
FROM sys.sysidxcol ic
join sys.systabcol tc on (ic.table_id=tc.table_id and ic.column_id=tc.column_id)
WHERE ic.index_id = :index_id and ic.table_id = :table_id
ORDER BY ic.sequence ASC
""")

    constraints = []
    for index_row in connection.execute(constraint_query, table_id=table_id):
        column_rows = connection.execute(column_query,
                                         index_id=index_row["index_id"],
                                         table_id=table_id)
        names = [column_row["col"] for column_row in column_rows]
        constraints.append({"name": index_row["name"],
                            "column_names": names})
    return constraints
开发者ID:sqlanywhere,项目名称:sqlalchemy-sqlany,代码行数:30,代码来源:base.py
示例13: get_pk_constraint
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
    """Return ``{"constrained_columns", "name"}`` for the table's primary key.

    When the table has no primary-key index the result is an empty column
    list with ``name`` set to ``None``.
    """
    table_id = self.get_table_id(connection, table_name, schema,
                                 info_cache=kw.get("info_cache"))

    # index_category=1 -> primary key
    pk_query = text("""
SELECT t.table_name AS table_name, i.index_id as index_id,
i.index_name AS name
FROM sys.sysidx i join sys.systab t on i.table_id=t.table_id
WHERE t.table_id = :table_id and i.index_category = 1
""")
    pk_result = connection.execute(pk_query, table_id=table_id)
    pk_row = pk_result.fetchone()
    pk_result.close()
    if not pk_row:
        return {"constrained_columns": [],
                "name": None}

    column_query = text("""
select tc.column_name as col
FROM sys.sysidxcol ic
join sys.systabcol tc on (ic.table_id=tc.table_id and ic.column_id=tc.column_id)
WHERE ic.index_id = :index_id and ic.table_id = :table_id
""")
    column_rows = connection.execute(column_query,
                                     index_id=pk_row["index_id"],
                                     table_id=table_id)
    constrained = [column_row["col"] for column_row in column_rows]
    return {"constrained_columns": constrained,
            "name": pk_row["name"]}
开发者ID:sqlanywhere,项目名称:sqlalchemy-sqlany,代码行数:31,代码来源:base.py
示例14: get_indexes
def get_indexes(self, connection, table_name, schema=None, **kw):
    """Return ``{"name", "unique", "column_names"}`` dicts for every index
    of the table whose ``index_category`` is 3."""
    table_id = self.get_table_id(connection, table_name, schema,
                                 info_cache=kw.get("info_cache"))

    # index_category=3 -> not primary key, not foreign key, not text index
    # unique=1 -> unique index, 2 -> unique constraint, 5->unique index with
    # nulls not distinct
    index_query = text("""
SELECT i.index_id as index_id, i.index_name AS name,
if i."unique" in (1,2,5) then 1 else 0 endif AS "unique"
FROM sys.sysidx i join sys.systab t on i.table_id=t.table_id
WHERE t.table_id = :table_id and i.index_category = 3
""")
    # Columns of one index, in index order.
    column_query = text("""
select tc.column_name as col
FROM sys.sysidxcol ic
join sys.systabcol tc on (ic.table_id=tc.table_id and ic.column_id=tc.column_id)
WHERE ic.index_id = :index_id and ic.table_id = :table_id
ORDER BY ic.sequence ASC
""")

    found = []
    for index_row in connection.execute(index_query, table_id=table_id):
        column_rows = connection.execute(column_query,
                                         index_id=index_row["index_id"],
                                         table_id=table_id)
        names = [column_row["col"] for column_row in column_rows]
        found.append({"name": index_row["name"],
                      "unique": bool(index_row["unique"]),
                      "column_names": names})
    return found
开发者ID:sqlanywhere,项目名称:sqlalchemy-sqlany,代码行数:33,代码来源:base.py
示例15: upgrade_6_0_to_6_1_plugins_cluster_attrs_use_ids_mapping
def upgrade_6_0_to_6_1_plugins_cluster_attrs_use_ids_mapping(connection):
    """Convert plugin version mapping to plugin ids

    In Fuel 6.0 the cluster attributes stored a plugin version to tell
    which plugin should be enabled or disabled.  The 6.1 release adds
    plugin updates, so a single plugin can be overwritten with a newer
    version (for example 1.0.0 replaced by 1.0.1) and versions can no
    longer be relied on.  Here every ``plugin_version`` entry in the
    attribute metadata is replaced by a ``plugin_id`` entry.

    See blueprint:
    https://blueprints.launchpad.net/fuel/+spec/plugins-security-fixes-delivery
    """
    select_attrs = text("""SELECT id, editable FROM attributes""")
    select_plugins = text(
        """SELECT id FROM plugins
WHERE name = :plugin_name AND
version = :plugin_version""")
    update_attrs = text(
        """UPDATE attributes
SET editable = :editable
WHERE id = :id""")

    for attributes_row in connection.execute(select_attrs):
        attr_id = attributes_row[0]
        editable = jsonutils.loads(attributes_row[1])

        for plugin_name, attr in six.iteritems(editable):
            plugin_version = attr.get('metadata', {}).get('plugin_version')
            if not plugin_version:
                continue

            # If there is no plugin with such version
            # and name, it means that something was wrong
            # and somebody deleted the plugin from database
            # we must not fail migration in this case
            matches = list(connection.execute(
                select_plugins,
                plugin_name=plugin_name,
                plugin_version=plugin_version))
            plugin_id = matches[0][0] if matches else None

            del attr['metadata']['plugin_version']
            attr['metadata']['plugin_id'] = plugin_id

        # Write the attributes back once per row, after all plugins are mapped.
        connection.execute(
            update_attrs,
            editable=jsonutils.dumps(editable),
            id=attr_id)
开发者ID:katepimenova,项目名称:fuel-web,代码行数:60,代码来源:migration.py
示例16: has_sequence
def has_sequence(self, connection, sequence_name, schema=None):
    """Return True if ``pg_class`` lists a sequence (``relkind='S'``) named
    ``sequence_name``.

    The name is compared lower-cased.  Without ``schema`` the lookup is
    restricted to ``current_schema()``; otherwise to the given namespace.
    """
    name_param = sql.bindparam('name', unicode(sequence_name.lower()),
                               type_=sqltypes.Unicode)
    if schema is None:
        statement = sql.text(
            "SELECT relname FROM pg_class c join pg_namespace n on "
            "n.oid=c.relnamespace where relkind='S' and "
            "n.nspname=current_schema() "
            "and lower(relname)=:name",
            bindparams=[name_param]
        )
    else:
        schema_param = sql.bindparam('schema', unicode(schema),
                                     type_=sqltypes.Unicode)
        statement = sql.text(
            "SELECT relname FROM pg_class c join pg_namespace n on "
            "n.oid=c.relnamespace where relkind='S' and "
            "n.nspname=:schema and lower(relname)=:name",
            bindparams=[name_param, schema_param]
        )
    matches = connection.execute(statement)
    return bool(matches.first())
开发者ID:advatar,项目名称:thevault,代码行数:30,代码来源:base.py
示例17: upgrade_ubuntu_cobbler_profile_6_0_to_6_1
def upgrade_ubuntu_cobbler_profile_6_0_to_6_1(connection):
    """Switch the cobbler profile from ``ubuntu_1204_x86_64`` to
    ``ubuntu_1404_x86_64``.

    Two JSON columns are rewritten: ``attributes.generated`` (key path
    ``cobbler.profile``) and ``releases.attributes_metadata`` (key path
    ``generated.cobbler.profile.generator_arg``).  Only rows that still
    hold the old profile name are updated.
    """
    old_profile = 'ubuntu_1204_x86_64'
    new_profile = 'ubuntu_1404_x86_64'

    # Pass 1: generated attributes.
    select_generated = text("SELECT id, generated FROM attributes")
    update_generated = text(
        "UPDATE attributes SET generated = :generated WHERE id = :attr_id")
    for attr_id, generated in connection.execute(select_generated):
        attrs = jsonutils.loads(generated)
        if attrs['cobbler']['profile'] == old_profile:
            attrs['cobbler']['profile'] = new_profile
            connection.execute(
                update_generated,
                generated=jsonutils.dumps(attrs),
                attr_id=attr_id)

    # Pass 2: release attribute metadata.
    select_metadata = text("SELECT id, attributes_metadata FROM releases")
    update_metadata = text(
        "UPDATE releases SET attributes_metadata = :attrs_meta"
        " WHERE id = :release_id")
    for release_id, attributes_metadata in connection.execute(select_metadata):
        attrs = jsonutils.loads(attributes_metadata)
        profile = attrs['generated']['cobbler']['profile']
        if profile['generator_arg'] == old_profile:
            profile['generator_arg'] = new_profile
            connection.execute(
                update_metadata,
                attrs_meta=jsonutils.dumps(attrs),
                release_id=release_id)
开发者ID:katepimenova,项目名称:fuel-web,代码行数:27,代码来源:migration.py
示例18: sources2removals
def sources2removals(source_list, suite_id, session):
    """Compute removals items given a list of names of source packages

    An empty C{source_list} yields an empty result without querying the
    database: an empty tuple bound to C{IN :sources} would otherwise be
    rendered as C{IN ()}, which is not valid SQL.

    @type source_list: list
    @param source_list: A list of names of source packages

    @type suite_id: int
    @param suite_id: The id of the suite from which these sources should be removed

    @type session: SQLA Session
    @param session: The database session in use

    @rtype: list
    @return: A list of items to be removed to remove all sources and their binaries from the given suite
    """
    sources = tuple(source_list)
    if not sources:
        return []

    to_remove = []
    params = {"suite_id": suite_id, "sources": sources}

    # The source packages themselves.
    q = session.execute(sql.text("""
SELECT s.source, s.version, 'source', s.id
FROM source s
JOIN src_associations sa ON sa.source = s.id
WHERE sa.suite = :suite_id AND s.source IN :sources"""), params)
    to_remove.extend(q)

    # The binaries built from those sources that are in the same suite.
    q = session.execute(sql.text("""
SELECT b.package, b.version, a.arch_string, b.id
FROM binaries b
JOIN bin_associations ba ON b.id = ba.bin
JOIN architecture a ON b.architecture = a.id
JOIN source s ON b.source = s.id
WHERE ba.suite = :suite_id AND s.source IN :sources"""), params)
    to_remove.extend(q)
    return to_remove
开发者ID:Debian,项目名称:dak,代码行数:32,代码来源:auto_decruft.py
示例19: submit_recording
def submit_recording(connection, data):
    """Store a recording submission and return its newly generated gid.

    ``data`` must contain ``"artist"`` and ``"title"``; ``"release"`` is
    optional.  The full payload and the artist/title pair are each
    serialized as compact JSON with sorted keys and hashed with SHA-256.
    The artist credit and release are looked up and created when missing.
    One row is inserted into ``recording_json`` and one into ``recording``.
    """
    compact = {"sort_keys": True, "separators": (',', ':')}
    data_json = json.dumps(data, **compact)
    data_sha256 = sha256(data_json.encode("utf-8")).hexdigest()
    meta_json = json.dumps({"artist": data["artist"], "title": data["title"]},
                           **compact)
    meta_sha256 = sha256(meta_json.encode("utf-8")).hexdigest()

    # Reuse the existing artist credit / release, creating them if absent.
    artist = (get_artist_credit(connection, data["artist"])
              or add_artist_credit(connection, data["artist"]))
    release = None
    if "release" in data:
        release = (get_release(connection, data["release"])
                   or add_release(connection, data["release"]))

    insert_json = text("""INSERT INTO recording_json (data, data_sha256, meta_sha256)
VALUES (:data, :data_sha256, :meta_sha256)
RETURNING id""")
    json_row = connection.execute(insert_json, {"data": data_json,
                                                "data_sha256": data_sha256,
                                                "meta_sha256": meta_sha256}).fetchone()
    recording_json_id = json_row["id"]

    gid = str(uuid.uuid4())
    insert_recording = text("""INSERT INTO recording (gid, data, artist, release, submitted)
VALUES (:gid, :data, :artist, :release, now())""")
    connection.execute(insert_recording, {"gid": gid,
                                          "data": recording_json_id,
                                          "artist": artist,
                                          "release": release})
    return gid
开发者ID:Freso,项目名称:messybrainz-server,代码行数:33,代码来源:data.py
示例20: main
def main(url = Config.get("URL", "ships"), debug=False):
    """Download the ship statistics page and rebuild the ``ships`` table.

    Every existing row is deleted and the id counter is reset (with a
    MySQL-specific statement when the configured dbms is ``mysql``).  Each
    match of ``sre`` in the downloaded text then becomes one ``Ship``:
    values found in ``mapping`` are translated, digit strings become ints,
    and ``'-'`` or empty values are left unset.
    """
    http_request = urllib2.Request(url)
    http_request.add_header('User-Agent', useragent)
    stats = urllib2.urlopen(http_request).read()

    session.execute(Ship.__table__.delete())
    # NOTE(review): ``false`` is presumably a module-level bindparam named
    # "false" -- confirm against the top of the file.
    if Config.get("DB", "dbms") == "mysql":
        reset_ids = text("ALTER TABLE ships AUTO_INCREMENT=1;", bindparams=[false])
    else:
        reset_ids = text("SELECT setval('ships_id_seq', 1, :false);", bindparams=[false])
    session.execute(reset_ids)

    for match in sre.findall(stats):
        ship = Ship()
        fields = list(match)
        for index, key in enumerate(keys):
            value = fields[index]
            if value in mapping:
                value = mapping[value]
            elif value.isdigit():
                value = int(value)
            if value not in ('-', '',):
                setattr(ship, key, value)
        ship.total_cost = ship.metal + ship.crystal + ship.eonium
        if debug:
            print("%12s%12s%12s%12s" % (ship.name, ship.class_, ship.race, ship.type,))
        session.add(ship)

    session.commit()
    session.close()
开发者ID:TBBTNut,项目名称:merlin,代码行数:27,代码来源:shipstats.py
注:本文中的 sqlalchemy.sql.text 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论