本文整理汇总了Python中pymysql.connect函数的典型用法代码示例。如果您正苦于以下问题:Python connect函数的具体用法?Python connect怎么用?Python connect使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了connect函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _connect
def _connect(self, host, port, mysql_sock, user, password, defaults_file):
    """Open a MySQL connection and report the outcome as a service check.

    The connection source is chosen in this order: a defaults file, a Unix
    socket, host plus port, and finally host alone.  An OK service check is
    emitted on success; on any exception a CRITICAL service check is emitted
    and the exception is re-raised.

    Returns the object returned by ``pymysql.connect``.
    """
    tags = [
        'host:%s' % host,
        'port:%s' % port,
    ]
    try:
        if defaults_file != '':
            connect_args = {'read_default_file': defaults_file}
        elif mysql_sock != '':
            connect_args = {
                'unix_socket': mysql_sock,
                'user': user,
                'passwd': password,
            }
        else:
            connect_args = {'host': host, 'user': user, 'passwd': password}
            # The port is only forwarded when it is truthy.
            if port:
                connect_args['port'] = port
        connection = pymysql.connect(**connect_args)
        self.log.debug("Connected to MySQL")
        self.service_check(self.SERVICE_CHECK_NAME, AgentCheck.OK, tags=tags)
    except Exception:
        self.service_check(self.SERVICE_CHECK_NAME, AgentCheck.CRITICAL, tags=tags)
        raise
    return connection
开发者ID:AirbornePorcine,项目名称:dd-agent,代码行数:29,代码来源:mysql.py
示例2: formula_to_dbid
def formula_to_dbid(formula_str, backslash_fix=False):
    """
    Convert a LaTeX formula to the database index.

    The mapping is loaded from the `wm_formula` table on first use and kept
    in the module-level cache.  Unknown formulas shorter than 20 characters
    are inserted into the table.

    Parameters
    ----------
    formula_str : string
        The formula as LaTeX code.
    backslash_fix : boolean
        If this is set to true, then it will be checked if the same formula
        exists with a preceding backslash.

    Returns
    -------
    int :
        The database index.
    """
    global __formula_to_dbid_cache

    def open_connection():
        # Single place that knows how to reach the formula database.
        mysql = utils.get_mysql_cfg()
        return pymysql.connect(host=mysql['host'],
                               user=mysql['user'],
                               passwd=mysql['passwd'],
                               db=mysql['db'],
                               charset='utf8mb4',
                               cursorclass=pymysql.cursors.DictCursor)

    if __formula_to_dbid_cache is None:
        connection = open_connection()
        try:
            cursor = connection.cursor()
            # Get all formulas that should get examined
            sql = ("SELECT `id`, `formula_in_latex` FROM `wm_formula` ")
            cursor.execute(sql)
            formulas = cursor.fetchall()
        finally:
            # Always release the connection, even when the query fails.
            connection.close()
        # Assign the cache only once it is complete, so a failure above
        # cannot leave a half-filled cache behind.
        __formula_to_dbid_cache = {fm['formula_in_latex']: fm['id']
                                   for fm in formulas}

    if formula_str in __formula_to_dbid_cache:
        return __formula_to_dbid_cache[formula_str]

    backslashed = '\\%s' % formula_str
    if backslash_fix and backslashed in __formula_to_dbid_cache:
        return __formula_to_dbid_cache[backslashed]

    logging.info("Symbol '%s' was not found. Add it to write-math.com.",
                 formula_str)
    connection = open_connection()
    try:
        cursor = connection.cursor()
        sql = ("INSERT INTO `wm_formula` (`user_id`, `formula_name`, "
               "`formula_in_latex`, "
               "`mode`, `package`) VALUES ("
               "'10', %s, %s, 'bothmodes', NULL);")
        if len(formula_str) < 20:
            logging.info("Insert formula %s.", formula_str)
            cursor.execute(sql, (formula_str, formula_str))
            connection.commit()
        # NOTE(review): for formulas of 20+ characters nothing is inserted,
        # yet insert_id() is still read and cached (as in the original
        # code) -- confirm that this is the intended result.
        new_id = connection.insert_id()
    finally:
        connection.close()
    __formula_to_dbid_cache[formula_str] = new_id
    return new_id
