• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python client.SSHClient类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中reviewboard.ssh.client.SSHClient的典型用法代码示例。如果您正苦于以下问题:Python SSHClient类的具体用法?Python SSHClient怎么用?Python SSHClient使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了SSHClient类的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: ssh_settings

def ssh_settings(request, template_name='admin/ssh_settings.html'):
    client = SSHClient()
    key = client.get_user_key()

    if request.method == 'POST':
        form = SSHSettingsForm(request.POST, request.FILES)

        if form.is_valid():
            if form.did_request_delete() and client.get_user_key() is not None:
                try:
                    form.delete()
                    return HttpResponseRedirect('.')
                except Exception as e:
                    logging.error('Deleting SSH key failed: %s' % e)
            else:
                try:
                    form.create(request.FILES)
                    return HttpResponseRedirect('.')
                except Exception as e:
                    # Fall through. It will be reported inline and in the log.
                    logging.error('Uploading SSH key failed: %s' % e)
    else:
        form = SSHSettingsForm()

    if key:
        fingerprint = humanize_key(key)
    else:
        fingerprint = None

    return render_to_response(template_name, RequestContext(request, {
        'key': key,
        'fingerprint': fingerprint,
        'public_key': client.get_public_key(key),
        'form': form,
    }))
开发者ID:karthikkn,项目名称:reviewboard,代码行数:35,代码来源:views.py


示例2: _check_repository

    def _check_repository(self, scmtool_class, path, username, password,
                          local_site, trust_host, ret_cert, request):
        if local_site:
            local_site_name = local_site.name
        else:
            local_site_name = None

        while 1:
            # Keep doing this until we have an error we don't want
            # to ignore, or it's successful.
            try:
                scmtool_class.check_repository(path, username, password,
                                               local_site_name)
                return None
            except RepositoryNotFoundError:
                return MISSING_REPOSITORY
            except BadHostKeyError, e:
                if trust_host:
                    try:
                        client = SSHClient(namespace=local_site_name)
                        client.replace_host_key(e.hostname,
                                                e.raw_expected_key,
                                                e.raw_key)
                    except IOError, e:
                        return SERVER_CONFIG_ERROR, {
                            'reason': str(e),
                        }
                else:
                    return BAD_HOST_KEY, {
                        'hostname': e.hostname,
                        'expected_key': e.raw_expected_key.get_base64(),
                        'key': e.raw_key.get_base64(),
                    }
开发者ID:harrifeng,项目名称:reviewboard,代码行数:33,代码来源:repository.py


示例3: test_import_user_key

    def test_import_user_key(self, namespace=None):
        """Testing SSHClient.import_user_key"""
        self._set_home(self.tempdir)
        client = SSHClient(namespace=namespace)

        client.import_user_key(self.key1)
        self.assertEqual(client.get_user_key(), self.key1)
开发者ID:CrystalLokKoo,项目名称:reviewboard,代码行数:7,代码来源:tests.py


示例4: test_generate_user_key

    def test_generate_user_key(self, namespace=None):
        """Testing SSHClient.generate_user_key"""
        self._set_home(self.tempdir)

        client = SSHClient(namespace=namespace)
        key = client.generate_user_key(bits=1024)
        key_file = os.path.join(client.storage.get_ssh_dir(), 'id_rsa')
        self.assertTrue(os.path.exists(key_file))
        self.assertEqual(client.get_user_key(), key)
开发者ID:CrystalLokKoo,项目名称:reviewboard,代码行数:9,代码来源:tests.py


示例5: check_host

def check_host(hostname, username=None, password=None, namespace=None):
    """
    Checks if we can connect to a host with a known key.

    This will raise an exception if we cannot connect to the host. The
    exception will be one of BadHostKeyError, UnknownHostKeyError, or
    SCMError.
    """
    from django.conf import settings

    client = SSHClient(namespace=namespace)
    client.set_missing_host_key_policy(RaiseUnknownHostKeyPolicy())

    kwargs = {}

    # We normally want to notify on unknown host keys, but not when running
    # unit tests.
    if getattr(settings, 'RUNNING_TEST', False):
        client.set_missing_host_key_policy(paramiko.WarningPolicy())
        kwargs['allow_agent'] = False

    try:
        client.connect(hostname, username=username, password=password,
                       pkey=client.get_user_key(), **kwargs)
    except paramiko.BadHostKeyException, e:
        raise BadHostKeyError(e.hostname, e.key, e.expected_key)
