本文整理汇总了Python中sshtunnel.SSHTunnelForwarder类的典型用法代码示例。如果您正苦于以下问题:Python SSHTunnelForwarder类的具体用法?Python SSHTunnelForwarder怎么用?Python SSHTunnelForwarder使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SSHTunnelForwarder类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: SSHTunnel
class SSHTunnel(object):
    """Context manager around an sshtunnel ``SSHTunnelForwarder``.

    The tunnel forwards a local port (picked by the forwarder) to
    ``127.0.0.1:remote_port`` as seen from the SSH host.  Use it either as
    ``with SSHTunnel(...) as tunnel:`` or by calling ``connect()`` and
    ``close()`` explicitly.

    Attributes:
        local_port: local bind port of the running tunnel; ``None`` until
            ``connect()`` has been called.
    """

    class TunnelException(Exception):
        """Raised when the forwarder object cannot be created."""
        pass

    def __init__(self, host, username, key_file, remote_port, host_port=nat_ssh_port_forwarding):
        """Create (but do not start) the forwarder.

        Args:
            host: address of the SSH server.
            username: SSH login name.
            key_file: private key handed to the forwarder as ``ssh_private_key``.
            remote_port: port on the SSH host's loopback interface to reach.
            host_port: SSH port on ``host``.

        Raises:
            SSHTunnel.TunnelException: if sshtunnel rejects the configuration.
        """
        # Defined up front so reading it before connect() yields None instead
        # of raising AttributeError.
        self.local_port = None
        # Note: this lowers verbosity of the shared 'sshtunnel' logger.
        logger = logging.getLogger('sshtunnel')
        logger.setLevel(logging.ERROR)
        try:
            self._server = SSHTunnelForwarder(
                (host, host_port),
                ssh_username=username,
                ssh_private_key=key_file,
                remote_bind_address=('127.0.0.1', remote_port),
                logger=logger)
        except sshtunnel.BaseSSHTunnelForwarderError as e:
            raise self.TunnelException(e)

    def connect(self):
        """Start the tunnel and record the local port it is bound to."""
        self._server.start()
        self.local_port = self._server.local_bind_port

    def close(self):
        """Stop the tunnel."""
        self._server.stop()

    def __enter__(self):
        """Start the tunnel and return this object."""
        self.connect()
        return self

    def __exit__(self, type, value, traceback):
        """Stop the tunnel; exceptions from the ``with`` body propagate."""
        self.close()
开发者ID:balramr,项目名称:clusterous,代码行数:33,代码来源:helpers.py
示例2: on_Btn_cups_clicked
def on_Btn_cups_clicked(self, widget, ip, usuario, password, puerto, notebook, spinner, estado):
    """Open the remote host's CUPS web interface in a new notebook tab.

    Strips the authentication directives from the remote ``cupsd.conf``,
    reloads CUPS, tunnels remote port 631 over SSH and points a WebKit view
    at the local end of the tunnel.  Any failure is reported through
    ``self.mensaje``; the spinner and status label are always reset.

    Args:
        widget: the widget that triggered the handler (unused).
        ip: address of the remote machine.
        usuario: SSH user name.
        password: password used for the remote ``sudo`` command.
        puerto: SSH port (anything ``int()`` accepts).
        notebook: notebook that receives the new tab.
        spinner: spinner shown while working.
        estado: status label.
    """
    ssh_path = os.environ['HOME'] + '/.ssh/id_rsa'
    spinner.start()
    estado.set_text("Creando tunel...")
    server = None
    tunnel_started = False
    try:
        # Inside the try so an invalid port is reported like any other
        # failure instead of leaving the spinner running.
        puerto = int(puerto)
        # Remove the authentication directives from cupsd.conf.
        with settings(host_string=ip, port=puerto, password=password, user=usuario):
            sudo('sed -i."old" "/Require user @SYSTEM/d" /etc/cups/cupsd.conf;sed -i."old2" "/AuthType Default/d" /etc/cups/cupsd.conf;/etc/init.d/cups reload')
        server = SSHTunnelForwarder((ip, puerto), ssh_username=usuario, ssh_private_key=ssh_path, remote_bind_address=('127.0.0.1', 631))
        server.start()
        tunnel_started = True
        puerto_local = str(server.local_bind_port)
        scrolledwindow = Gtk.ScrolledWindow()
        scrolledwindow.set_hexpand(True)
        scrolledwindow.set_vexpand(True)
        page = WebKit.WebView()
        page.set_border_width(10)
        page.open("http://127.0.0.1:" + puerto_local)
        scrolledwindow.add(page)
        tab_label = tablabel.TabLabel("CUPS " + ip, Gtk.Image.new_from_file("/usr/share/grx/icons/cups32.png"))
        tab_label.connect("close-clicked", tablabel.on_close_clicked, notebook, page)
        notebook.append_page(scrolledwindow, tab_label)
        self.show_all()
    except Exception:
        # No tab will use the tunnel, so do not leave it running.
        if tunnel_started:
            server.stop()
        self.mensaje("No se ha podido ejecutar cups", "Atencion", atencion)
    finally:
        spinner.stop()
        estado.set_text("")
