本文整理汇总了Python中storm.database.create_database函数的典型用法代码示例。如果您正苦于以下问题:Python create_database函数的具体用法?Python create_database怎么用?Python create_database使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了create_database函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: load_db
def load_db(self, uri):
    """Return the store kept for the calling thread, building it on demand.

    A fresh database/store pair is created when the externally defined
    ``store`` name is falsy, when this object is flagged as closed, or when
    the thread-local container holds no store yet.

    :param uri: database URI handed to ``create_database``.
    :returns: the ``Store`` held on the thread-local ``tlocal``.
    """
    global database
    global tlocal
    import threading
    # Keep an existing thread-local container.  Rebuilding it on every call
    # would throw away the store cached by a previous call and make the
    # final attribute access fail whenever no new store is created.
    if not isinstance(globals().get("tlocal"), threading.local):
        tlocal = threading.local()
    # A single combined check avoids building the database twice when the
    # store is missing and the object is also flagged as closed.
    if not store or self.isClosed or not hasattr(tlocal, "store"):
        database = create_database(uri)
        tlocal.store = Store(database)
        # Reset the flag that is actually tested above; assigning to a
        # differently spelled attribute ("isClose") left it stuck.
        self.isClosed = False
    return tlocal.store
开发者ID:pszafer,项目名称:dlna_upnp_invention,代码行数:13,代码来源:Database.py
示例2: test_charset_option
def test_charset_option(self):
    """The ``charset`` URI option is reported back as the client charset."""
    mysql_uri = URI(os.environ["STORM_MYSQL_URI"])
    mysql_uri.options["charset"] = "ascii"
    conn = create_database(mysql_uri).connect()
    # Ask the server which character set it associates with this client.
    row = conn.execute("SELECT @@character_set_client").get_one()
    self.assertEquals(row, ("ascii",))
开发者ID:DamnWidget,项目名称:mamba-storm,代码行数:7,代码来源:mysql.py
示例3: setUp
def setUp(self):
    """Open a connection to the test database and install the tracer.

    The tracer's ``get_remaining_time`` is replaced by a callable that
    reports the current value of ``self.remaining_time``.
    """
    super(PostgresTimeoutTracerTest, self).setUp()
    db = create_database(os.environ["STORM_POSTGRES_URI"])
    self.database = db
    self.connection = db.connect()
    install_tracer(self.tracer)
    # Initial value handed out by the patched callable below.
    self.remaining_time = 10.5
    self.tracer.get_remaining_time = lambda: self.remaining_time
开发者ID:Tibodef,项目名称:PythonBlog,代码行数:7,代码来源:postgres.py
示例4: test_wb_create_database
def test_wb_create_database(self):
    """Every component of a MySQL URI ends up in the connect keyword args."""
    # The password/host part of this literal had been replaced by an
    # e-mail obfuscation placeholder; "pw@ht" is what the expected values
    # below ("passwd" == "pw", "host" == "ht") require.
    database = create_database("mysql://un:pw@ht:12/db?unix_socket=us")
    self.assertTrue(isinstance(database, MySQL))
    expected_kwargs = [
        ("db", "db"),
        ("host", "ht"),
        ("port", 12),
        ("user", "un"),
        ("passwd", "pw"),
        ("unix_socket", "us"),
    ]
    for key, value in expected_kwargs:
        self.assertEquals(database._connect_kwargs.get(key), value)
开发者ID:DamnWidget,项目名称:mamba-storm,代码行数:7,代码来源:mysql.py
示例5: test_commit_timeout
def test_commit_timeout(self):
    """Regression test for commit observing the timeout.

    In 0.10, the timeout wasn't observed for connection.commit().
    """
    # Create a database with a table.
    database = create_database("sqlite:%s?timeout=0.3" % self.get_path())
    connection1 = database.connect()
    connection1.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
    connection1.commit()

    # Put some data in, but also make a second connection to the database,
    # which will prevent a commit until it is closed.
    connection1.execute("INSERT INTO test VALUES (1)")
    connection2 = database.connect()
    connection2.execute("SELECT id FROM test")

    started = time.time()
    try:
        connection1.commit()
    except OperationalError as exception:
        self.assertEquals(str(exception), "database is locked")
        # In 0.10, the next assertion failed because the timeout wasn't
        # enforced for the "COMMIT" statement.
        self.assertTrue(time.time() - started >= 0.3)
    else:
        # Without this branch the test would pass silently if the commit
        # unexpectedly succeeded.
        self.fail("OperationalError not raised")