开发者ID:Duum,项目名称:hwrt,代码行数:60,代码来源:__init__.py
示例3: mysql_test
def mysql_test(options):
    """Returns True if successful, False if unsuccessful.

    Tries to connect to ``options.database`` on localhost first as the app
    user and then as the admin user.  Both attempts are always made; the
    result is False if either raises ``pymysql.err.OperationalError``.
    """
    status = True
    for role in ("app", "admin"):
        try:
            connection = pymysql.connect(
                host="localhost",
                user=getattr(options, role + "_username"),
                passwd=getattr(options, role + "_password"),
                db=options.database,
            )
        except pymysql.err.OperationalError:
            status = False
            LOG.debug("Could not connect as %s user.", role)
        else:
            # The connection was only a probe; do not leave it open.
            connection.close()
    return status
开发者ID:ianmackinnon,项目名称:mango,代码行数:27,代码来源:mysql.py
示例4: connect_mysql
def connect_mysql(instance, role='admin'):
    """Connect to a MySQL instance using the credentials of a role.

    Args:
    instance - object describing which mysql instance to connect to; its
               ``hostname`` and ``port`` attributes are read here
    role - a string of the name of the mysql role to use. A bootstrap role can
           be called for MySQL instances lacking any grants.

    Returns:
    a pymysql connection that uses DictCursor as its cursor class
    """
    log.info('Connecting to {}, please be patient. Timeout is 30 seconds.'.format(instance))
    connect_args = {'cursorclass': pymysql.cursors.DictCursor}
    if role == 'bootstrap':
        # Bootstrap connects through the local socket as root with an empty
        # password.
        # NOTE(review): the original comment here said autocommit has to be
        # enabled because `change master to` is not allowed when
        # super_read_only is on and autocommit is off, but no autocommit
        # argument is passed to connect() -- confirm.
        connect_args.update(
            unix_socket=host_utils.get_cnf_setting('socket', instance.port),
            user='root',
            passwd='',
        )
    else:
        username, password = get_mysql_user_for_role(role)
        connect_args.update(
            host=instance.hostname,
            port=instance.port,
            user=username,
            passwd=password,
            connect_timeout=CONNECT_TIMEOUT,
        )
    return pymysql.connect(**connect_args)
开发者ID:xiachufang,项目名称:mysql_utils,代码行数:33,代码来源:mysql_lib.py
示例5: similarity
def similarity():
    """Match restaurant_info rows against Hoboken rows of restaurant_yelp.

    For every row of ``restaurant_info`` the best candidate is obtained from
    ``find_most_simi``; when its score (element 1 of the result) is below 2
    the pair of ids is passed to ``addNewResterant``.  The number of matched
    rows is printed at the end.
    """
    def fetch_rows(query):
        # NOTE(review): credentials are hard-coded; consider moving them to
        # configuration.
        connection = pymysql.connect(host='127.0.0.1', port=3306, user='root',
                                     passwd='123456', db='biafinal_db')
        try:
            cursor = connection.cursor()
            try:
                cursor.execute(query)
                return list(cursor)
            finally:
                cursor.close()
        finally:
            # Release the connection even when the query fails.
            connection.close()

    infos_of_yelp = fetch_rows(
        "SELECT name,location,phone,id FROM restaurant_yelp where location like '%Hoboken%'")
    infos_of_allmenu = fetch_rows(
        "SELECT rest_name,address,telephone,rest_id as id FROM restaurant_info")

    result = []
    for info in infos_of_allmenu:
        simi = find_most_simi(info, infos_of_yelp)
        if simi[1] < 2:
            addNewResterant(simi[0][3], info[3])
            result.append(info)
    # Parenthesised form prints the same on Python 2 and also runs on Python 3.
    print(len(result))