开发者ID:aavidad,项目名称:grx-asistencia,代码行数:27,代码来源:equipos_linux.py
示例3: create_ssh_tunnel
def create_ssh_tunnel(self, tunnel_password):
    """Create and start the SSH tunnel for this server.

    On success ``self.tunnel_object`` holds the running forwarder,
    ``self.tunnel_created`` is True and ``self.local_bind_port`` is the
    local port chosen by the forwarder.

    Args:
        tunnel_password: encrypted tunnel password / key passphrase; may be
            ``None`` or empty when no secret is needed.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, message)``.
    """
    # The logged-in user's password is the decryption key.
    user = User.query.filter_by(id=current_user.id).first()
    if user is None:
        return False, gettext("Unauthorized request.")

    no_password = tunnel_password is None or tunnel_password == ''
    if not no_password:
        try:
            tunnel_password = decrypt(tunnel_password, user.password)
            if hasattr(str, 'decode'):
                # Python 2: normalise non-ASCII passwords.
                tunnel_password = \
                    tunnel_password.decode('utf-8').encode('utf-8')
            elif isinstance(tunnel_password, bytes):
                # Python 3: the forwarder needs text, not bytes.
                tunnel_password = tunnel_password.decode()
        except Exception as e:
            current_app.logger.exception(e)
            message = "Failed to decrypt the SSH tunnel " \
                      "password.\nError: {0}"
            return False, message.format(str(e))

    try:
        gateway = (self.tunnel_host, int(self.tunnel_port))
        options = {'ssh_username': self.tunnel_username}
        if self.tunnel_authentication == 1:
            # Method 1: identity file, with the password as its passphrase.
            options['ssh_pkey'] = get_complete_file_path(
                self.tunnel_identity_file)
            options['ssh_private_key_password'] = tunnel_password
        else:
            options['ssh_password'] = tunnel_password
        options['remote_bind_address'] = (self.host, self.port)

        self.tunnel_object = SSHTunnelForwarder(gateway, **options)
        self.tunnel_object.start()
        self.tunnel_created = True
    except BaseSSHTunnelForwarderError as e:
        current_app.logger.exception(e)
        message = "Failed to create the SSH tunnel." \
                  "\nError: {0}"
        return False, message.format(str(e))

    # Callers talk to the local end of the tunnel from now on.
    self.local_bind_port = self.tunnel_object.local_bind_port
    return True, None
开发者ID:asheshv,项目名称:pgadmin4,代码行数:58,代码来源:server_manager.py
示例4: sshtunnel
def sshtunnel():
    """Start an SSH tunnel described by ``current_app.config``.

    Reads SSH_HOST, SSH_PORT, SSH_USER, SSH_PASSWORD, SSH_REMOTE_HOST and
    SSH_REMOTE_PORT from the config.

    Returns:
        The local port the tunnel is bound to.  The forwarder object itself
        is not returned, so the caller cannot stop it.
    """
    cfg = current_app.config
    gateway = (cfg['SSH_HOST'], cfg['SSH_PORT'])
    user = cfg['SSH_USER']
    secret = cfg['SSH_PASSWORD']
    target = (cfg['SSH_REMOTE_HOST'], cfg['SSH_REMOTE_PORT'])
    forwarder = SSHTunnelForwarder(
        gateway,
        ssh_username=user,
        ssh_password=secret,
        remote_bind_address=target,
    )
    forwarder.start()
    return forwarder.local_bind_port
开发者ID:ZUNbado,项目名称:pybearmon,代码行数:10,代码来源:sql.py
示例5: TunelSSH
class TunelSSH():
    """Thin wrapper that owns one password-authenticated SSH tunnel."""

    def __init__(self, ssh_address, ssh_port, ssh_username, ssh_password, remote_bind_address, remote_bind_port):
        """Build the forwarder without starting it.

        Args:
            ssh_address: SSH server host.
            ssh_port: SSH server port.
            ssh_username: SSH login name.
            ssh_password: SSH password.
            remote_bind_address: host to reach through the tunnel.
            remote_bind_port: port on that host.
        """
        gateway = (ssh_address, ssh_port)
        target = (remote_bind_address, remote_bind_port)
        self.server = SSHTunnelForwarder(
            ssh_address=gateway,
            ssh_username=ssh_username,
            ssh_password=ssh_password,
            remote_bind_address=target,
        )

    def Iniciar(self):
        """Start the tunnel and return its local bind port."""
        tunnel = self.server
        tunnel.start()
        return tunnel.local_bind_port

    def Cerrar(self):
        """Stop the tunnel."""
        self.server.stop()