开发者ID:DamnWidget,项目名称:mamba-storm,代码行数:26,代码来源:sqlite.py
示例6: test_recover_after_timeout
def test_recover_after_timeout(self):
    """Regression test for recovering from a "database locked" failure.

    In 0.10, a commit that raised (for instance an OperationalError caused
    by another open connection) made the connection forget that a
    transaction was in progress.  The next modification then issued BEGIN
    again and the database complained that a transaction was already
    active.
    """
    # Build a database holding a single table.
    db_uri = "sqlite:%s?timeout=0.3" % self.get_path()
    database = create_database(db_uri)
    writer = database.connect()
    writer.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
    writer.commit()

    # Leave an insert pending, then read through a second connection so
    # that the pending commit cannot go through until that one is closed.
    writer.execute("INSERT INTO test VALUES (1)")
    blocker = database.connect()
    blocker.execute("SELECT id FROM test")
    self.assertRaises(OperationalError, writer.commit)

    # With the second connection gone the commit must be possible again.
    blocker.close()

    # In 0.10 this statement raised OperationalError: cannot start a
    # transaction within a transaction
    writer.execute("INSERT INTO test VALUES (2)")
    writer.commit()

    # Both rows must have made it into the table.
    rows = writer.execute("SELECT id FROM test").get_all()
    self.assertEquals(rows, [(1,), (2,)])
开发者ID:DamnWidget,项目名称:mamba-storm,代码行数:34,代码来源:sqlite.py
示例7: test_isolation_read_committed
def test_isolation_read_committed(self):
    """A read-committed connection sees other commits mid-transaction.

    Uses ``self.connection`` (set up outside this method) as the second
    connection.
    """
    uri = os.environ["STORM_POSTGRES_URI"] + "?isolation=read-committed"
    connection = create_database(uri).connect()
    self.addCleanup(connection.close)

    level_row = connection.execute("SHOW TRANSACTION ISOLATION LEVEL").get_one()
    self.assertEquals(level_row[0], u"read committed")

    connection.execute("INSERT INTO bin_test VALUES (1, 'foo')")
    # The uncommitted insert must be invisible from the other connection.
    seen_by_other = self.connection.execute("SELECT id FROM bin_test")
    self.assertEquals(seen_by_other.get_all(), [])

    connection.rollback()

    # Begin a transaction on the connection under test.
    probe = connection.execute("SELECT 1")
    self.assertEquals(probe.get_one(), (1,))

    self.connection.execute("INSERT INTO bin_test VALUES (1, 'foo')")
    self.connection.commit()

    # The row committed elsewhere is visible inside the open transaction.
    visible = connection.execute("SELECT id FROM bin_test")
    self.assertEquals(visible.get_one(), (1,))

    connection.rollback()
开发者ID:Tibodef,项目名称:PythonBlog,代码行数:28,代码来源:postgres.py
示例8: __init__
def __init__(self, market_db):
    """Open the SQLite database at *market_db* and run the schema script.

    The script ``database/schema.sql`` under ``BASE_DIR`` is split on
    ``;`` and each piece is executed on the store in order.
    """
    self.database = create_database('sqlite:' + market_db)
    self.store = MarketStore(self.database)

    schema_path = os.path.join(BASE_DIR, 'database', 'schema.sql')
    with open(schema_path) as schema_file:
        statements = schema_file.read().split(';')

    for statement in statements:
        self.store.execute(statement)
开发者ID:Tribler,项目名称:decentralized-mortgage-market,代码行数:8,代码来源:datamanager.py
示例9: _get_store
def _get_store(self):
    """Return the cached store, opening ``db/hostdb.sqlite`` on first use.

    The ``db`` directory under ``self.path`` is created when it is missing,
    and ``setup_schema`` is run on a newly opened store.
    """
    if self.store is None:
        db_dir_path = os.path.join(self.path, "db")
        if not os.path.isdir(db_dir_path):
            os.mkdir(db_dir_path)
        sqlite_file = os.path.join(db_dir_path, "hostdb.sqlite")
        uri = "sqlite:%s?timeout=%f" % (sqlite_file, self.timeout)
        self.store = Store(create_database(uri))
        setup_schema(self.store)
    return self.store