开发者ID:MichaelTQ,项目名称:bia660_final,代码行数:28,代码来源:similarity.py
示例6: opendb
def opendb(name):
    """Open the database with the name given.

    Credentials are looked up by name in the DB credentials file.  When they
    carry no SSH login, the database host is contacted directly.  Otherwise
    the local forwarded port is tried first; if that fails with error 2003 an
    ``ssh -L`` tunnel is started in a child process and the connection is
    retried after five seconds.

    Raises dbopsError when the tunnelled connection cannot be established.
    """
    dbc = dbcredentials.DBcredfile()
    creds = dbc.get(name)
    if creds.login is None:
        return pymysql.connect(host=creds.host, user=creds.user, passwd=creds.password, db=creds.database)

    def connect_via_tunnel():
        # Connect to the local end of the SSH port forward.
        return pymysql.connect(host='localhost', port=int(creds.localport), user=creds.user,
                               passwd=creds.password, db=creds.database)

    # We need SSH tunnel, first see if we've got one already
    try:
        return connect_via_tunnel()
    except pymysql.OperationalError as e:
        # Only error 2003 falls through to starting a tunnel.
        if e.args[0] != 2003:
            raise dbopsError("Could not connect to database error was " + e.args[1])

    if os.fork() == 0:
        try:
            os.execvp("ssh", ["ssh", "-nNT", "-L",
                              "%d:localhost:%d" % (int(creds.localport), int(creds.remoteport)),
                              "%s@%s" % (creds.login, creds.host)])
        finally:
            # Only reached when exec failed: leave the child immediately so
            # it never continues running the parent's code.
            os._exit(255)
    # Give ssh time to establish the tunnel before retrying.
    time.sleep(5)
    try:
        return connect_via_tunnel()
    except pymysql.OperationalError as e:
        raise dbopsError("Could not connect to database after SSH tunnel error was " + e.args[1])
开发者ID:JohnMCollins,项目名称:python-astro-library,代码行数:25,代码来源:dbops.py
示例7: execute
def execute(self, sql, params=None):
    """Run one SQL statement on a fresh connection and return a DBResult.

    The statement is executed, committed and, on failure, rolled back on the
    same connection (``self.dbConn``), which is closed before returning.

    On success ``Rows`` holds the value returned by ``cursor.execute``,
    ``Result`` the fetched rows (an empty list when ``Rows`` is 0) and
    ``Suc`` is True.  On failure the exception is stored in ``Err`` and
    printed; it is not re-raised.
    """
    r = DBResult()
    self.dbConn = pymysql.connect(host=DB_HOST, user=DB_USER, passwd=DB_PASSWORD, db=DB,
                                  charset=DB_CHARSET,
                                  cursorclass=pymysql.cursors.DictCursor)
    try:
        cursor = self.dbConn.cursor()
        try:
            if not params:
                r.Rows = cursor.execute(sql)
            else:
                r.Rows = cursor.execute(sql, params)
            r.Result = cursor.fetchall() if r.Rows != 0 else []
            r.Suc = True
            self.dbConn.commit()
        except Exception as e:
            r.Err = e
            print(e, 'execute Error')
            self.dbConn.rollback()
        finally:
            cursor.close()
    finally:
        # Closing is best effort: a failure here must not hide the result.
        try:
            self.dbConn.close()
        except Exception:
            pass
    return r