开发者ID:FishingCactus,项目名称:reviewboard,代码行数:26,代码来源:utils.py


示例6: ssh_settings

def ssh_settings(request, template_name='admin/ssh_settings.html'):
    client = SSHClient()
    key = client.get_user_key()

    if request.method == 'POST':
        form = SSHSettingsForm(request.POST, request.FILES)

        if form.is_valid():
            try:
                form.create(request.FILES)
                return HttpResponseRedirect('.')
            except Exception, e:
                # Fall through. It will be reported inline and in the log.
                logging.error('Uploading SSH key failed: %s' % e)
开发者ID:aam1r,项目名称:reviewboard,代码行数:14,代码来源:views.py


示例7: check_host

def check_host(netloc, username=None, password=None, namespace=None):
    """
    Checks if we can connect to a host with a known key.

    This will raise an exception if we cannot connect to the host. The
    exception will be one of BadHostKeyError, UnknownHostKeyError, or
    SCMError.
    """
    from django.conf import settings

    client = SSHClient(namespace=namespace)
    client.set_missing_host_key_policy(RaiseUnknownHostKeyPolicy())

    kwargs = {}

    if ':' in netloc:
        hostname, port = netloc.split(':')

        try:
            port = int(port)
        except ValueError:
            raise SSHInvalidPortError(port)
    else:
        hostname = netloc
        port = SSH_PORT

    # We normally want to notify on unknown host keys, but not when running
    # unit tests.
    if getattr(settings, 'RUNNING_TEST', False):
        client.set_missing_host_key_policy(paramiko.WarningPolicy())
        kwargs['allow_agent'] = False

    try:
        client.connect(hostname, port, username=username, password=password,
                       pkey=client.get_user_key(), **kwargs)
    except paramiko.BadHostKeyException as e:
        raise BadHostKeyError(e.hostname, e.key, e.expected_key)
    except paramiko.AuthenticationException as e:
        # Some AuthenticationException instances have allowed_types set,
        # and some don't.
        allowed_types = getattr(e, 'allowed_types', [])

        if 'publickey' in allowed_types:
            key = client.get_user_key()
        else:
            key = None

        raise SSHAuthenticationError(allowed_types=allowed_types, user_key=key)
    except paramiko.SSHException as e:
        msg = six.text_type(e)
        if msg == 'No authentication methods available':
            raise SSHAuthenticationError
        else:
            raise SSHError(msg)
开发者ID:chipx86,项目名称:reviewboard,代码行数:54,代码来源:utils.py


示例8: test_add_host_key

    def test_add_host_key(self, namespace=None):
        """Testing SSHClient.add_host_key"""
        self._set_home(self.tempdir)
        client = SSHClient(namespace=namespace)

        client.add_host_key('example.com', self.key1)

        known_hosts_file = client.storage.get_host_keys_filename()
        self.assertTrue(os.path.exists(known_hosts_file))

        with open(known_hosts_file, 'r') as f:
            lines = f.readlines()

        self.assertEqual(len(lines), 1)
        self.assertEqual(lines[0].split(),
                         ['example.com', self.key1.get_name(), self.key1_b64])
开发者ID:CrystalLokKoo,项目名称:reviewboard,代码行数:16,代码来源:tests.py


示例9: setUp

    def setUp(self):
        super(SSHSettingsFormTestCase, self).setUp()

        # Setup temp directory to prevent the original ssh related
        # configurations been overwritten.
        self.old_home = os.getenv('HOME')
        self.tempdir = tempfile.mkdtemp(prefix='rb-tests-home-')
        os.environ['RBSSH_ALLOW_AGENT'] = '0'
        self._set_home(self.tempdir)

        self.ssh_client = SSHClient()
开发者ID:Greyhatno,项目名称:reviewboard,代码行数:11,代码来源:tests.py