开发者ID:jelmer,项目名称:build-farm,代码行数:11,代码来源:__init__.py
示例10: is_supported
def is_supported(self):
    """Tell whether the configured server allows prepared transactions.

    :returns: ``False`` when ``STORM_POSTGRES_URI`` is unset or empty;
        otherwise whether the server's ``MAX_PREPARED_TRANSACTIONS``
        setting is greater than zero.  The setting is queried once and
        remembered in the module-level ``_max_prepared_transactions``.
    """
    uri = os.environ.get("STORM_POSTGRES_URI")
    if not uri:
        return False
    global _max_prepared_transactions
    if _max_prepared_transactions is None:
        database = create_database(uri)
        connection = database.connect()
        try:
            result = connection.execute("SHOW MAX_PREPARED_TRANSACTIONS")
            _max_prepared_transactions = int(result.get_one()[0])
        finally:
            # Release the connection even when the query or the integer
            # conversion raises.
            connection.close()
    return _max_prepared_transactions > 0
开发者ID:Tibodef,项目名称:PythonBlog,代码行数:12,代码来源:postgres.py
示例11: setUp
def setUp(self):
    """Build an in-memory SQLite store with fixture tables and model classes.

    Creates and fills the ``person``, ``address``, ``phone`` and
    ``person_phone`` tables, and exposes ``self.SQLObject`` (a base class
    bound to ``self.store``) and ``self.Person``.
    """
    TestHelper.setUp(self)

    # Allow classes with the same name in different tests to resolve
    # property path strings properly.
    SQLObjectBase._storm_property_registry.clear()

    self.store = Store(create_database("sqlite:"))

    class SQLObject(SQLObjectBase):
        @staticmethod
        def _get_store():
            return self.store

    self.SQLObject = SQLObject

    # Fixture statements, executed strictly in this order.
    fixture_sql = (
        ("CREATE TABLE person "
         "(id INTEGER PRIMARY KEY, name TEXT, age INTEGER,"
         " ts TIMESTAMP, delta INTERVAL,"
         " address_id INTEGER)"),
        ("INSERT INTO person VALUES "
         "(1, 'John Joe', 20, '2007-02-05 19:53:15',"
         " '1 day, 12:34:56', 1)"),
        ("INSERT INTO person VALUES "
         "(2, 'John Doe', 20, '2007-02-05 20:53:15',"
         " '42 days 12:34:56.78', 2)"),
        ("CREATE TABLE address "
         "(id INTEGER PRIMARY KEY, city TEXT)"),
        "INSERT INTO address VALUES (1, 'Curitiba')",
        "INSERT INTO address VALUES (2, 'Sao Carlos')",
        ("CREATE TABLE phone "
         "(id INTEGER PRIMARY KEY, person_id INTEGER,"
         "number TEXT)"),
        "INSERT INTO phone VALUES (1, 2, '1234-5678')",
        "INSERT INTO phone VALUES (2, 1, '8765-4321')",
        "INSERT INTO phone VALUES (3, 2, '8765-5678')",
        ("CREATE TABLE person_phone "
         "(id INTEGER PRIMARY KEY, person_id INTEGER, "
         "phone_id INTEGER)"),
        "INSERT INTO person_phone VALUES (1, 2, 1)",
        "INSERT INTO person_phone VALUES (2, 2, 2)",
        "INSERT INTO person_phone VALUES (3, 1, 1)",
    )
    for statement in fixture_sql:
        self.store.execute(statement)

    class Person(self.SQLObject):
        _defaultOrder = "-Person.name"
        name = StringCol()
        age = IntCol()
        ts = UtcDateTimeCol()

    self.Person = Person
开发者ID:datnguyen0606,项目名称:storm,代码行数:52,代码来源:sqlobject.py
示例12: test_timeout
def test_timeout(self):
    """A write blocked by another connection fails only after the timeout.

    The database is opened with ``timeout=0.3``; the second connection's
    insert must raise "database is locked" no sooner than 0.3 seconds
    after it was attempted.
    """
    database = create_database("sqlite:%s?timeout=0.3" % self.get_path())
    connection1 = database.connect()
    connection2 = database.connect()
    connection1.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
    connection1.commit()
    # Leave an uncommitted write pending on the first connection.
    connection1.execute("INSERT INTO test VALUES (1)")

    started = time.time()
    try:
        connection2.execute("INSERT INTO test VALUES (2)")
    except OperationalError as exception:
        self.assertEquals(str(exception), "database is locked")
        self.assertTrue(time.time() - started >= 0.3)
    else:
        # Without this branch the test would pass silently if the second
        # insert unexpectedly succeeded.
        self.fail("OperationalError not raised")