开发者ID:c0hb1rd,项目名称:ShellScript,代码行数:26,代码来源:ManagerCreator.py
示例8: search_results_byrequests
def search_results_byrequests(query):
    """Record the search term, run the search and render the result page.

    The term is stored in ``search_tags``; the ``film`` index is then
    queried for 20 matches per page ordered by ``requests``, and the total
    count and elapsed time are read from ``SHOW META``.

    Returns the rendered ``list_byrequests.html`` template.
    """
    connzsky = pymysql.connect(host=DB_HOST, port=DB_PORT_MYSQL, user=DB_USER, password=DB_PASS,
                               db=DB_NAME_MYSQL, charset=DB_CHARSET,
                               cursorclass=pymysql.cursors.DictCursor)
    try:
        currzsky = connzsky.cursor()
        try:
            taginsertsql = 'REPLACE INTO search_tags(tag) VALUES(%s)'
            currzsky.execute(taginsertsql, query)
            connzsky.commit()
        finally:
            currzsky.close()
    finally:
        connzsky.close()

    page = request.args.get('page', 1, type=int)
    conn = pymysql.connect(host=DB_HOST, port=DB_PORT_SPHINX, user=DB_USER, password=DB_PASS,
                           db=DB_NAME_SPHINX, charset=DB_CHARSET,
                           cursorclass=pymysql.cursors.DictCursor)
    try:
        curr = conn.cursor()
        try:
            querysql = 'SELECT * FROM film WHERE MATCH(%s) ORDER BY requests DESC limit %s,20 OPTION max_matches=1000'
            curr.execute(querysql, [query, (page - 1) * 20])
            result = curr.fetchall()
            # SHOW META is used instead of a separate COUNT(*) query.
            countsql = 'SHOW META'
            curr.execute(countsql)
            resultcounts = curr.fetchall()
        finally:
            curr.close()
    finally:
        conn.close()

    # Rows 0 and 2 of SHOW META are read as match count and elapsed time.
    counts = int(resultcounts[0]['Value'])
    taketime = float(resultcounts[2]['Value'])
    # Floor division keeps the page count an integer on Python 3 as well.
    pages = (counts + 19) // 20
    tags = Search_Tags.query.order_by(Search_Tags.id.desc()).limit(50)
    form = SearchForm()
    form.search.data = query
    return render_template('list_byrequests.html', form=form, query=query, pages=pages, page=page,
                           hashs=result, counts=counts, taketime=taketime, tags=tags)
开发者ID:wonderking,项目名称:zsky,代码行数:27,代码来源:manage.py
示例9: tweetData
def tweetData():
    """Return the most recent tweets as a JSON array.

    Selects up to 1000 tweets newer than ``mood_lag`` minutes, newest first,
    and serialises each as an object with the keys text, sentiment, date,
    lat and lng.
    """
    global mood_lag
    port = db.port
    connect_args = dict(charset='utf8', host=db.hostname, user=db.username,
                        passwd=db.password, db=db.path[1:])
    # The port is only forwarded when the database settings provide one.
    if port:
        connect_args['port'] = port
    cnx = pymysql.connect(**connect_args)
    cursor = cnx.cursor()

    query = "SELECT text, sentiment, date, lat, lng FROM tweets WHERE `date` > %s ORDER BY `date` DESC LIMIT 1000"
    cutoff = datetime.datetime.now() - datetime.timedelta(minutes=mood_lag)
    cursor.execute(query, [cutoff.isoformat(' ')])

    tweets = []
    for text, sentiment, date, lat, lng in cursor:
        tweets.append({
            'text': text,
            'sentiment': sentiment,
            'date': date.isoformat(' '),
            'lat': lat,
            'lng': lng,
        })
    cursor.close()
    cnx.close()
    return json.dumps(tweets)