示例10: setUp

    def setUp(self):
        # Setup temp directory to prevent the original ssh related
        # configurations been overwritten.
        self.old_home = os.getenv('HOME')
        self.tempdir = tempfile.mkdtemp(prefix='rb-tests-home-')
        os.environ['RBSSH_ALLOW_AGENT'] = '0'
        self._set_home(self.tempdir)

        # Init client for http request, ssh_client for ssh config manipulation.
        self.client = Client()
        self.ssh_client = SSHClient()
开发者ID:B-Rich,项目名称:reviewboard,代码行数:11,代码来源:tests.py


示例11: test_delete_user_key

    def test_delete_user_key(self, namespace=None):
        """Testing SSHClient.delete_user_key"""
        self._set_home(self.tempdir)

        client = SSHClient(namespace=namespace)
        client.import_user_key(self.key1)

        key_file = os.path.join(client.storage.get_ssh_dir(), 'id_rsa')
        self.assertTrue(os.path.exists(key_file))
        self.assertEqual(client.get_user_key(), self.key1)

        client.delete_user_key()
        self.assertFalse(os.path.exists(key_file))
开发者ID:CrystalLokKoo,项目名称:reviewboard,代码行数:13,代码来源:tests.py


示例12: __init__


#.........这里部分代码省略.........
        for hosting_service_id, hosting_service in get_hosting_services():
            if hosting_service.supports_repositories:
                hosting_service_choices.append((hosting_service_id,
                                                hosting_service.name))

            if hosting_service.supports_bug_trackers:
                bug_tracker_choices.append((hosting_service_id,
                                            hosting_service.name))

            self.bug_tracker_forms[hosting_service_id] = {}
            self.repository_forms[hosting_service_id] = {}
            self.hosting_service_info[hosting_service_id] = {
                'scmtools': hosting_service.supported_scmtools,
                'plans': [],
                'planInfo': {},
                'self_hosted': hosting_service.self_hosted,
                'needs_authorization': hosting_service.needs_authorization,
                'supports_bug_trackers': hosting_service.supports_bug_trackers,
                'supports_ssh_key_association':
                    hosting_service.supports_ssh_key_association,
                'accounts': [
                    {
                        'pk': account.pk,
                        'hosting_url': account.hosting_url,
                        'username': account.username,
                        'is_authorized': account.is_authorized,
                    }
                    for account in hosting_accounts
                    if account.service_name == hosting_service_id
                ],
            }

            try:
                if hosting_service.plans:
                    for type_id, info in hosting_service.plans:
                        form = info.get('form', None)

                        if form:
                            self._load_hosting_service(hosting_service_id,
                                                       hosting_service,
                                                       type_id,
                                                       info['name'],
                                                       form,
                                                       *args, **kwargs)
                elif hosting_service.form:
                    self._load_hosting_service(hosting_service_id,
                                               hosting_service,
                                               self.DEFAULT_PLAN_ID,
                                               self.DEFAULT_PLAN_NAME,
                                               hosting_service.form,
                                               *args, **kwargs)
            except Exception as e:
                logging.error('Error loading hosting service %s: %s'
                              % (hosting_service_id, e),
                              exc_info=1)

        # Build the list of hosting service choices, sorted, with
        # "None" being first.
        hosting_service_choices.sort(key=lambda x: x[1])
        hosting_service_choices.insert(0, (self.NO_HOSTING_SERVICE_ID,
                                           self.NO_HOSTING_SERVICE_NAME))
        self.fields['hosting_type'].choices = hosting_service_choices

        # Now do the same for bug trackers, but have separate None and Custom
        # entries.
        bug_tracker_choices.sort(key=lambda x: x[1])
        bug_tracker_choices.insert(0, (self.NO_BUG_TRACKER_ID,
                                       self.NO_BUG_TRACKER_NAME))
        bug_tracker_choices.insert(1, (self.CUSTOM_BUG_TRACKER_ID,
                                       self.CUSTOM_BUG_TRACKER_NAME))
        self.fields['bug_tracker_type'].choices = bug_tracker_choices

        # Get the current SSH public key that would be used for repositories,
        # if one has been created.
        self.ssh_client = SSHClient(namespace=self.local_site_name)
        ssh_key = self.ssh_client.get_user_key()

        if ssh_key:
            self.public_key = self.ssh_client.get_public_key(ssh_key)
            self.public_key_str = '%s %s' % (
                ssh_key.get_name(),
                ''.join(str(self.public_key).splitlines())
            )
        else:
            self.public_key = None
            self.public_key_str = ''

        # If no SSH key has been created, disable the key association field.
        if not self.public_key:
            self.fields['associate_ssh_key'].help_text = \
                self.NO_KEY_HELP_FMT % local_site_reverse(
                    'settings-ssh',
                    local_site_name=self.local_site_name)
            self.fields['associate_ssh_key'].widget.attrs['disabled'] = \
                'disabled'

        if self.instance:
            self._populate_repository_info_fields()
            self._populate_hosting_service_fields()
            self._populate_bug_tracker_fields()