开发者ID:procamora,项目名称:Librerias-Varias,代码行数:11,代码来源:ssh_forward.py
示例6: on_Btn_vncviewer_clicked
def on_Btn_vncviewer_clicked(self, widget, ip, usuario, puerto, password, clave_remoto, spinner, estado):
    """Launch ``vncviewer`` against the remote host through an SSH tunnel.

    Tunnels port 5900 of ``ip`` to a local port and starts ``vncviewer`` in
    the background, feeding it the VNC password on stdin.  Failures are
    reported through ``self.mensaje``; the spinner and status label are
    always reset.

    Args:
        widget: the widget that triggered the handler (unused).
        ip: address of the remote machine.
        usuario: SSH user name.
        puerto: SSH port (anything ``int()`` accepts).
        password: unused here.
        clave_remoto: VNC password passed to ``vncviewer -autopass``.
        spinner: spinner shown while working.
        estado: status label.
    """
    # shlex.quote on Python 3, pipes.quote on Python 2.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    spinner.start()
    estado.set_text("Conectando con el equipo")
    ssh_path = os.environ['HOME'] + '/.ssh/id_rsa'
    try:
        puerto = int(puerto)
        server = SSHTunnelForwarder((ip, puerto), ssh_username=usuario, ssh_private_key=ssh_path, remote_bind_address=(ip, 5900))
        server.start()
        puerto_local = str(server.local_bind_port)
        # The password is shell-quoted: unquoted, characters such as
        # `"`, `$` or backticks would be interpreted by the shell.
        local('echo ' + quote(clave_remoto) + ' | vncviewer -autopass -compresslevel 9 -bgr233 127.0.0.1:' + puerto_local + ' &')
    except Exception:
        self.mensaje("No se ha podido ejecutar 'vncviewer' en el equipo remoto", "Atencion", atencion)
    finally:
        spinner.stop()
        estado.set_text("")
开发者ID:aavidad,项目名称:grx-asistencia,代码行数:16,代码来源:equipos_linux.py
示例7: dal_connect
def dal_connect():
    """Open a DAL connection to MySQL through an SSH tunnel.

    The tunnel targets port 3306 on the SSH host's loopback interface; all
    credentials come from ``settings``.

    Returns:
        A ``DAL`` object (migrations disabled) connected via the local end
        of the tunnel.
    """
    tunnel = SSHTunnelForwarder(
        (settings.ssh_host, settings.ssh_port),
        ssh_password=settings.ssh_password,
        ssh_username=settings.ssh_username,
        remote_bind_address=('127.0.0.1', 3306),
    )
    tunnel.start()

    uri_template = 'mysql://{username}:{password}@127.0.0.1:{port}/{db}'
    uri = uri_template.format(
        username=settings.mysql_username,
        password=settings.mysql_password,
        db=settings.mysql_dbname,
        port=tunnel.local_bind_port,
    )
    return DAL(uri, migrate=False)
开发者ID:62mkv,项目名称:py-redmine-helper,代码行数:18,代码来源:redmine_pydal.py
示例8: create_tunnel_to_cdh_manager
def create_tunnel_to_cdh_manager(self, local_bind_address='localhost', local_bind_port=7180, remote_bind_port=7180):
    """Prepare (without starting) an SSH tunnel to the CDH manager.

    Stores the forwarder in ``self.cdh_manager_tunnel`` and remembers the
    requested local endpoint on the instance.

    Args:
        local_bind_address: local interface to bind.
        local_bind_port: local port to bind.
        remote_bind_port: port on the host returned by
            ``self.extract_cdh_manager_host()``.
    """
    self._local_bind_address = local_bind_address
    self._local_bind_port = local_bind_port

    gateway = (self._hostname, self._hostport)
    local_endpoint = (local_bind_address, local_bind_port)
    manager_endpoint = (self.extract_cdh_manager_host(), remote_bind_port)
    self.cdh_manager_tunnel = SSHTunnelForwarder(
        gateway,
        ssh_username=self._username,
        local_bind_address=local_endpoint,
        remote_bind_address=manager_endpoint,
        ssh_private_key_password=self._key_password,
        ssh_private_key=self._key,
    )
开发者ID:butla,项目名称:apployer,代码行数:11,代码来源:cdh_utilities.py
示例9: port_forward
def port_forward(self, port=PORT):
    """Forward local ``port`` to the same port on the instance's loopback.

    The tunnel connects to ``self.ip`` on SSH port 22 using the user and
    RSA private key named in the ``EC2`` config section, and is stored in
    ``self._forward``.

    Args:
        port: port number used for both the local and the remote end.
    """
    key = paramiko.RSAKey.from_private_key_file(
        config.get('EC2', 'PrivateKeyFile')
    )
    loopback = '127.0.0.1'
    # Use the argument: the module-level PORT used to be hard-wired here,
    # which silently ignored any port the caller asked for.
    self._forward = SSHTunnelForwarder(
        ssh_address=(self.ip, 22),
        ssh_username=config.get('EC2', 'User'),
        ssh_private_key=key,
        remote_bind_address=(loopback, port),
        local_bind_address=(loopback, port)
    )
    self._forward.start()