开发者ID:jacobdfriedmann,项目名称:twitter_mood_server,代码行数:28,代码来源:twitter_mood_server.py
示例10: connect
def connect(self):
    """Connect to the GAVRT database and create a cursor.

    pymysql is tried first; if setting it up fails for any ordinary reason
    the MySQLdb driver is used instead.  In both cases DECIMAL values are
    converted to float.  Write credentials are used when ``self.rw`` is
    true, read credentials otherwise.  The connection is stored in
    ``self.db`` and its cursor in ``self.c``.
    """
    rw = self.rw
    try:
        import pymysql as MySQLdb
        import pymysql.converters
        conv_dict = pymysql.converters.conversions.copy()
        conv_dict[pymysql.constants.FIELD_TYPE.DECIMAL] = pymysql.converters.convert_float
        conv_dict[pymysql.constants.FIELD_TYPE.NEWDECIMAL] = pymysql.converters.convert_float
        # Teach pymysql how to escape numpy floats (module-wide registration).
        pymysql.converters.encoders[np.float64] = pymysql.converters.escape_float
        pymysql.converters.encoders[np.float32] = pymysql.converters.escape_float
        MySQLdb.converters = pymysql.converters
        _sqlcompress = False  # compression not supported by pymysql yet
    except Exception:
        # Deliberate best-effort fallback; unlike a bare ``except`` this does
        # not swallow KeyboardInterrupt or SystemExit.
        import MySQLdb
        import MySQLdb.converters
        conv_dict = MySQLdb.converters.conversions.copy()
        conv_dict[246] = float  # Convert Decimal fields to float automatically
        _sqlcompress = True

    if rw:
        user, passwd = GAVRTDB.write_user, GAVRTDB.write_passwd
    else:
        user, passwd = GAVRTDB.read_user, GAVRTDB.read_passwd
    self.db = MySQLdb.connect(host=GAVRTDB.host, port=GAVRTDB.port, user=user, passwd=passwd,
                              db=GAVRTDB.db, conv=conv_dict, compress=_sqlcompress)
    self.c = self.db.cursor()
开发者ID:gitj,项目名称:gavrt,代码行数:26,代码来源:gavrtdb.py
示例11: connect
def connect(self):
    """Connect to the configured database, creating it when necessary.

    If the first attempt raises ``pymysql.Error`` the database is assumed to
    be missing: a connection without a default database issues
    ``CREATE DATABASE IF NOT EXISTS`` and, when that statement reports a
    non-zero result, the connection is retried.  Otherwise the original
    error is re-raised.

    Sets ``self.dbcon`` and ``self.cur``.
    """
    def open_target():
        # Connect to the target database and prepare a cursor on it.
        self.dbcon = pymysql.connect(
            host=self.mysql_url,
            user=self.user_name,
            passwd=self.passwd,
            db=self.DBname,
            charset=self.charset)
        self.cur = self.dbcon.cursor()

    try:
        open_target()
    except pymysql.Error:
        # database not exists, create it
        con = pymysql.connect(
            host=self.mysql_url,
            user=self.user_name,
            passwd=self.passwd,
            charset=self.charset)
        try:
            cur = con.cursor()
            # NOTE(review): the database name is interpolated into the
            # statement unquoted -- it must come from trusted configuration.
            created = cur.execute(
                "CREATE DATABASE IF NOT EXISTS %s DEFAULT CHARSET 'utf8'" % self.DBname)
        finally:
            # The bootstrap connection is only needed for this statement.
            con.close()
        if not created:
            raise
        open_target()