开发者ID:prodigeni,项目名称:reviewboard,代码行数:101,代码来源:forms.py


示例13: _test_ssh_with_site

    def _test_ssh_with_site(self, repo_path, filename=None):
        """Helper for testing an SSH connection and using a Local Site.

        This will attempt to SSH into the local machine and connect to the
        given repository, using an SSH key and repository based on a Local
        Site. It will check the repository for validity and optionally fetch
        a file.

        If this is unable to connect to the local machine, the test will be
        flagged as skipped.

        Args:
            repo_path (unicode):
                The repository path to check.

            filename (unicode, optional):
                The optional file in the repository to fetch.
        """
        self._check_can_test_ssh()

        # Get the user's .ssh key, for use in the tests
        user_key = self.ssh_client.get_user_key()
        self.assertNotEqual(user_key, None)

        # Switch to a new SSH directory.
        self.tempdir = mkdtemp(prefix='rb-tests-home-')
        sshdir = os.path.join(self.tempdir, '.ssh')
        self._set_home(self.tempdir)

        self.assertEqual(sshdir, self.ssh_client.storage.get_ssh_dir())
        self.assertFalse(os.path.exists(os.path.join(sshdir, 'id_rsa')))
        self.assertFalse(os.path.exists(os.path.join(sshdir, 'id_dsa')))
        self.assertEqual(self.ssh_client.get_user_key(), None)

        tool_class = self.repository.tool

        # Make sure we aren't using the old SSH key. We want auth errors.
        repo = Repository(name='SSH Test', path=repo_path, tool=tool_class)
        tool = repo.get_scmtool()
        self.assertRaises(AuthenticationError,
                          lambda: tool.check_repository(repo_path))

        if filename:
            self.assertRaises(SCMError,
                              lambda: tool.get_file(filename, HEAD))

        for local_site_name in ('site-1',):
            local_site = LocalSite(name=local_site_name)
            local_site.save()

            repo = Repository(name='SSH Test', path=repo_path, tool=tool_class,
                              local_site=local_site)
            tool = repo.get_scmtool()

            ssh_client = SSHClient(namespace=local_site_name)
            self.assertEqual(ssh_client.storage.get_ssh_dir(),
                             os.path.join(sshdir, local_site_name))
            ssh_client.import_user_key(user_key)
            self.assertEqual(ssh_client.get_user_key(), user_key)

            # Make sure we can verify the repository and access files.
            tool.check_repository(repo_path, local_site_name=local_site_name)

            if filename:
                self.assertNotEqual(tool.get_file(filename, HEAD), None)
开发者ID:chipx86,项目名称:reviewboard,代码行数:65,代码来源:testcases.py


示例14: SSHSettingsFormTestCase