开发者ID:DamnWidget,项目名称:mamba-storm,代码行数:13,代码来源:sqlite.py
示例13: __init__
def __init__(self, db="sqlite"):
    """Open a store for the chosen backend and create the ``users`` table.

    :param db: backend name.  ``"sqlite"`` selects the bare ``sqlite:``
        URI; any other value is used as the URI scheme for a ``test``
        database on localhost with user ``root`` (password ``root`` for
        ``"postgres"``, empty otherwise).
    """
    if db == "sqlite":
        expr = "sqlite:"
    else:
        uname = "root"
        passw = "root" if db == "postgres" else ""
        expr = "{db}://{usern}:{passw}@localhost/test".format(
            db=db, usern=uname, passw=passw)
    self.database = create_database(expr)
    self.store = Store(self.database)
    # The table is created unconditionally; a pre-existing "users" table
    # is not dropped first.
    self.store.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, login VARCHAR(8), userid INTEGER, projid INTEGER)")
开发者ID:omelhoro,项目名称:python-coreapps,代码行数:13,代码来源:ex26.py
示例14: setUp
def setUp(self):
    """Create a temporary build-farm tree with an initialised host database.

    The tree lives under a fresh temporary directory stored in
    ``self.path``; the SQLite URI of its host database is kept in
    ``self.db_url``.  Empty compiler and host lists are written last.
    """
    super(BuildFarmTestCase, self).setUp()
    self.path = tempfile.mkdtemp()

    # Parents are listed before their children because os.mkdir creates
    # a single directory level per call.
    layout = ("data", "data/upload", "data/oldrevs", "db", "web", "lcov",
              "lcov/data")
    for relative in layout:
        os.mkdir(os.path.join(self.path, relative))

    sqlite_file = os.path.join(self.path, "db", "hostdb.sqlite")
    self.db_url = "sqlite:" + sqlite_file
    store = Store(database.create_database(self.db_url))
    setup_schema(store)
    store.commit()

    self.write_compilers([])
    self.write_hosts({})
开发者ID:krishnatejaperannagari,项目名称:build-farm,代码行数:14,代码来源:__init__.py
示例15: __init__
def __init__(self):
    """Load the Glade interface, open the laps database and wire up the UI.

    The statements run in a fixed order: the store is opened first, then
    the race is looked up and proxied, then validation, categories and
    widgets are set up.
    """
    delegate_options = {
        "gladefile": "interface.ui",
        "delete_handler": self.quit_if_last,
    }
    GladeDelegate.__init__(self, **delegate_options)

    self.proxy = None

    # Persistent storage lives in a SQLite file named laps.sqlite.
    self.db = create_database("sqlite:laps.sqlite")
    self.store = Store(self.db)

    self.race = self._check_race()
    self.race_proxy = self.add_proxy(self.race, self.race_widgets)

    self.register_validate_function(self._validation_changed)
    self._check_categories()
    self.setup_widgets()
开发者ID:romaia,项目名称:race-lap-timer,代码行数:14,代码来源:timer.py
示例16: _get_store_internal
def _get_store_internal(self, dbname):
    """Open and return a store for the database called *dbname*.

    When the URI built for *dbname* has an empty host, the host and port
    reported by ``test_local_database()`` are used instead.

    :param dbname: name passed to ``self._create_uri``.
    :returns: the newly created ``StoqlibStore``.
    :raises DatabaseError: when no local server is found, or when opening
        the store raises ``OperationalError``.
    """
    from stoqlib.database.runtime import StoqlibStore
    uri = self._create_uri(dbname)
    try:
        if uri.host == "":
            pair = test_local_database()
            if pair is None:
                raise DatabaseError(
                    _("Could not find a database server on this computer"))
            uri.host = pair[0]
            uri.port = int(pair[1])
        self._log_connect(uri)
        store = StoqlibStore(create_database(uri))
    except OperationalError as e:
        log.info('OperationalError: %s' % e)
        raise DatabaseError(e.args[0])
    # Hand the store back to the caller; the visible code built it and
    # then dropped it.
    return store
开发者ID:romaia,项目名称:stoq,代码行数:16,代码来源:settings.py
示例17: _get_store_internal
def _get_store_internal(self, dbname):
    """Open and return a store for the database called *dbname*.

    :param dbname: name passed to ``self._create_uri``.
    :returns: the newly created ``StoqlibStore``.
    :raises DatabaseError: for an ``OperationalError`` (carrying its first
        argument) and for any other exception (carrying a translated
        message that names ``DEFAULT_RDBMS`` and the original error).
    """
    from stoqlib.database.runtime import StoqlibStore
    uri = self._create_uri(dbname)
    try:
        self._log_connect(uri)
        return StoqlibStore(create_database(uri))
    except OperationalError as e:
        log.info('OperationalError: %s' % e)
        raise DatabaseError(e.args[0])
    except Exception as e:
        # The caught exception is the one currently being handled.
        template = _("Could not connect to %s database. The error message is "
                     "'%s'. Please fix the connection settings you have set "
                     "and try again.")
        raise DatabaseError(template % (DEFAULT_RDBMS, e))
开发者ID:hackedbellini,项目名称:stoq,代码行数:16,代码来源:settings.py
示例18: test_isolation_autocommit
def test_isolation_autocommit(self):
    """With ``isolation=autocommit`` a write is visible without a commit.

    Uses ``self.connection`` (set up outside this method) as the observing
    connection.
    """
    uri = os.environ["STORM_POSTGRES_URI"] + "?isolation=autocommit"
    connection = create_database(uri).connect()
    self.addCleanup(connection.close)

    level_row = connection.execute("SHOW TRANSACTION ISOLATION LEVEL").get_one()
    # Internally, Postgres reports this mode as read committed.
    self.assertEquals(level_row[0], u"read committed")

    connection.execute("INSERT INTO bin_test VALUES (1, 'foo')")

    # Nothing was committed explicitly, yet the row must already be
    # visible from the other connection.
    seen_by_other = self.connection.execute("SELECT id FROM bin_test")
    self.assertEquals(seen_by_other.get_all(), [(1,)])

    connection.rollback()
开发者ID:Tibodef,项目名称:PythonBlog,代码行数:17,代码来源:postgres.py
示例19: main
def main():
    """Print one CSV line per racer: identity, lap seconds and total time.

    Reads the racers from ``laps.sqlite``.  Every row is padded with zeros
    to 11 columns (3 identity columns plus 8 laps) before the total is
    appended.
    """
    db = create_database("sqlite:laps.sqlite")
    store = Store(db)
    racers = store.find(Racer)
    # The call form of print behaves the same on Python 2 for a single
    # argument and is also valid on Python 3.
    print('Categoria,Número,Nome,L1,L2,L3,L4,L5,L6,L7,L8,Total')
    for r in racers:
        data = [r.category.name, r.number, r.name]
        for i, lap in enumerate(list(r.get_laps()), 1):
            # Laps are expected to come back numbered consecutively from 1.
            assert i == lap.lap_number
            data.append(lap.lap_time.seconds)
        data.extend([0] * (11 - len(data)))
        data.append(r.total_time)
        print(','.join(str(value) for value in data))
开发者ID:romaia,项目名称:race-lap-timer,代码行数:18,代码来源:export.py
示例20: test_isolation_serializable
def test_isolation_serializable(self):
    """A serializable transaction does not see rows committed after it began.

    Uses ``self.connection`` (set up outside this method) as the writing
    connection.
    """
    uri = os.environ["STORM_POSTGRES_URI"] + "?isolation=serializable"
    connection = create_database(uri).connect()
    self.addCleanup(connection.close)

    level_row = connection.execute("SHOW TRANSACTION ISOLATION LEVEL").get_one()
    self.assertEquals(level_row[0], u"serializable")

    # Begin a transaction on the connection under test.
    probe = connection.execute("SELECT 1")
    self.assertEquals(probe.get_one(), (1,))

    self.connection.execute("INSERT INTO bin_test VALUES (1, 'foo')")
    self.connection.commit()

    # The row stays invisible because this transaction started before the
    # other connection committed.
    hidden = connection.execute("SELECT id FROM bin_test")
    self.assertEquals(hidden.get_one(), None)

    connection.rollback()
开发者ID:Tibodef,项目名称:PythonBlog,代码行数:21,代码来源:postgres.py
注:本文中的storm.database.create_database函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论