开发者ID:JaeZheng,项目名称:GC,代码行数:28,代码来源:DBUtil.py
示例12: realTestPamAuth
def realTestPamAuth(self):
"""Check PAM authentication for the OS user against a live server.

Any existing account for the OS user is dropped (its grants are remembered),
a temporary PAM-authenticated user is created, connections with the
PASSWORD environment value and with a defective cleartext handler are
attempted, and finally the remembered grants are executed again.
"""
# Work on a copy so the password can be changed without touching self.db.
db = self.db.copy()
import os
db['password'] = os.environ.get('PASSWORD')
cur = self.connect().cursor()
try:
# Remember the current grants so the account can be recreated afterwards.
cur.execute('show grants for ' + TestAuthentication.osuser + '@localhost')
grants = cur.fetchone()[0]
cur.execute('drop user ' + TestAuthentication.osuser + '@localhost')
except pymysql.OperationalError as e:
# assuming the user doesn't exist which is ok too
self.assertEqual(1045, e.args[0])
grants = None
with TempUser(cur, TestAuthentication.osuser + '@localhost',
self.databases[0]['db'], 'pam', os.environ.get('PAMSERVICE')) as u:
try:
c = pymysql.connect(user=TestAuthentication.osuser, **db)
db['password'] = 'very bad guess at password'
# NOTE(review): this connect uses self.db rather than the modified copy
# `db`, so the bad password set above is not what gets sent -- confirm.
with self.assertRaises(pymysql.err.OperationalError):
pymysql.connect(user=TestAuthentication.osuser,
auth_plugin_map={b'mysql_cleartext_password': TestAuthentication.DefectiveHandler},
**self.db)
except pymysql.OperationalError as e:
# 1045 is presumably the server's access-denied error code -- confirm.
self.assertEqual(1045, e.args[0])
# we had 'bad guess at password' work with pam. Well at least we get a permission denied here
with self.assertRaises(pymysql.err.OperationalError):
pymysql.connect(user=TestAuthentication.osuser,
auth_plugin_map={b'mysql_cleartext_password': TestAuthentication.DefectiveHandler},
**self.db)
if grants:
# recreate the user
cur.execute(grants)
开发者ID:jilagan,项目名称:PyMySQL,代码行数:32,代码来源:test_connection.py
示例13: databaseSetup
def databaseSetup():
    """Create the upStatsAPP database and reset its CarStats table.

    Connects to the ``test`` database to create ``upStatsAPP`` if missing,
    then drops and recreates the ``CarStats`` table inside it.  Prints a
    success or failure message; exceptions are not propagated.
    """
    def run_statements(database, statements):
        # Run the statements on a short-lived connection that is always
        # closed, even when one of them fails.
        conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db=database)
        try:
            cur = conn.cursor()
            try:
                for statement in statements:
                    cur.execute(statement)
            finally:
                cur.close()
        finally:
            conn.close()

    # NOTE(review): the column name "RRrp" looks like a typo for "RRrpm";
    # left unchanged because other code may depend on the existing schema.
    create_table = ("CREATE TABLE IF NOT EXISTS CarStats(" +
                    "EventID TEXT, Rev TEXT, Gear TEXT, RPM TEXT, " +
                    "Speed TEXT, FRrpm TEXT, FLrpm TEXT, " +
                    "RRrp TEXT, RLrpm TEXT, Suspension1 TEXT, " +
                    "Suspension2 TEXT, Suspension3 TEXT, Suspension4 TEXT, GForceX TEXT, " +
                    "GForceY TEXT, AirTemp TEXT, Throttle TEXT, " +
                    "CoolentTemp TEXT, Battery TEXT, BatteryTemp TEXT, " +
                    "Lambda TEXT)")  # SQL query for table setup
    try:
        # The 'test' database is assumed to exist; it is only the entry point.
        run_statements('test', ["CREATE DATABASE IF NOT EXISTS upStatsAPP"])
        # Now work inside the (possibly newly created) upStatsAPP database.
        run_statements('upStatsAPP', ["DROP TABLE IF EXISTS CarStats", create_table])
        print("Database Reset Successful")
    except (Exception):
        print("Database Did Not Reset, Something Went Wrong")
开发者ID:djentleman,项目名称:Data_Retreval_Project,代码行数:30,代码来源:UPracing.py
示例14: getConnection
def getConnection(self, nodb=False):
    """Return a connection to the managed mysqld, starting it if needed.

    With ``nodb`` true a new connection without a default database is
    opened.  Otherwise the cached connection is returned when one exists;
    if not, a new one to the configured database is opened and cached.

    Raises RuntimeError when the server cannot be started.

    @rtype L{pymysqlConnection}
    """
    if not self.startMysqld():
        raise RuntimeError("Can't start mysqld server!")

    if not nodb and self._conn:
        return self._conn

    if nodb:
        connection = pymysql.connect(
            unix_socket=self.getSocket(),
            user=self.getUser(),
            passwd=self.getPassword(),
        )
    else:
        conversions = pymysql.converters.conversions.copy()
        conversions[246] = float  # convert decimals to floats
        conversions[10] = str  # convert dates to strings
        connection = pymysql.connect(
            unix_socket=self.getSocket(),
            user=self.getUser(),
            passwd=self.getPassword(),
            db=self.getDbName(),
            conv=conversions,
        )
        self._conn = connection
        self._conn.autocommit(self.getAutocommit())

    cursor = connection.cursor()
    if not self.getAutocommit():
        cursor.execute("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED")
    cursor.close()
    return connection
开发者ID:sergey-dryabzhinsky,项目名称:dedupsqlfs,代码行数:33,代码来源:manager.py
示例15: connDB_plate
def connDB_plate():
    """Open the plate database and return a ``(connection, cursor)`` pair.

    The target server depends on the local host name: on 'hskj-backup250'
    the remote query account is used, everywhere else the internal server.
    """
    # NOTE(review): credentials are hard-coded in source; consider moving
    # them to configuration.
    if socket.gethostname() == 'hskj-backup250':
        settings = dict(host='210.14.134.77', port=13306, user='remote_query',
                        passwd='20141024', db='super_plate')
    else:
        settings = dict(host='192.168.120.12', user='root', passwd='123456',
                        db='provinces_test_un')
    conn = pymysql.connect(charset='utf8', **settings)
    cur = conn.cursor()
    return (conn, cur)
开发者ID:jinyalin,项目名称:Myproject,代码行数:7,代码来源:views.py
示例16: main
def main():
    """Run getAvg for every item_id listed in the reference database.

    Item ids are read from ``initial_item_list.items``; the table list and
    table count are read from ``primary_scrape_test`` and handed to
    ``getAvg`` together with each formatted id.
    """
    passes = 0
    # Keyword arguments are used because current PyMySQL releases no longer
    # accept positional arguments to connect().
    db1 = pymysql.connect(host=credentials.host, user=credentials.username,
                          password=credentials.password,
                          database="initial_item_list")  # DB of item ids
    try:
        cursor1 = db1.cursor()
        cursor1.execute("SELECT item_id FROM items")  # Use as reference list for ids
        db3 = pymysql.connect(host=credentials.host, user=credentials.username,
                              password=credentials.password,
                              database="primary_scrape_test")  # DB to iterate through
        try:
            cursor3 = db3.cursor()
            cursor3.execute("SHOW TABLES")
            tables = cursor3.fetchall()
            # SHOW TABLES is issued again directly before FOUND_ROWS(), as in
            # the original code.
            cursor3.execute("SHOW TABLES")
            cursor3.execute("SELECT FOUND_ROWS()")
            numTables = formatString(cursor3.fetchone())
            # fetchone() returns None once all ids have been consumed.
            for IDtoUse in iter(cursor1.fetchone, None):
                ID = formatString(IDtoUse)
                getAvg(ID, tables, passes, numTables)
        finally:
            db3.close()
    finally:
        db1.close()
开发者ID:agodfrey3,项目名称:MerchBuddy,代码行数:26,代码来源:getAvg.py
示例17: _connect
def _connect(self, host, port, mysql_sock, user, password, defaults_file):
    """Open a MySQL connection using the first applicable source.

    Order of precedence: defaults file, Unix socket, host with port, host
    alone.  Raises Exception with installation instructions when pymysql
    cannot be imported.

    Returns the object returned by ``pymysql.connect``.
    """
    try:
        import pymysql
    except ImportError:
        raise Exception("Cannot import pymysql module. Check the instructions "
                        "to install this module at https://app.datadoghq.com/account/settings#integrations/mysql")

    if defaults_file != '':
        connect_args = {'read_default_file': defaults_file}
    elif mysql_sock != '':
        connect_args = {'unix_socket': mysql_sock, 'user': user, 'passwd': password}
    else:
        connect_args = {'host': host, 'user': user, 'passwd': password}
        # The port is only forwarded when it is truthy.
        if port:
            connect_args['port'] = port

    connection = pymysql.connect(**connect_args)
    self.log.debug("Connected to MySQL")
    return connection
开发者ID:anthonyjohnson,项目名称:dd-agent,代码行数:25,代码来源:mysql.py
示例18: connDB_yw
def connDB_yw(server, dbname, port):
    """Open a business database and return a ``(connection, cursor)`` pair.

    On host 'hskj-backup250' the given server, port and database name are
    used with the remote query account; on any other host the arguments are
    ignored and the internal ``sms_server`` database is used.
    """
    # NOTE(review): credentials are hard-coded in source; consider moving
    # them to configuration.
    if socket.gethostname() == 'hskj-backup250':
        settings = dict(host=server, port=port, user='remote_query',
                        passwd='20141024', db=dbname)
    else:
        settings = dict(host='192.168.120.12', user='root', passwd='123456',
                        db='sms_server')
    conn = pymysql.connect(charset='utf8', **settings)
    cur = conn.cursor()
    return (conn, cur)
开发者ID:jinyalin,项目名称:Myproject,代码行数:7,代码来源:views.py
示例19: connect
def connect(self):
    """Connects to a database as set in `site_config.json`.

    SSL is used only when the CA, certificate and key settings are all
    present in ``frappe.conf``.  The shared ``conversions`` mapping is
    extended before connecting.  After connecting, a cursor is created,
    the database named after the user is selected unless the user is
    'root', and ``frappe.local.rollback_observers`` is reset.
    """
    warnings.filterwarnings('ignore', category=pymysql.Warning)

    conversions.update({
        FIELD_TYPE.NEWDECIMAL: float,
        FIELD_TYPE.DATETIME: get_datetime,
        TimeDelta: conversions[binary_type],
        UnicodeWithAttrs: conversions[text_type]
    })

    # Keyword arguments are used because current PyMySQL releases no longer
    # accept positional arguments to connect().
    connect_args = {
        'host': self.host,
        'user': self.user or '',
        'password': self.password or '',
        'charset': 'utf8mb4',
        'use_unicode': True,
        'conv': conversions,
    }
    if frappe.conf.db_ssl_ca and frappe.conf.db_ssl_cert and frappe.conf.db_ssl_key:
        self.ssl = {
            'ca': frappe.conf.db_ssl_ca,
            'cert': frappe.conf.db_ssl_cert,
            'key': frappe.conf.db_ssl_key
        }
        connect_args['ssl'] = self.ssl

    self._conn = pymysql.connect(**connect_args)

    # MYSQL_OPTION_MULTI_STATEMENTS_OFF = 1
    # # self._conn.set_server_option(MYSQL_OPTION_MULTI_STATEMENTS_OFF)

    self._cursor = self._conn.cursor()
    if self.user != 'root':
        self.use(self.user)
    frappe.local.rollback_observers = []
开发者ID:dalwadani,项目名称:frappe,代码行数:33,代码来源:database.py
示例20: _connect
def _connect(self, host, port, mysql_sock, user, password, defaults_file):
    """Open a MySQL connection using the first applicable source.

    Order of precedence: defaults file, Unix socket, host with port, host
    alone.  Raises Exception with installation instructions when PyMySQL
    cannot be imported.

    Returns the object returned by ``pymysql.connect``.
    """
    try:
        import pymysql
    except ImportError:
        raise Exception(
            "Cannot import PyMySQl module. Check the instructions "
            "to install this module at https://pypi.python.org/pypi/PyMySQL")

    use_defaults_file = defaults_file != ''
    use_socket = not use_defaults_file and mysql_sock != ''

    if use_defaults_file:
        settings = dict(read_default_file=defaults_file)
    elif use_socket:
        settings = dict(unix_socket=mysql_sock, user=user, passwd=password)
    elif port:
        settings = dict(host=host, port=port, user=user, passwd=password)
    else:
        settings = dict(host=host, user=user, passwd=password)

    db = pymysql.connect(**settings)
    self.log.debug("Connected to MySQL")
    return db
开发者ID:heekof,项目名称:monasca-agent,代码行数:26,代码来源:mysql.py
注:本文中的pymysql.connect函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论