class SSHSettingsFormTestCase(TestCase):
    """Unit tests for SSHSettingsForm in /admin/forms.py"""
    fixtures = ['test_users']

    def setUp(self):
        super(SSHSettingsFormTestCase, self).setUp()

        # Setup temp directory to prevent the original ssh related
        # configurations been overwritten.
        self.old_home = os.getenv('HOME')
        self.tempdir = tempfile.mkdtemp(prefix='rb-tests-home-')
        os.environ['RBSSH_ALLOW_AGENT'] = '0'
        self._set_home(self.tempdir)

        self.ssh_client = SSHClient()

    def tearDown(self):
        super(SSHSettingsFormTestCase, self).tearDown()

        self._set_home(self.old_home)

        if self.tempdir:
            shutil.rmtree(self.tempdir)

    def _set_home(self, homedir):
        os.environ['HOME'] = homedir

    def test_generate_key(self):
        """Testing SSHSettingsForm POST with generate_key=1"""
        # Should have no ssh key at this point.
        self.assertEqual(self.ssh_client.get_user_key(), None)

        # Send post request with 'generate_key' = 1.
        self.client.login(username='admin', password='admin')
        response = self.client.post(local_site_reverse('settings-ssh'), {
            'generate_key': 1,
        })

        # On success, the form returns HTTP 302 (redirect).
        self.assertEqual(response.status_code, 302)

        # Check whether the key has been created.
        self.assertNotEqual(self.ssh_client.get_user_key(), None)

    def test_delete_key(self):
        """Testing SSHSettingsForm POST with delete_key=1"""
        # Should have no ssh key at this point, generate one.
        self.assertEqual(self.ssh_client.get_user_key(), None)
        self.ssh_client.generate_user_key()
        self.assertNotEqual(self.ssh_client.get_user_key(), None)

        # Send post request with 'delete_key' = 1.
        self.client.login(username='admin', password='admin')
        response = self.client.post(local_site_reverse('settings-ssh'), {
            'delete_key': 1,
        })

        # On success, the form returns HTTP 302 (redirect).
        self.assertEqual(response.status_code, 302)

        # Check whether the key has been deleted.
        self.assertEqual(self.ssh_client.get_user_key(), None)
开发者ID:Greyhatno,项目名称:reviewboard,代码行数:62,代码来源:tests.py


示例15: RepositoryForm