开发者ID:ralph-group,项目名称:mucloud,代码行数:12,代码来源:mucloud.py
示例10: _portforward_agent_start
def _portforward_agent_start(self):
    """Forward the local agent port to the same port on the device.

    Lets the host talk to the Needle server running on the device; the
    started forwarder is kept in ``self._port_forward_agent``.
    """
    agent_port = self._agent_port
    self.printer.debug('{} Setting up port forwarding on port {}'.format(Constants.AGENT_TAG, agent_port))
    loopback = '127.0.0.1'
    forwarder = SSHTunnelForwarder(
        (self._ip, int(self._port)),
        ssh_username=self._username,
        ssh_password=self._password,
        local_bind_address=(loopback, agent_port),
        remote_bind_address=(loopback, agent_port),
    )
    self._port_forward_agent = forwarder
    forwarder.start()
开发者ID:mwrlabs,项目名称:needle,代码行数:12,代码来源:device.py
示例11: _portforward_frida_start
def _portforward_frida_start(self):
    """Forward the local Frida port to the same port on the device.

    Lets the host talk to the Frida server running on the device; the
    started forwarder is kept in ``self._frida_server``.
    """
    frida_port = Constants.FRIDA_PORT
    self.printer.debug('{} Setting up port forwarding on port {}'.format("[FRIDA]", frida_port))
    loopback = '127.0.0.1'
    forwarder = SSHTunnelForwarder(
        (self._ip, int(self._port)),
        ssh_username=self._username,
        ssh_password=self._password,
        local_bind_address=(loopback, frida_port),
        remote_bind_address=(loopback, frida_port),
    )
    self._frida_server = forwarder
    forwarder.start()
开发者ID:mwrlabs,项目名称:needle,代码行数:12,代码来源:device.py
示例12: get_connection_params
def get_connection_params(self):
    """Return connection parameters rerouted through a fresh SSH tunnel.

    Takes the parent backend's parameters, opens a tunnel to their
    ``host``/``port`` using ``settings_dict["TUNNEL_CONFIG"]`` as keyword
    arguments for ``SSHTunnelForwarder``, and replaces ``host``/``port``
    with the local end of that tunnel.  The tunnel is kept in
    ``self.tunnel``.

    Returns:
        The parameter dict with ``host`` set to ``'127.0.0.1'`` and
        ``port`` set to the tunnel's local bind port.
    """
    kwargs = super(DatabaseWrapper, self).get_connection_params()
    host = kwargs['host']
    port = kwargs['port']
    # Copy first: writing remote_bind_address into the settings dict itself
    # would alter configuration shared beyond this call.
    config = dict(self.settings_dict["TUNNEL_CONFIG"])
    config['remote_bind_address'] = (host, port)
    self.tunnel = SSHTunnelForwarder(**config)
    # Daemon threads so an open tunnel cannot keep the process alive.
    self.tunnel.daemon_forward_servers = True
    self.tunnel.daemon_transport = True
    self.tunnel.start()
    kwargs["host"] = '127.0.0.1'
    kwargs['port'] = self.tunnel.local_bind_port
    return kwargs
开发者ID:DrWrong,项目名称:django_extra,代码行数:13,代码来源:base.py
示例13: __init__
def __init__(self, host, username, key_file, remote_port, host_port=nat_ssh_port_forwarding):
    """Create (but do not start) the underlying ``SSHTunnelForwarder``.

    The forwarder targets ``127.0.0.1:remote_port`` as seen from the SSH
    host and is stored in ``self._server``.

    Raises:
        TunnelException: if sshtunnel rejects the configuration.
    """
    quiet_logger = logging.getLogger('sshtunnel')
    quiet_logger.setLevel(logging.ERROR)
    gateway = (host, host_port)
    target = ('127.0.0.1', remote_port)
    try:
        self._server = SSHTunnelForwarder(
            gateway,
            ssh_username=username,
            ssh_private_key=key_file,
            remote_bind_address=target,
            logger=quiet_logger,
        )
    except sshtunnel.BaseSSHTunnelForwarderError as err:
        raise self.TunnelException(err)
开发者ID:balramr,项目名称:clusterous,代码行数:14,代码来源:helpers.py
示例14: DatabaseWrapper
class DatabaseWrapper(MysqlDatabaseWrapper):
    """MySQL database wrapper that connects through an SSH tunnel.

    ``settings_dict["TUNNEL_CONFIG"]`` supplies the keyword arguments for
    ``SSHTunnelForwarder``; the tunnel's remote end is the database
    host/port the parent backend would have connected to.
    """

    def __init__(self, *args, **kwargs):
        super(DatabaseWrapper, self).__init__(*args, **kwargs)
        # Running forwarder, or None while no tunnel is open.
        self.tunnel = None

    def get_connection_params(self):
        """Open a tunnel and return parameters pointing at its local end."""
        kwargs = super(DatabaseWrapper, self).get_connection_params()
        host = kwargs['host']
        port = kwargs['port']
        # Copy first: writing remote_bind_address into the settings dict
        # itself would alter configuration shared beyond this call.
        config = dict(self.settings_dict["TUNNEL_CONFIG"])
        config['remote_bind_address'] = (host, port)
        self.tunnel = SSHTunnelForwarder(**config)
        # Daemon threads so an open tunnel cannot keep the process alive.
        self.tunnel.daemon_forward_servers = True
        self.tunnel.daemon_transport = True
        self.tunnel.start()
        kwargs["host"] = '127.0.0.1'
        kwargs['port'] = self.tunnel.local_bind_port
        return kwargs

    def _close(self):
        """Close the database connection, then stop the tunnel."""
        try:
            super(DatabaseWrapper, self)._close()
        finally:
            # Stop the tunnel even if closing the connection failed, and
            # forget it so it is never stopped twice.
            if self.tunnel is not None:
                self.tunnel.stop()
                self.tunnel = None
开发者ID:DrWrong,项目名称:django_extra,代码行数:24,代码来源:base.py
示例15: __init__
def __init__(self):
    """Open the SSH tunnel and the PostgreSQL connection behind it.

    The tunnel is configured from ``Credentials.ssh_tunnel`` and forwards to
    port 5432 on the host named in ``Credentials.vicnode_db``.  The database
    connection then goes to the local end of the tunnel.  If connecting
    fails, the tunnel is stopped before the exception propagates.
    """
    ssh_tunnel = Credentials.ssh_tunnel
    # Copy: host/port are overwritten below, and doing that on the shared
    # dict would make the next instance tunnel to 'localhost' instead of
    # the real database host.
    db_config = dict(Credentials.vicnode_db)
    self._server = SSHTunnelForwarder(
        ((ssh_tunnel['host']), (int(ssh_tunnel['port']))),
        ssh_password=ssh_tunnel['ssh_password'],
        ssh_username=(ssh_tunnel['username']),
        ssh_pkey=(ssh_tunnel['private_key_file']),
        remote_bind_address=(db_config['host'], 5432),
        allow_agent=False
    )
    self._server.start()
    connection = None
    try:
        # The database is reached through the local end of the tunnel, so
        # point the connection parameters at it.
        db_config['host'] = 'localhost'
        db_config['port'] = self._server.local_bind_port
        connection = psycopg2.connect(**db_config)
        self._db_connection = connection
        self._db_cur = connection.cursor(
            cursor_factory=psycopg2.extras.RealDictCursor)
        self.test_connection()
    except Exception:
        # Do not leave a half-initialised object holding a live tunnel.
        try:
            if connection is not None:
                connection.close()
        finally:
            self._server.stop()
            self._server = None
        raise
开发者ID:MartinPaulo,项目名称:ReportsAlpha,代码行数:21,代码来源:bc_storage.py
示例16: DB
class DB(object):
"""
Read: https://colinnewell.wordpress.com/2016/01/21/hand-coding-sql-with-psycopg2-for-odoo/
"""
_db_connection = None
_db_cur = None
_server = None
def __init__(self):
    """Open the SSH tunnel and the PostgreSQL connection behind it.

    The tunnel is configured from ``Configuration.get_ssh_tunnel_info()``
    and forwards to port 5432 on the host named by
    ``Configuration.get_vicnode_db()``.  The database connection then goes
    to the local end of the tunnel.  If connecting fails, the tunnel is
    stopped before the exception propagates.
    """
    # Copy: host/port are overwritten below and must not leak back into
    # whatever object the configuration returned.
    db_config = dict(Configuration.get_vicnode_db())
    ssh_intermediate = Configuration.get_ssh_tunnel_info()
    self._server = SSHTunnelForwarder(
        ((ssh_intermediate['host']), (int(ssh_intermediate['port']))),
        ssh_password=ssh_intermediate['ssh_password'],
        ssh_username=(ssh_intermediate['username']),
        ssh_pkey=(ssh_intermediate['private_key_file']),
        remote_bind_address=(db_config['host'], 5432),
        allow_agent=False
    )
    self._server.start()
    connection = None
    try:
        # The database is reached through the local end of the tunnel, so
        # point the connection parameters at it.
        db_config['host'] = 'localhost'
        db_config['port'] = self._server.local_bind_port
        connection = psycopg2.connect(**db_config)
        self._db_connection = connection
        self._db_cur = connection.cursor(
            cursor_factory=psycopg2.extras.RealDictCursor)
        self.test_connection()
    except Exception:
        # Do not leave a half-initialised object holding a live tunnel.
        try:
            if connection is not None:
                connection.close()
        finally:
            self._server.stop()
            self._server = None
        raise
def __del__(self):
    """Release the database connection and tunnel on garbage collection."""
    self.close_connection()
def close_connection(self):
    """Close the database connection and stop the SSH tunnel.

    Does nothing when no tunnel is active, so it is safe to call more than
    once.  The tunnel is stopped even if the database connection was never
    established or fails to close.
    """
    server = self._server
    if not server:
        return
    logging.info("Closing the VicNode DB connection")
    try:
        # The connection is still None when connecting failed; calling
        # close() on it used to raise and leave the tunnel running.
        if self._db_connection is not None:
            self._db_connection.close()
    finally:
        logging.info("Stopping the ssh tunnel")
        try:
            server.stop()
        finally:
            self._server = None
        logging.info("The VicNode DB connection is closed")
@staticmethod
def get_product_code(product):
    """Return the product-code setting that matches ``product``.

    COMPUTATIONAL, MARKET and VAULT map to their own settings; any other
    value yields ``settings.STORAGE_PRODUCT_CODES``.
    """
    fallback = settings.STORAGE_PRODUCT_CODES
    if product == COMPUTATIONAL:
        return settings.COMPUTE_PRODUCT_CODES
    if product == MARKET:
        return settings.MARKET_PRODUCT_CODES
    if product == VAULT:
        return settings.VAULT_MARKET_CODES
    return fallback
@connection_required
def test_connection(self):
    """Run a trivial query to prove the connection works; result discarded."""
    cursor = self._db_cur
    cursor.execute("SELECT * FROM applications_suborganization;")
    cursor.fetchall()
@connection_required
def get_allocated(self, day_date):
    """Sum allocation sizes per storage product up to ``day_date``.

    Only allocations whose product id is in
    ``settings.STORAGE_PRODUCT_CODES`` are counted, and only those created
    before the day after ``day_date`` (a missing creation date counts as
    2014-11-14).  Each row carries the summed size and a product label:
    'computational', 'market' or 'vault'.

    :param day_date: last day to include.
    :return: all rows produced by the query.
    """
    q_allocated = """
SELECT
sum(size),
CASE
WHEN storage_product_id IN %(compute)s
THEN 'computational'
WHEN storage_product_id IN %(market)s
THEN 'market'
ELSE 'vault' END AS product
FROM applications_allocation
WHERE storage_product_id IN %(all_types)s
AND COALESCE(applications_allocation.creation_date, '2014-11-14' :: DATE) <
(%(day_date)s :: DATE + '1 day' :: INTERVAL)
GROUP BY storage_product_id;
"""
    params = {
        'compute': settings.COMPUTE_PRODUCT_CODES,
        'market': settings.MARKET_PRODUCT_CODES,
        'all_types': settings.STORAGE_PRODUCT_CODES,
        'day_date': day_date,
    }
    cursor = self._db_cur
    cursor.execute(q_allocated, params)
    return cursor.fetchall()
@connection_required
def get_allocated_by_faculty(self, day_date, product='all'):
"""
:param product:
:param self:
:param day_date:
:return:
"""
#.........这里部分代码省略.........
开发者ID:MartinPaulo,项目名称:ReportsAlpha,代码行数:101,代码来源:vicnode_db.py
示例17: Device
#.........这里部分代码省略.........
timeout = 30
endtime = time.time() + timeout
while not stdout.channel.eof_received:
time.sleep(1)
if time.time() > endtime:
stdout.channel.close()
break
# Paramiko Exec Command
stdin, stdout, stderr = self.ssh.exec_command(cmd)
hotfix_67()
# Parse STDOUT/ERR
out = stdout.readlines()
err = stderr.readlines()
if internal:
# For processing, don't display output
if err:
# Show error and abort run
err_str = ''.join(err)
raise Exception(err_str)
else:
# Display output
if out: map(lambda x: print('\t%s' % x, end=''), out)
if err: map(lambda x: print('\t%s%s%s' % (Colors.R, x, Colors.N), end=''), err)
return out, err
# ==================================================================================================================
# UTILS - AGENT
# ==================================================================================================================
def _portforward_agent_start(self):
    """Forward the local agent port to the same port on the device.

    Lets the host talk to the Needle server running on the device; the
    started forwarder is kept in ``self._port_forward_agent``.
    """
    agent_port = self._agent_port
    self.printer.debug('{} Setting up port forwarding on port {}'.format(Constants.AGENT_TAG, agent_port))
    loopback = '127.0.0.1'
    forwarder = SSHTunnelForwarder(
        (self._ip, int(self._port)),
        ssh_username=self._username,
        ssh_password=self._password,
        local_bind_address=(loopback, agent_port),
        remote_bind_address=(loopback, agent_port),
    )
    self._port_forward_agent = forwarder
    forwarder.start()
def _portforward_agent_stop(self):
    """Stop the Needle agent port forwarding, if a forwarder exists."""
    message = '{} Stopping port forwarding'.format(Constants.AGENT_TAG)
    self.printer.debug(message)
    forwarder = self._port_forward_agent
    if not forwarder:
        return
    forwarder.stop()
def _connect_agent(self):
    """Connect to the agent and run one command through it."""
    agent = self.agent
    agent.connect()
    # A round trip confirms the tunnel really is up, which matters
    # especially after an automatic reconnect.
    agent.exec_command_agent(Constants.AGENT_CMD_OS_VERSION)
def _disconnect_agent(self):
    """Disconnect from the agent."""
    agent = self.agent
    agent.disconnect()
# ==================================================================================================================
# FRIDA PORT FORWARDING
# ==================================================================================================================
def _portforward_frida_start(self):
"""Setup local port forward to enable communication with the Frida server running on the device."""
self.printer.debug('{} Setting up port forwarding on port {}'.format("[FRIDA]", Constants.FRIDA_PORT))
localhost = '127.0.0.1'
self._frida_server = SSHTunnelForwarder(
(self._ip, int(self._port)),
ssh_username=self._username,
开发者ID:mwrlabs,项目名称:needle,代码行数:67,代码来源:device.py
示例18: getRemoteDB
def getRemoteDB(self):
    """Replace the local ``remotedata`` table with rows from the remote DB.

    Opens an SSH tunnel to the remote MySQL server, runs the aggregation
    query, then truncates the local ``remotedata`` table and re-inserts
    every fetched row.  Finally calls ``self.addPostcodesToDB()``.

    If the remote query fails with ``pymysql.err.OperationalError`` the
    process exits via ``sys.exit(0)``.

    Values are passed to the INSERT as query parameters, so a NULL from the
    remote database is stored as NULL (string formatting used to write the
    text 'None') and quotes inside values cannot break the statement.
    """
    # TODO: This needs to add new rows since the last update, rather than replace everything.
    # (A disabled draft read the newest Time_uploaded_to_server from the
    # local remotedata table as the starting point for an incremental sync.)

    # The database query
    sql = """select enc.dateTime, enc.pushedToServerDateTime, enc.howFeeling, enc.takenMedsToday+0,
MAX(IF(Observation.question_id = "20140544-1bee-4d02-b764-d80102437adc", Observation.valueNumeric, NULL)) AS Nose,
MAX(IF(Observation.question_id = "22d7940f-eb30-42cd-b511-d0d65b89eec6", Observation.valueNumeric, NULL)) AS Eyes,
MAX(IF(Observation.question_id = "2cd9490f-d8ca-4f14-948a-1adcdf105fd0", Observation.valueNumeric, NULL)) AS Breathing,
demo.birthYear, demo.gender, demo.allergiesDivulged+0, demo.hayFever+0, demo.asthma+0, demo.otherAllergy+0,
demo.unknownAllergy+0, loc.latitude, loc.longitude, loc.accuracy, loc.whenObtained
from Encounter as enc inner join Observation on Observation.encounter_id = enc.id
inner join Question on Observation.question_id = Question.id join Demographics as demo on demo.id = enc.demographics_id
join LocationInfo as loc on loc.id = enc.locationInfo_id
group by enc.id;"""

    # Open SSH tunnel
    server = SSHTunnelForwarder(
        (self.config['remotehostname'], 1522),
        ssh_username=self.config['remoteusername'],
        ssh_password=self.config['remotepassword'],
        remote_bind_address=('127.0.0.1', 3306)
    )
    server.start()
    try:
        if server.is_alive:
            print('Connection up...executing sql...')
        try:
            conn = pymysql.connect(host='127.0.0.1', port=server.local_bind_port, user=self.config['remoteusername'], passwd=self.config['remotedbpassword'], db=self.config['remotedbname'])
            try:
                cur = conn.cursor()
                cur.execute(sql)
                # Materialise the rows while the cursor is still open.
                rows = cur.fetchall()
                cur.close()
            finally:
                conn.close()
        except pymysql.err.OperationalError:
            print('MySQL failed...exiting.')
            # NOTE(review): exit status 0 on failure is kept as-is.
            sys.exit(0)
    finally:
        # Close the ssh tunnel on every path, including the exit above.
        server.stop()

    # One placeholder per selected column; id stays the literal ''.
    column_count = 18
    insert_sql = (
        "INSERT INTO remotedata (id, Time_answered_on_phone, Time_uploaded_to_server, How_feeling, Taken_meds_today, Nose, Eyes, Breathing, Year_of_Birth, Gender, Optional_data_shared, hay_fever, asthma, other_allergy, unknown, latitude, longitude, accuracy, time_of_location_fix) VALUES ('', "
        + ", ".join(["%s"] * column_count)
        + ");"
    )

    # Update local database
    lconn = pymysql.connect(host='127.0.0.1', port=3306, user=self.config['localusername'], passwd=self.config['localpassword'], db=self.config['localdbname'], autocommit=True)
    try:
        lcur = lconn.cursor()
        lcur.execute('TRUNCATE TABLE remotedata;')
        rowcount = 0
        for row in rows:
            lcur.execute(insert_sql, tuple(row[:column_count]))
            rowcount = rowcount + 1
        print('Rows processed: ', rowcount)
        lcur.close()
    finally:
        lconn.close()

    # DB sync complete, update the postcodes
    self.addPostcodesToDB()
开发者ID:robdunne-uom,项目名称:bb-data-viz,代码行数:84,代码来源:api.py
示例19: __init__
def __init__(self, ssh_address, ssh_port, ssh_username, ssh_password, remote_bind_address, remote_bind_port):
    """Build a password-authenticated forwarder without starting it.

    The forwarder is stored in ``self.server``; it connects to
    ``ssh_address:ssh_port`` and targets
    ``remote_bind_address:remote_bind_port``.
    """
    gateway = (ssh_address, ssh_port)
    target = (remote_bind_address, remote_bind_port)
    self.server = SSHTunnelForwarder(
        ssh_address=gateway,
        ssh_username=ssh_username,
        ssh_password=ssh_password,
        remote_bind_address=target,
    )
开发者ID:procamora,项目名称:Librerias-Varias,代码行数:3,代码来源:ssh_forward.py
示例20: __init__
class StorageDB:
_db_connection = None
_db_cur = None
def __init__(self):
    """Open the SSH tunnel and the PostgreSQL connection behind it.

    The tunnel is configured from ``Credentials.ssh_tunnel`` and forwards to
    port 5432 on the host named in ``Credentials.vicnode_db``.  The database
    connection then goes to the local end of the tunnel.  If connecting
    fails, the tunnel is stopped before the exception propagates.
    """
    ssh_tunnel = Credentials.ssh_tunnel
    # Copy: host/port are overwritten below, and doing that on the shared
    # dict would make the next instance tunnel to 'localhost' instead of
    # the real database host.
    db_config = dict(Credentials.vicnode_db)
    self._server = SSHTunnelForwarder(
        ((ssh_tunnel['host']), (int(ssh_tunnel['port']))),
        ssh_password=ssh_tunnel['ssh_password'],
        ssh_username=(ssh_tunnel['username']),
        ssh_pkey=(ssh_tunnel['private_key_file']),
        remote_bind_address=(db_config['host'], 5432),
        allow_agent=False
    )
    self._server.start()
    connection = None
    try:
        # The database is reached through the local end of the tunnel, so
        # point the connection parameters at it.
        db_config['host'] = 'localhost'
        db_config['port'] = self._server.local_bind_port
        connection = psycopg2.connect(**db_config)
        self._db_connection = connection
        self._db_cur = connection.cursor(
            cursor_factory=psycopg2.extras.RealDictCursor)
        self.test_connection()
    except Exception:
        # Do not leave a half-initialised object holding a live tunnel.
        try:
            if connection is not None:
                connection.close()
        finally:
            self._server.stop()
            self._server = None
        raise
def __enter__(self):
    """Return this object; the connection was already opened on creation."""
    return self
def __exit__(self, type, value, traceback):
    """Close the connection and tunnel; exceptions are not suppressed."""
    self.close_connection()
def __del__(self):
    """Release the database connection and tunnel on garbage collection."""
    self.close_connection()
def close_connection(self):
    """Close the database connection and stop the SSH tunnel.

    Does nothing when no tunnel is active, so it is safe to call more than
    once and on an object whose construction failed part-way.  The tunnel
    is stopped even if the database connection was never established or
    fails to close.
    """
    # getattr: _server is only assigned during construction, so it may be
    # missing entirely when the constructor raised early.
    server = getattr(self, '_server', None)
    if not server:
        return
    connection = getattr(self, '_db_connection', None)
    try:
        # Still None when connecting failed; close() on it used to raise
        # and leave the tunnel running.
        if connection is not None:
            connection.close()
    finally:
        try:
            server.stop()
        finally:
            self._server = None
def test_connection(self):
    """Run a trivial query to prove the connection works; result discarded."""
    cursor = self._db_cur
    cursor.execute("SELECT * FROM applications_suborganization;")
    cursor.fetchall()
def get_contacts(self):
    """Fetch the contacts attached to projects as custodians with role 293.

    Returns: one row per project/contact pair (a project can have more than
    one), with the project id, first and last name, email address and
    business email address, ordered by id.
    """
    query = """
SELECT
collection.id,
contacts_contact.first_name,
contacts_contact.last_name,
contacts_contact.email_address,
contacts_contact.business_email_address
FROM applications_project AS collection
LEFT JOIN applications_custodian
ON collection_id = collection.id
LEFT JOIN contacts_contact
ON applications_custodian.person_id = contacts_contact.id
WHERE applications_custodian.role_id = 293
ORDER BY id;
"""
    cursor = self._db_cur
    cursor.execute(query)
    return cursor.fetchall()
def get_sub_organizations(self):
"""
Returns: all the projects that have a suborganization
"""
query = """
SELECT
applications_allocation.collection_id AS project_id,
CASE
WHEN applications_suborganization.id = 1
THEN 'ABP'
WHEN applications_suborganization.id = 2
THEN 'FBE'
WHEN applications_suborganization.id = 3
THEN 'FoA'
WHEN applications_suborganization.id = 4
THEN 'MGSE'
WHEN applications_suborganization.id = 5
THEN 'MSE'
WHEN applications_suborganization.id = 6
THEN 'MLS'
WHEN applications_suborganization.id = 7
THEN 'MDHS'
WHEN applications_suborganization.id = 8
THEN 'FoS'
WHEN applications_suborganization.id = 9
THEN 'VAS'
WHEN applications_suborganization.id = 10
THEN 'VCAMCM'
WHEN applications_suborganization.id = 11
THEN 'Services'
ELSE 'Unknown' END AS faculty
FROM applications_allocation
#.........这里部分代码省略.........
开发者ID:MartinPaulo,项目名称:ReportsAlpha,代码行数:101,代码来源:storage_yearly_usage.py
注:本文中的sshtunnel.SSHTunnelForwarder类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论