本文整理汇总了Python中reviewboard.ssh.client.SSHClient类的典型用法代码示例。如果您正苦于以下问题:Python SSHClient类的具体用法?Python SSHClient怎么用?Python SSHClient使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SSHClient类的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: ssh_settings
def ssh_settings(request, template_name='admin/ssh_settings.html'):
client = SSHClient()
key = client.get_user_key()
if request.method == 'POST':
form = SSHSettingsForm(request.POST, request.FILES)
if form.is_valid():
if form.did_request_delete() and client.get_user_key() is not None:
try:
form.delete()
return HttpResponseRedirect('.')
except Exception as e:
logging.error('Deleting SSH key failed: %s' % e)
else:
try:
form.create(request.FILES)
return HttpResponseRedirect('.')
except Exception as e:
# Fall through. It will be reported inline and in the log.
logging.error('Uploading SSH key failed: %s' % e)
else:
form = SSHSettingsForm()
if key:
fingerprint = humanize_key(key)
else:
fingerprint = None
return render_to_response(template_name, RequestContext(request, {
'key': key,
'fingerprint': fingerprint,
'public_key': client.get_public_key(key),
'form': form,
}))
开发者ID:karthikkn,项目名称:reviewboard,代码行数:35,代码来源:views.py
示例2: _check_repository
def _check_repository(self, scmtool_class, path, username, password,
local_site, trust_host, ret_cert, request):
if local_site:
local_site_name = local_site.name
else:
local_site_name = None
while 1:
# Keep doing this until we have an error we don't want
# to ignore, or it's successful.
try:
scmtool_class.check_repository(path, username, password,
local_site_name)
return None
except RepositoryNotFoundError:
return MISSING_REPOSITORY
except BadHostKeyError, e:
if trust_host:
try:
client = SSHClient(namespace=local_site_name)
client.replace_host_key(e.hostname,
e.raw_expected_key,
e.raw_key)
except IOError, e:
return SERVER_CONFIG_ERROR, {
'reason': str(e),
}
else:
return BAD_HOST_KEY, {
'hostname': e.hostname,
'expected_key': e.raw_expected_key.get_base64(),
'key': e.raw_key.get_base64(),
}
开发者ID:harrifeng,项目名称:reviewboard,代码行数:33,代码来源:repository.py
示例3: test_import_user_key
def test_import_user_key(self, namespace=None):
"""Testing SSHClient.import_user_key"""
self._set_home(self.tempdir)
client = SSHClient(namespace=namespace)
client.import_user_key(self.key1)
self.assertEqual(client.get_user_key(), self.key1)
开发者ID:CrystalLokKoo,项目名称:reviewboard,代码行数:7,代码来源:tests.py
示例4: test_generate_user_key
def test_generate_user_key(self, namespace=None):
"""Testing SSHClient.generate_user_key"""
self._set_home(self.tempdir)
client = SSHClient(namespace=namespace)
key = client.generate_user_key(bits=1024)
key_file = os.path.join(client.storage.get_ssh_dir(), 'id_rsa')
self.assertTrue(os.path.exists(key_file))
self.assertEqual(client.get_user_key(), key)
开发者ID:CrystalLokKoo,项目名称:reviewboard,代码行数:9,代码来源:tests.py
示例5: check_host
def check_host(hostname, username=None, password=None, namespace=None):
"""
Checks if we can connect to a host with a known key.
This will raise an exception if we cannot connect to the host. The
exception will be one of BadHostKeyError, UnknownHostKeyError, or
SCMError.
"""
from django.conf import settings
client = SSHClient(namespace=namespace)
client.set_missing_host_key_policy(RaiseUnknownHostKeyPolicy())
kwargs = {}
# We normally want to notify on unknown host keys, but not when running
# unit tests.
if getattr(settings, 'RUNNING_TEST', False):
client.set_missing_host_key_policy(paramiko.WarningPolicy())
kwargs['allow_agent'] = False
try:
client.connect(hostname, username=username, password=password,
pkey=client.get_user_key(), **kwargs)
except paramiko.BadHostKeyException, e:
raise BadHostKeyError(e.hostname, e.key, e.expected_key)
开发者ID:FishingCactus,项目名称:reviewboard,代码行数:26,代码来源:utils.py
示例6: ssh_settings
def ssh_settings(request, template_name='admin/ssh_settings.html'):
client = SSHClient()
key = client.get_user_key()
if request.method == 'POST':
form = SSHSettingsForm(request.POST, request.FILES)
if form.is_valid():
try:
form.create(request.FILES)
return HttpResponseRedirect('.')
except Exception, e:
# Fall through. It will be reported inline and in the log.
logging.error('Uploading SSH key failed: %s' % e)
开发者ID:aam1r,项目名称:reviewboard,代码行数:14,代码来源:views.py
示例7: check_host
def check_host(netloc, username=None, password=None, namespace=None):
"""
Checks if we can connect to a host with a known key.
This will raise an exception if we cannot connect to the host. The
exception will be one of BadHostKeyError, UnknownHostKeyError, or
SCMError.
"""
from django.conf import settings
client = SSHClient(namespace=namespace)
client.set_missing_host_key_policy(RaiseUnknownHostKeyPolicy())
kwargs = {}
if ':' in netloc:
hostname, port = netloc.split(':')
try:
port = int(port)
except ValueError:
raise SSHInvalidPortError(port)
else:
hostname = netloc
port = SSH_PORT
# We normally want to notify on unknown host keys, but not when running
# unit tests.
if getattr(settings, 'RUNNING_TEST', False):
client.set_missing_host_key_policy(paramiko.WarningPolicy())
kwargs['allow_agent'] = False
try:
client.connect(hostname, port, username=username, password=password,
pkey=client.get_user_key(), **kwargs)
except paramiko.BadHostKeyException as e:
raise BadHostKeyError(e.hostname, e.key, e.expected_key)
except paramiko.AuthenticationException as e:
# Some AuthenticationException instances have allowed_types set,
# and some don't.
allowed_types = getattr(e, 'allowed_types', [])
if 'publickey' in allowed_types:
key = client.get_user_key()
else:
key = None
raise SSHAuthenticationError(allowed_types=allowed_types, user_key=key)
except paramiko.SSHException as e:
msg = six.text_type(e)
if msg == 'No authentication methods available':
raise SSHAuthenticationError
else:
raise SSHError(msg)
开发者ID:chipx86,项目名称:reviewboard,代码行数:54,代码来源:utils.py
示例8: test_add_host_key
def test_add_host_key(self, namespace=None):
"""Testing SSHClient.add_host_key"""
self._set_home(self.tempdir)
client = SSHClient(namespace=namespace)
client.add_host_key('example.com', self.key1)
known_hosts_file = client.storage.get_host_keys_filename()
self.assertTrue(os.path.exists(known_hosts_file))
with open(known_hosts_file, 'r') as f:
lines = f.readlines()
self.assertEqual(len(lines), 1)
self.assertEqual(lines[0].split(),
['example.com', self.key1.get_name(), self.key1_b64])
开发者ID:CrystalLokKoo,项目名称:reviewboard,代码行数:16,代码来源:tests.py
示例9: setUp
def setUp(self):
super(SSHSettingsFormTestCase, self).setUp()
# Setup temp directory to prevent the original ssh related
# configurations been overwritten.
self.old_home = os.getenv('HOME')
self.tempdir = tempfile.mkdtemp(prefix='rb-tests-home-')
os.environ['RBSSH_ALLOW_AGENT'] = '0'
self._set_home(self.tempdir)
self.ssh_client = SSHClient()
开发者ID:Greyhatno,项目名称:reviewboard,代码行数:11,代码来源:tests.py
示例10: setUp
def setUp(self):
# Setup temp directory to prevent the original ssh related
# configurations been overwritten.
self.old_home = os.getenv('HOME')
self.tempdir = tempfile.mkdtemp(prefix='rb-tests-home-')
os.environ['RBSSH_ALLOW_AGENT'] = '0'
self._set_home(self.tempdir)
# Init client for http request, ssh_client for ssh config manipulation.
self.client = Client()
self.ssh_client = SSHClient()
开发者ID:B-Rich,项目名称:reviewboard,代码行数:11,代码来源:tests.py
示例11: test_delete_user_key
def test_delete_user_key(self, namespace=None):
"""Testing SSHClient.delete_user_key"""
self._set_home(self.tempdir)
client = SSHClient(namespace=namespace)
client.import_user_key(self.key1)
key_file = os.path.join(client.storage.get_ssh_dir(), 'id_rsa')
self.assertTrue(os.path.exists(key_file))
self.assertEqual(client.get_user_key(), self.key1)
client.delete_user_key()
self.assertFalse(os.path.exists(key_file))
开发者ID:CrystalLokKoo,项目名称:reviewboard,代码行数:13,代码来源:tests.py
示例12: __init__
#.........这里部分代码省略.........
for hosting_service_id, hosting_service in get_hosting_services():
if hosting_service.supports_repositories:
hosting_service_choices.append((hosting_service_id,
hosting_service.name))
if hosting_service.supports_bug_trackers:
bug_tracker_choices.append((hosting_service_id,
hosting_service.name))
self.bug_tracker_forms[hosting_service_id] = {}
self.repository_forms[hosting_service_id] = {}
self.hosting_service_info[hosting_service_id] = {
'scmtools': hosting_service.supported_scmtools,
'plans': [],
'planInfo': {},
'self_hosted': hosting_service.self_hosted,
'needs_authorization': hosting_service.needs_authorization,
'supports_bug_trackers': hosting_service.supports_bug_trackers,
'supports_ssh_key_association':
hosting_service.supports_ssh_key_association,
'accounts': [
{
'pk': account.pk,
'hosting_url': account.hosting_url,
'username': account.username,
'is_authorized': account.is_authorized,
}
for account in hosting_accounts
if account.service_name == hosting_service_id
],
}
try:
if hosting_service.plans:
for type_id, info in hosting_service.plans:
form = info.get('form', None)
if form:
self._load_hosting_service(hosting_service_id,
hosting_service,
type_id,
info['name'],
form,
*args, **kwargs)
elif hosting_service.form:
self._load_hosting_service(hosting_service_id,
hosting_service,
self.DEFAULT_PLAN_ID,
self.DEFAULT_PLAN_NAME,
hosting_service.form,
*args, **kwargs)
except Exception as e:
logging.error('Error loading hosting service %s: %s'
% (hosting_service_id, e),
exc_info=1)
# Build the list of hosting service choices, sorted, with
# "None" being first.
hosting_service_choices.sort(key=lambda x: x[1])
hosting_service_choices.insert(0, (self.NO_HOSTING_SERVICE_ID,
self.NO_HOSTING_SERVICE_NAME))
self.fields['hosting_type'].choices = hosting_service_choices
# Now do the same for bug trackers, but have separate None and Custom
# entries.
bug_tracker_choices.sort(key=lambda x: x[1])
bug_tracker_choices.insert(0, (self.NO_BUG_TRACKER_ID,
self.NO_BUG_TRACKER_NAME))
bug_tracker_choices.insert(1, (self.CUSTOM_BUG_TRACKER_ID,
self.CUSTOM_BUG_TRACKER_NAME))
self.fields['bug_tracker_type'].choices = bug_tracker_choices
# Get the current SSH public key that would be used for repositories,
# if one has been created.
self.ssh_client = SSHClient(namespace=self.local_site_name)
ssh_key = self.ssh_client.get_user_key()
if ssh_key:
self.public_key = self.ssh_client.get_public_key(ssh_key)
self.public_key_str = '%s %s' % (
ssh_key.get_name(),
''.join(str(self.public_key).splitlines())
)
else:
self.public_key = None
self.public_key_str = ''
# If no SSH key has been created, disable the key association field.
if not self.public_key:
self.fields['associate_ssh_key'].help_text = \
self.NO_KEY_HELP_FMT % local_site_reverse(
'settings-ssh',
local_site_name=self.local_site_name)
self.fields['associate_ssh_key'].widget.attrs['disabled'] = \
'disabled'
if self.instance:
self._populate_repository_info_fields()
self._populate_hosting_service_fields()
self._populate_bug_tracker_fields()
开发者ID:prodigeni,项目名称:reviewboard,代码行数:101,代码来源:forms.py
示例13: _test_ssh_with_site
def _test_ssh_with_site(self, repo_path, filename=None):
"""Helper for testing an SSH connection and using a Local Site.
This will attempt to SSH into the local machine and connect to the
given repository, using an SSH key and repository based on a Local
Site. It will check the repository for validity and optionally fetch
a file.
If this is unable to connect to the local machine, the test will be
flagged as skipped.
Args:
repo_path (unicode):
The repository path to check.
filename (unicode, optional):
The optional file in the repository to fetch.
"""
self._check_can_test_ssh()
# Get the user's .ssh key, for use in the tests
user_key = self.ssh_client.get_user_key()
self.assertNotEqual(user_key, None)
# Switch to a new SSH directory.
self.tempdir = mkdtemp(prefix='rb-tests-home-')
sshdir = os.path.join(self.tempdir, '.ssh')
self._set_home(self.tempdir)
self.assertEqual(sshdir, self.ssh_client.storage.get_ssh_dir())
self.assertFalse(os.path.exists(os.path.join(sshdir, 'id_rsa')))
self.assertFalse(os.path.exists(os.path.join(sshdir, 'id_dsa')))
self.assertEqual(self.ssh_client.get_user_key(), None)
tool_class = self.repository.tool
# Make sure we aren't using the old SSH key. We want auth errors.
repo = Repository(name='SSH Test', path=repo_path, tool=tool_class)
tool = repo.get_scmtool()
self.assertRaises(AuthenticationError,
lambda: tool.check_repository(repo_path))
if filename:
self.assertRaises(SCMError,
lambda: tool.get_file(filename, HEAD))
for local_site_name in ('site-1',):
local_site = LocalSite(name=local_site_name)
local_site.save()
repo = Repository(name='SSH Test', path=repo_path, tool=tool_class,
local_site=local_site)
tool = repo.get_scmtool()
ssh_client = SSHClient(namespace=local_site_name)
self.assertEqual(ssh_client.storage.get_ssh_dir(),
os.path.join(sshdir, local_site_name))
ssh_client.import_user_key(user_key)
self.assertEqual(ssh_client.get_user_key(), user_key)
# Make sure we can verify the repository and access files.
tool.check_repository(repo_path, local_site_name=local_site_name)
if filename:
self.assertNotEqual(tool.get_file(filename, HEAD), None)
开发者ID:chipx86,项目名称:reviewboard,代码行数:65,代码来源:testcases.py
示例14: SSHSettingsFormTestCase
class SSHSettingsFormTestCase(TestCase):
"""Unit tests for SSHSettingsForm in /admin/forms.py"""
fixtures = ['test_users']
def setUp(self):
super(SSHSettingsFormTestCase, self).setUp()
# Setup temp directory to prevent the original ssh related
# configurations been overwritten.
self.old_home = os.getenv('HOME')
self.tempdir = tempfile.mkdtemp(prefix='rb-tests-home-')
os.environ['RBSSH_ALLOW_AGENT'] = '0'
self._set_home(self.tempdir)
self.ssh_client = SSHClient()
def tearDown(self):
super(SSHSettingsFormTestCase, self).tearDown()
self._set_home(self.old_home)
if self.tempdir:
shutil.rmtree(self.tempdir)
def _set_home(self, homedir):
os.environ['HOME'] = homedir
def test_generate_key(self):
"""Testing SSHSettingsForm POST with generate_key=1"""
# Should have no ssh key at this point.
self.assertEqual(self.ssh_client.get_user_key(), None)
# Send post request with 'generate_key' = 1.
self.client.login(username='admin', password='admin')
response = self.client.post(local_site_reverse('settings-ssh'), {
'generate_key': 1,
})
# On success, the form returns HTTP 302 (redirect).
self.assertEqual(response.status_code, 302)
# Check whether the key has been created.
self.assertNotEqual(self.ssh_client.get_user_key(), None)
def test_delete_key(self):
"""Testing SSHSettingsForm POST with delete_key=1"""
# Should have no ssh key at this point, generate one.
self.assertEqual(self.ssh_client.get_user_key(), None)
self.ssh_client.generate_user_key()
self.assertNotEqual(self.ssh_client.get_user_key(), None)
# Send post request with 'delete_key' = 1.
self.client.login(username='admin', password='admin')
response = self.client.post(local_site_reverse('settings-ssh'), {
'delete_key': 1,
})
# On success, the form returns HTTP 302 (redirect).
self.assertEqual(response.status_code, 302)
# Check whether the key has been deleted.
self.assertEqual(self.ssh_client.get_user_key(), None)
开发者ID:Greyhatno,项目名称:reviewboard,代码行数:62,代码来源:tests.py
示例15: RepositoryForm
#.........这里部分代码省略.........
info['name'],
form,
*args, **kwargs)
elif hosting_service.form:
self._load_hosting_service(hosting_service_id,
hosting_service,
self.DEFAULT_PLAN_ID,
self.DEFAULT_PLAN_NAME,
hosting_service.form,
*args, **kwargs)
except Exception as e:
logging.error('Error loading hosting service %s: %s'
% (hosting_service_id, e),
exc_info=1)
# Build the list of hosting service choices, sorted, with
# "None" being first.
hosting_service_choices.sort(key=lambda x: x[1])
hosting_service_choices.insert(0, (self.NO_HOSTING_SERVICE_ID,
self.NO_HOSTING_SERVICE_NAME))
self.fields['hosting_type'].choices = hosting_service_choices
# Now do the same for bug trackers, but have separate None and Custom
# entries.
bug_tracker_choices.sort(key=lambda x: x[1])
bug_tracker_choices.insert(0, (self.NO_BUG_TRACKER_ID,
self.NO_BUG_TRACKER_NAME))
bug_tracker_choices.insert(1, (self.CUSTOM_BUG_TRACKER_ID,
self.CUSTOM_BUG_TRACKER_NAME))
self.fields['bug_tracker_type'].choices = bug_tracker_choices
# Get the current SSH public key that would be used for repositories,
# if one has been created.
self.ssh_client = SSHClient(namespace=self.local_site_name)
ssh_key = self.ssh_client.get_user_key()
if ssh_key:
self.public_key = self.ssh_client.get_public_key(ssh_key)
self.public_key_str = '%s %s' % (
ssh_key.get_name(),
''.join(str(self.public_key).splitlines())
)
else:
self.public_key = None
self.public_key_str = ''
# If no SSH key has been created, disable the key association field.
if not self.public_key:
self.fields['associate_ssh_key'].help_text = \
self.NO_KEY_HELP_FMT % local_site_reverse(
'settings-ssh',
local_site_name=self.local_site_name)
self.fields['associate_ssh_key'].widget.attrs['disabled'] = \
'disabled'
if self.instance:
self._populate_repository_info_fields()
self._populate_hosting_service_fields()
self._populate_bug_tracker_fields()
def _load_hosting_service(self, hosting_service_id, hosting_service,
repo_type_id, repo_type_label, form_class,
*args, **kwargs):
"""Loads a hosting service form.
The form will be instantiated and added to the list of forms to be
开发者ID:prodigeni,项目名称:reviewboard,代码行数:67,代码来源:forms.py
示例16: _check_repository
def _check_repository(self, scmtool_class, path, username, password,
local_site, trust_host, ret_cert, request):
if local_site:
local_site_name = local_site.name
else:
local_site_name = None
while 1:
# Keep doing this until we have an error we don't want
# to ignore, or it's successful.
try:
scmtool_class.check_repository(path, username, password,
local_site_name)
return None
except RepositoryNotFoundError:
return MISSING_REPOSITORY
except BadHostKeyError as e:
if trust_host:
try:
client = SSHClient(namespace=local_site_name)
client.replace_host_key(e.hostname,
e.raw_expected_key,
e.raw_key)
except IOError as e:
return SERVER_CONFIG_ERROR, {
'reason': six.text_type(e),
}
else:
return BAD_HOST_KEY, {
'hostname': e.hostname,
'expected_key': e.raw_expected_key.get_base64(),
'key': e.raw_key.get_base64(),
}
except UnknownHostKeyError as e:
if trust_host:
try:
client = SSHClient(namespace=local_site_name)
client.add_host_key(e.hostname, e.raw_key)
except IOError as e:
return SERVER_CONFIG_ERROR, {
'reason': six.text_type(e),
}
else:
return UNVERIFIED_HOST_KEY, {
'hostname': e.hostname,
'key': e.raw_key.get_base64(),
}
except UnverifiedCertificateError as e:
if trust_host:
try:
cert = scmtool_class.accept_certificate(
path, local_site_name)
if cert:
ret_cert.update(cert)
except IOError as e:
return SERVER_CONFIG_ERROR, {
'reason': six.text_type(e),
}
else:
return UNVERIFIED_HOST_CERT, {
'certificate': {
'failures': e.certificate.failures,
'fingerprint': e.certificate.fingerprint,
'hostname': e.certificate.hostname,
'issuer': e.certificate.issuer,
'valid': {
'from': e.certificate.valid_from,
'until': e.certificate.valid_until,
},
},
}
except AuthenticationError as e:
if 'publickey' in e.allowed_types and e.user_key is None:
return MISSING_USER_KEY
else:
return REPO_AUTHENTICATION_ERROR, {
'reason': six.text_type(e),
}
except SSHError as e:
logging.error('Got unexpected SSHError when checking '
'repository: %s'
% e, exc_info=1, request=request)
return REPO_INFO_ERROR, {
'error': six.text_type(e),
}
except SCMError as e:
logging.error('Got unexpected SCMError when checking '
'repository: %s'
% e, exc_info=1, request=request)
return REPO_INFO_ERROR, {
'error': six.text_type(e),
}
except Exception as e:
logging.error('Unknown error in checking repository %s: %s',
path, e, exc_info=1, request=request)
# We should give something better, but I don't have anything.
# This will at least give a HTTP 500.
raise
开发者ID:CharanKamal-CLI,项目名称:reviewboard,代码行数:100,代码来源:repository.py
示例17: main
def main():
"""Run the application."""
if DEBUG:
pid = os.getpid()
log_filename = 'rbssh-%s.log' % pid
if DEBUG_LOGDIR:
log_path = os.path.join(DEBUG_LOGDIR, log_filename)
else:
log_path = log_filename
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-18s %(levelname)-8s '
'%(message)s',
datefmt='%m-%d %H:%M',
filename=log_path,
filemode='w')
logging.debug('%s' % sys.argv)
logging.debug('PID %s' % pid)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(logging.Formatter('%(message)s'))
ch.addFilter(logging.Filter('root'))
logging.getLogger('').addHandler(ch)
path, port, command = parse_options(sys.argv[1:])
if '://' not in path:
path = 'ssh://' + path
username, hostname = SCMTool.get_auth_from_uri(path, options.username)
if username is None:
username = getpass.getuser()
logging.debug('!!! %s, %s, %s' % (hostname, username, command))
client = SSHClient(namespace=options.local_site_name)
client.set_missing_host_key_policy(paramiko.WarningPolicy())
attempts = 0
password = None
key = client.get_user_key()
while True:
try:
client.connect(hostname, port, username=username,
password=password, pkey=key,
allow_agent=options.allow_agent)
break
except paramiko.AuthenticationException as e:
if attempts == 3 or not sys.stdin.isatty():
logging.error('Too many authentication failures for %s' %
username)
sys.exit(1)
attempts += 1
password = getpass.getpass("%[email protected]%s's password: " %
(username, hostname))
except paramiko.SSHException as e:
logging.error('Error connecting to server: %s' % e)
sys.exit(1)
except Exception as e:
logging.error('Unknown exception during connect: %s (%s)' %
(e, type(e)))
sys.exit(1)
transport = client.get_transport()
channel = transport.open_session()
if sys.platform in ('cygwin', 'win32'):
logging.debug('!!! Using WindowsHandler')
handler = WindowsHandler(channel)
else:
logging.debug('!!! Using PosixHandler')
handler = PosixHandler(channel)
if options.subsystem == 'sftp':
logging.debug('!!! Invoking sftp subsystem')
channel.invoke_subsystem('sftp')
handler.transfer()
elif command:
logging.debug('!!! Sending command %s' % command)
channel.exec_command(' '.join(command))
handler.transfer()
else:
logging.debug('!!! Opening shell')
channel.get_pty()
channel.invoke_shell()
handler.shell()
logging.debug('!!! Done')
status = channel.recv_exit_status()
client.close()
return status
开发者ID:Anastasiya2307,项目名称:reviewboard,代码行数:99,代码来源:rbssh.py
示例18: main
def main():
if DEBUG:
pid = os.getpid()
log_filename = 'rbssh-%s.log' % pid
if DEBUG_LOGDIR:
log_path = os.path.join(DEBUG_LOGDIR, log_filename)
else:
log_path = log_filename
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-18s %(levelname)-8s '
'%(message)s',
datefmt='%m-%d %H:%M',
filename=log_path,
filemode='w')
logging.debug('%s' % sys.argv)
logging.debug('PID %s' % pid)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(logging.Formatter('%(message)s'))
ch.addFilter(logging.Filter('root'))
logging.getLogger('').addHandler(ch)
path, port, command = parse_options(sys.argv[1:])
if '://' not in path:
path = 'ssh://' + path
username, hostname = SCMTool.get_auth_from_uri(path, options.username)
if username is None:
username = getpass.getuser()
logging.debug('!!! %s, %s, %s' % (hostname, username, command))
client = SSHClient(namespace=options.local_site_name)
client.set_missing_host_key_policy(paramiko.WarningPolicy())
attempts = 0
password = None
key = client.get_user_key()
while True:
try:
client.connect(hostname, port, username=username, password=password,
pkey=key, allow_agent=options.allow_agent)
break
except paramiko.AuthenticationException, e:
if attempts == 3 or not sys.stdin.isatty():
logging.error('Too many authentication failures for %s' %
username)
sys.exit(1)
attempts += 1
password = getpass.getpass("%[email protected]%s's password: " %
(username, hostname))
except paramiko.SSHException, e:
logging.error('Error connecting to server: %s' % e)
sys.exit(1)
开发者ID:chicken-rice,项目名称:reviewboard,代码行数:63,代码来源:rbssh.py
注:本文中的reviewboard.ssh.client.SSHClient类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论