#.........这里部分代码省略.........
                                                       info['name'],
                                                       form,
                                                       *args, **kwargs)
                elif hosting_service.form:
                    self._load_hosting_service(hosting_service_id,
                                               hosting_service,
                                               self.DEFAULT_PLAN_ID,
                                               self.DEFAULT_PLAN_NAME,
                                               hosting_service.form,
                                               *args, **kwargs)
            except Exception as e:
                logging.error('Error loading hosting service %s: %s'
                              % (hosting_service_id, e),
                              exc_info=1)

        # Build the list of hosting service choices, sorted, with
        # "None" being first.
        hosting_service_choices.sort(key=lambda x: x[1])
        hosting_service_choices.insert(0, (self.NO_HOSTING_SERVICE_ID,
                                           self.NO_HOSTING_SERVICE_NAME))
        self.fields['hosting_type'].choices = hosting_service_choices

        # Now do the same for bug trackers, but have separate None and Custom
        # entries.
        bug_tracker_choices.sort(key=lambda x: x[1])
        bug_tracker_choices.insert(0, (self.NO_BUG_TRACKER_ID,
                                       self.NO_BUG_TRACKER_NAME))
        bug_tracker_choices.insert(1, (self.CUSTOM_BUG_TRACKER_ID,
                                       self.CUSTOM_BUG_TRACKER_NAME))
        self.fields['bug_tracker_type'].choices = bug_tracker_choices

        # Get the current SSH public key that would be used for repositories,
        # if one has been created.
        self.ssh_client = SSHClient(namespace=self.local_site_name)
        ssh_key = self.ssh_client.get_user_key()

        if ssh_key:
            self.public_key = self.ssh_client.get_public_key(ssh_key)
            self.public_key_str = '%s %s' % (
                ssh_key.get_name(),
                ''.join(str(self.public_key).splitlines())
            )
        else:
            self.public_key = None
            self.public_key_str = ''

        # If no SSH key has been created, disable the key association field.
        if not self.public_key:
            self.fields['associate_ssh_key'].help_text = \
                self.NO_KEY_HELP_FMT % local_site_reverse(
                    'settings-ssh',
                    local_site_name=self.local_site_name)
            self.fields['associate_ssh_key'].widget.attrs['disabled'] = \
                'disabled'

        if self.instance:
            self._populate_repository_info_fields()
            self._populate_hosting_service_fields()
            self._populate_bug_tracker_fields()

    def _load_hosting_service(self, hosting_service_id, hosting_service,
                              repo_type_id, repo_type_label, form_class,
                              *args, **kwargs):
        """Loads a hosting service form.

        The form will be instantiated and added to the list of forms to be
开发者ID:prodigeni,项目名称:reviewboard,代码行数:67,代码来源:forms.py


示例16: _check_repository

    def _check_repository(self, scmtool_class, path, username, password,
                          local_site, trust_host, ret_cert, request):
        if local_site:
            local_site_name = local_site.name
        else:
            local_site_name = None

        while 1:
            # Keep doing this until we have an error we don't want
            # to ignore, or it's successful.
            try:
                scmtool_class.check_repository(path, username, password,
                                               local_site_name)
                return None
            except RepositoryNotFoundError:
                return MISSING_REPOSITORY
            except BadHostKeyError as e:
                if trust_host:
                    try:
                        client = SSHClient(namespace=local_site_name)
                        client.replace_host_key(e.hostname,
                                                e.raw_expected_key,
                                                e.raw_key)
                    except IOError as e:
                        return SERVER_CONFIG_ERROR, {
                            'reason': six.text_type(e),
                        }
                else:
                    return BAD_HOST_KEY, {
                        'hostname': e.hostname,
                        'expected_key': e.raw_expected_key.get_base64(),
                        'key': e.raw_key.get_base64(),
                    }
            except UnknownHostKeyError as e:
                if trust_host:
                    try:
                        client = SSHClient(namespace=local_site_name)
                        client.add_host_key(e.hostname, e.raw_key)
                    except IOError as e:
                        return SERVER_CONFIG_ERROR, {
                            'reason': six.text_type(e),
                        }
                else:
                    return UNVERIFIED_HOST_KEY, {
                        'hostname': e.hostname,
                        'key': e.raw_key.get_base64(),
                    }
            except UnverifiedCertificateError as e:
                if trust_host:
                    try:
                        cert = scmtool_class.accept_certificate(
                            path, local_site_name)

                        if cert:
                            ret_cert.update(cert)
                    except IOError as e:
                        return SERVER_CONFIG_ERROR, {
                            'reason': six.text_type(e),
                        }
                else:
                    return UNVERIFIED_HOST_CERT, {
                        'certificate': {
                            'failures': e.certificate.failures,
                            'fingerprint': e.certificate.fingerprint,
                            'hostname': e.certificate.hostname,
                            'issuer': e.certificate.issuer,
                            'valid': {
                                'from': e.certificate.valid_from,
                                'until': e.certificate.valid_until,
                            },
                        },
                    }
            except AuthenticationError as e:
                if 'publickey' in e.allowed_types and e.user_key is None:
                    return MISSING_USER_KEY
                else:
                    return REPO_AUTHENTICATION_ERROR, {
                        'reason': six.text_type(e),
                    }
            except SSHError as e:
                logging.error('Got unexpected SSHError when checking '
                              'repository: %s'
                              % e, exc_info=1, request=request)
                return REPO_INFO_ERROR, {
                    'error': six.text_type(e),
                }
            except SCMError as e:
                logging.error('Got unexpected SCMError when checking '
                              'repository: %s'
                              % e, exc_info=1, request=request)
                return REPO_INFO_ERROR, {
                    'error': six.text_type(e),
                }
            except Exception as e:
                logging.error('Unknown error in checking repository %s: %s',
                              path, e, exc_info=1, request=request)

                # We should give something better, but I don't have anything.
                # This will at least give a HTTP 500.
                raise
开发者ID:CharanKamal-CLI,项目名称:reviewboard,代码行数:100,代码来源:repository.py


示例17: main

def main():
    """Run the application."""
    if DEBUG:
        pid = os.getpid()
        log_filename = 'rbssh-%s.log' % pid

        if DEBUG_LOGDIR:
            log_path = os.path.join(DEBUG_LOGDIR, log_filename)
        else:
            log_path = log_filename

        logging.basicConfig(level=logging.DEBUG,
                            format='%(asctime)s %(name)-18s %(levelname)-8s '
                                   '%(message)s',
                            datefmt='%m-%d %H:%M',
                            filename=log_path,
                            filemode='w')

        logging.debug('%s' % sys.argv)
        logging.debug('PID %s' % pid)

    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    ch.setFormatter(logging.Formatter('%(message)s'))
    ch.addFilter(logging.Filter('root'))
    logging.getLogger('').addHandler(ch)

    path, port, command = parse_options(sys.argv[1:])

    if '://' not in path:
        path = 'ssh://' + path

    username, hostname = SCMTool.get_auth_from_uri(path, options.username)

    if username is None:
        username = getpass.getuser()

    logging.debug('!!! %s, %s, %s' % (hostname, username, command))

    client = SSHClient(namespace=options.local_site_name)
    client.set_missing_host_key_policy(paramiko.WarningPolicy())

    attempts = 0
    password = None

    key = client.get_user_key()

    while True:
        try:
            client.connect(hostname, port, username=username,
                           password=password, pkey=key,
                           allow_agent=options.allow_agent)
            break
        except paramiko.AuthenticationException as e:
            if attempts == 3 or not sys.stdin.isatty():
                logging.error('Too many authentication failures for %s' %
                              username)
                sys.exit(1)

            attempts += 1
            password = getpass.getpass("%[email protected]%s's password: " %
                                       (username, hostname))
        except paramiko.SSHException as e:
            logging.error('Error connecting to server: %s' % e)
            sys.exit(1)
        except Exception as e:
            logging.error('Unknown exception during connect: %s (%s)' %
                          (e, type(e)))
            sys.exit(1)

    transport = client.get_transport()
    channel = transport.open_session()

    if sys.platform in ('cygwin', 'win32'):
        logging.debug('!!! Using WindowsHandler')
        handler = WindowsHandler(channel)
    else:
        logging.debug('!!! Using PosixHandler')
        handler = PosixHandler(channel)

    if options.subsystem == 'sftp':
        logging.debug('!!! Invoking sftp subsystem')
        channel.invoke_subsystem('sftp')
        handler.transfer()
    elif command:
        logging.debug('!!! Sending command %s' % command)
        channel.exec_command(' '.join(command))
        handler.transfer()
    else:
        logging.debug('!!! Opening shell')
        channel.get_pty()
        channel.invoke_shell()
        handler.shell()

    logging.debug('!!! Done')
    status = channel.recv_exit_status()
    client.close()

    return status
开发者ID:Anastasiya2307,项目名称:reviewboard,代码行数:99,代码来源:rbssh.py


示例18: main

def main():
    if DEBUG:
        pid = os.getpid()
        log_filename = 'rbssh-%s.log' % pid

        if DEBUG_LOGDIR:
            log_path = os.path.join(DEBUG_LOGDIR, log_filename)
        else:
            log_path = log_filename

        logging.basicConfig(level=logging.DEBUG,
                            format='%(asctime)s %(name)-18s %(levelname)-8s '
                                   '%(message)s',
                            datefmt='%m-%d %H:%M',
                            filename=log_path,
                            filemode='w')

        logging.debug('%s' % sys.argv)
        logging.debug('PID %s' % pid)

    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    ch.setFormatter(logging.Formatter('%(message)s'))
    ch.addFilter(logging.Filter('root'))
    logging.getLogger('').addHandler(ch)

    path, port, command = parse_options(sys.argv[1:])

    if '://' not in path:
        path = 'ssh://' + path

    username, hostname = SCMTool.get_auth_from_uri(path, options.username)

    if username is None:
        username = getpass.getuser()

    logging.debug('!!! %s, %s, %s' % (hostname, username, command))

    client = SSHClient(namespace=options.local_site_name)
    client.set_missing_host_key_policy(paramiko.WarningPolicy())

    attempts = 0
    password = None

    key = client.get_user_key()

    while True:
        try:
            client.connect(hostname, port, username=username, password=password,
                           pkey=key, allow_agent=options.allow_agent)
            break
        except paramiko.AuthenticationException, e:
            if attempts == 3 or not sys.stdin.isatty():
                logging.error('Too many authentication failures for %s' %
                              username)
                sys.exit(1)

            attempts += 1
            password = getpass.getpass("%[email protected]%s's password: " %
                                       (username, hostname))
        except paramiko.SSHException, e:
            logging.error('Error connecting to server: %s' % e)
            sys.exit(1)
开发者ID:chicken-rice,项目名称:reviewboard,代码行数:63,代码来源:rbssh.py



注:本文中的reviewboard.ssh.client.SSHClient类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.register_rbssh函数代码示例发布时间:2022-05-26
下一篇:
Python validation.validate_users函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap