• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils.sftpcmd函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pysftpserver.tests.utils.sftpcmd函数的典型用法代码示例。如果您正苦于以下问题:Python sftpcmd函数的具体用法?Python sftpcmd怎么用?Python sftpcmd使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了sftpcmd函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_rename

    def test_rename(self):
        """Create 'services', close it, rename it, then look for the new name.

        The final assertion checks that 'other_services' shows up in the
        listing of the current working directory.
        """
        source_name = b'services'
        target_name = b'other_services'
        creation_flags = SSH2_FXF_CREAT | SSH2_FXF_WRITE

        # Create the file, sending a permissions attribute of 0o644.
        self.server.input_queue = sftpcmd(
            SSH2_FXP_OPEN,
            sftpstring(source_name),
            sftpint(creation_flags),
            sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
            sftpint(0o644)
        )
        self.server.process()
        file_handle = get_sftphandle(self.server.output_queue)

        # Drop the OPEN response before issuing the next request.
        self.server.output_queue = b''
        close_request = sftpcmd(SSH2_FXP_CLOSE, sftpstring(file_handle))
        self.server.input_queue = close_request
        self.server.process()

        rename_request = sftpcmd(
            SSH2_FXP_RENAME,
            sftpstring(source_name),
            sftpstring(target_name),
        )
        self.server.input_queue = rename_request
        self.server.process()

        directory_entries = os.listdir('.')
        self.assertIn('other_services', directory_entries)
开发者ID:20tab,项目名称:pysftpserver,代码行数:26,代码来源:test_server_chroot.py


示例2: test_remove

    def test_remove(self):
        """Create 'services', close it, then send a REMOVE request for it.

        No explicit assertion is made; the test passes as long as none of the
        three requests raises while being processed.
        """
        target_name = b'services'
        creation_flags = SSH2_FXF_CREAT | SSH2_FXF_WRITE

        # Create the file, sending a permissions attribute of 0o644.
        self.server.input_queue = sftpcmd(
            SSH2_FXP_OPEN,
            sftpstring(target_name),
            sftpint(creation_flags),
            sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
            sftpint(0o644)
        )
        self.server.process()
        file_handle = get_sftphandle(self.server.output_queue)

        # Drop the OPEN response before issuing the next request.
        self.server.output_queue = b''
        close_request = sftpcmd(SSH2_FXP_CLOSE, sftpstring(file_handle))
        self.server.input_queue = close_request
        self.server.process()

        remove_request = sftpcmd(
            SSH2_FXP_REMOVE,
            sftpstring(target_name),
            sftpint(0)
        )
        self.server.input_queue = remove_request
        self.server.process()
开发者ID:20tab,项目名称:pysftpserver,代码行数:25,代码来源:test_server_chroot.py


示例3: test_write

 def test_write(self, mock_request):
     """Write a chunk at an offset and check the recorded 'write' request.

     The file is opened, the contents of /etc/services are written at
     ``write_offset``, and the handle is closed. ``mock_request`` is then
     expected to contain a POST to 'test_url/write' carrying the filename
     and offset, preceded and followed by one call of any kind.

     Args:
         mock_request: mock object whose recorded calls are inspected
             (presumably injected by a patch decorator outside this view).
     """
     filename = b'services'
     write_offset = 5
     self.server.input_queue = sftpcmd(
         SSH2_FXP_OPEN,
         sftpstring(filename),
         sftpint(SSH2_FXF_CREAT | SSH2_FXF_WRITE | SSH2_FXF_READ),
         sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
         sftpint(0o644),
     )
     self.server.process()
     handle = get_sftphandle(self.server.output_queue)
     self.server.output_queue = b''
     # Use a context manager so the source file is closed deterministically
     # instead of being left to the garbage collector (ResourceWarning).
     with open('/etc/services', 'rb') as source:
         chunk = source.read()
     self.server.input_queue = sftpcmd(
         SSH2_FXP_WRITE,
         sftpstring(handle),
         sftpint64(write_offset),
         sftpstring(chunk),
     )
     self.server.process()
     self.server.output_queue = b''
     self.server.input_queue = sftpcmd(SSH2_FXP_CLOSE, sftpstring(handle))
     self.server.process()
     mock_request.assert_has_calls([
         mock.ANY,  # open
         mock.call(
             'POST', 'test_url/write', auth=None,
             data={
                 'method': 'write', 'filename': filename,
                 'offset': write_offset}),
         mock.ANY,  # close
     ])
     # Remove the file created by the OPEN request.
     os.unlink(filename)
开发者ID:20tab,项目名称:pysftpserver,代码行数:34,代码来源:test_urlrequesthook.py


示例4: test_fstat

    def test_fstat(self):
        """Send FSTAT for a newly created file and check its size and owner.

        The decoded stat must report a size of 0 and a uid equal to
        ``os.getuid()``. The file is closed and unlinked afterwards.
        """
        path = b'services'
        open_request = sftpcmd(
            SSH2_FXP_OPEN,
            sftpstring(path),
            sftpint(SSH2_FXF_CREAT),
            sftpint(0)
        )
        self.server.input_queue = open_request
        self.server.process()
        file_handle = get_sftphandle(self.server.output_queue)

        # Clear the OPEN response so the stat reply is parsed on its own.
        self.server.output_queue = b''
        self.server.input_queue = sftpcmd(
            SSH2_FXP_FSTAT, sftpstring(file_handle))
        self.server.process()
        attributes = get_sftpstat(self.server.output_queue)
        self.assertEqual(attributes['size'], 0)
        self.assertEqual(attributes['uid'], os.getuid())

        self.server.output_queue = b''
        self.server.input_queue = sftpcmd(
            SSH2_FXP_CLOSE, sftpstring(file_handle))
        self.server.process()

        # Clean up the file created by the OPEN request.
        os.unlink('services')
开发者ID:20tab,项目名称:pysftpserver,代码行数:28,代码来源:test_server_chroot.py


示例5: test_open

 def test_open(self, mock_request):
     """Open and close a file, then check the recorded 'open' request.

     ``mock_request`` is expected to contain a POST to 'test_url/open' with
     the filename, the explicit form of the open flags and the permission
     attribute, followed by one further call of any kind.
     """
     filename = b'services'
     flags = SSH2_FXF_CREAT | SSH2_FXF_WRITE
     perm = 0o100600

     open_request = sftpcmd(
         SSH2_FXP_OPEN,
         sftpstring(filename),
         sftpint(flags),
         sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
         sftpint(perm),
     )
     self.server.input_queue = open_request
     self.server.process()
     file_handle = get_sftphandle(self.server.output_queue)

     self.server.output_queue = b''
     self.server.input_queue = sftpcmd(
         SSH2_FXP_CLOSE, sftpstring(file_handle))
     self.server.process()

     expected_payload = {
         'method': 'open',
         'filename': filename,
         'flags': self.server.get_explicit_flags(flags),
         'attrs': {b'perm': perm},
     }
     expected_calls = [
         mock.call('POST', 'test_url/open', auth=None, data=expected_payload),
         mock.ANY,  # close
     ]
     mock_request.assert_has_calls(expected_calls)
     os.unlink(filename)
开发者ID:20tab,项目名称:pysftpserver,代码行数:26,代码来源:test_urlrequesthook.py


示例6: test_rm

 def test_rm(self, mock_request):
     """Create, close and remove a file, then check the recorded 'rm' request.

     ``mock_request`` is expected to contain two calls of any kind followed
     by a POST to 'test_url/rm' carrying the filename.
     """
     filename = b'services'
     creation_flags = SSH2_FXF_CREAT | SSH2_FXF_WRITE

     self.server.input_queue = sftpcmd(
         SSH2_FXP_OPEN,
         sftpstring(filename),
         sftpint(creation_flags),
         sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
         sftpint(0o644)
     )
     self.server.process()
     file_handle = get_sftphandle(self.server.output_queue)

     self.server.output_queue = b''
     self.server.input_queue = sftpcmd(
         SSH2_FXP_CLOSE, sftpstring(file_handle))
     self.server.process()

     self.server.output_queue = b''
     remove_request = sftpcmd(
         SSH2_FXP_REMOVE,
         sftpstring(filename),
         sftpint(0)
     )
     self.server.input_queue = remove_request
     self.server.process()

     expected_rm = mock.call(
         'POST', 'test_url/rm', auth=None,
         data={'method': 'rm', 'filename': filename})
     mock_request.assert_has_calls([
         mock.ANY,  # open
         mock.ANY,  # close
         expected_rm,
     ])
开发者ID:20tab,项目名称:pysftpserver,代码行数:28,代码来源:test_urlrequesthook.py


示例7: test_rename

 def test_rename(self):
     """Rename a file and check the paths recorded by the hook.

     After the RENAME request is processed, the hook's 'rename' result must
     expose the original path under 'oldpath' and the new one under
     'newpath'. The renamed file is unlinked at the end.
     """
     oldpath = b'services'
     newpath = b'other_services'
     creation_flags = SSH2_FXF_CREAT | SSH2_FXF_WRITE

     self.server.input_queue = sftpcmd(
         SSH2_FXP_OPEN,
         sftpstring(oldpath),
         sftpint(creation_flags),
         sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
         sftpint(0o644)
     )
     self.server.process()
     file_handle = get_sftphandle(self.server.output_queue)

     self.server.output_queue = b''
     self.server.input_queue = sftpcmd(
         SSH2_FXP_CLOSE, sftpstring(file_handle))
     self.server.process()

     self.server.output_queue = b''
     rename_request = sftpcmd(
         SSH2_FXP_RENAME,
         sftpstring(oldpath),
         sftpstring(newpath),
     )
     self.server.input_queue = rename_request
     self.server.process()

     recorded_old = self.hook.get_result('rename', 'oldpath')
     self.assertEqual(recorded_old, oldpath)
     recorded_new = self.hook.get_result('rename', 'newpath')
     self.assertEqual(recorded_new, newpath)
     os.unlink(newpath)
开发者ID:20tab,项目名称:pysftpserver,代码行数:25,代码来源:test_hook.py


示例8: test_realpath

 def test_realpath(self, mock_request):
     """Check the 'realpath' requests; also covers multiple urls, no path.

     The server hook is replaced by a UrlRequestHook that maps 'realpath'
     to two urls and an empty path. ``mock_request`` is expected to contain
     one call of any kind followed by a POST to each of the two urls.
     """
     realpath_urls = {'realpath': ['test_url_1', 'test_url_2']}
     realpath_paths = {'realpath': ''}
     self.server.hook = UrlRequestHook(
         'test_url',
         urls_mapping=realpath_urls,
         paths_mapping=realpath_paths)

     filename = b'services'
     flags = SSH2_FXF_CREAT | SSH2_FXF_WRITE
     perm = 0o100600
     open_request = sftpcmd(
         SSH2_FXP_OPEN,
         sftpstring(filename),
         sftpint(flags),
         sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
         sftpint(perm),
     )
     self.server.input_queue = open_request
     self.server.process()

     self.server.output_queue = b''
     self.server.input_queue = sftpcmd(
         SSH2_FXP_REALPATH, sftpstring(filename))
     self.server.process()

     expected_calls = [mock.ANY]  # open
     for url in ('test_url_1/', 'test_url_2/'):
         expected_calls.append(mock.call(
             'POST', url, auth=None,
             data={'method': 'realpath', 'filename': filename}))
     mock_request.assert_has_calls(expected_calls)
     os.unlink(filename)
开发者ID:20tab,项目名称:pysftpserver,代码行数:33,代码来源:test_urlrequesthook.py


示例9: test_opendir

 def test_opendir(self, mock_request):
     """Check the 'opendir' requests; also covers one url, multiple paths.

     The server hook is replaced by a UrlRequestHook that maps 'opendir' to
     three paths. ``mock_request`` is expected to contain a POST for each
     path under 'test_url', followed by one further call of any kind.
     """
     opendir_paths = {
         'opendir': ['test_path_1', 'test_path_2', 'test_path_3']}
     self.server.hook = UrlRequestHook(
         'test_url', paths_mapping=opendir_paths)

     dirname = b'foo'
     os.mkdir(dirname)
     self.server.input_queue = sftpcmd(
         SSH2_FXP_OPENDIR, sftpstring(dirname))
     self.server.process()
     dir_handle = get_sftphandle(self.server.output_queue)

     self.server.output_queue = b''
     self.server.input_queue = sftpcmd(
         SSH2_FXP_CLOSE, sftpstring(dir_handle))
     self.server.process()

     expected_urls = (
         'test_url/test_path_1',
         'test_url/test_path_2',
         'test_url/test_path_3',
     )
     expected_calls = [
         mock.call(
             'POST', url, auth=None,
             data={'method': 'opendir', 'filename': dirname})
         for url in expected_urls
     ]
     expected_calls.append(mock.ANY)  # close
     mock_request.assert_has_calls(expected_calls)
     rmtree(dirname)
开发者ID:20tab,项目名称:pysftpserver,代码行数:28,代码来源:test_urlrequesthook.py


示例10: test_open_already_existing

    def test_open_already_existing(self):
        """Re-opening an existing file with CREAT | EXCL must raise.

        The file is first created and closed; a second OPEN with
        SSH2_FXF_CREAT | SSH2_FXF_EXCL is then expected to make
        ``self.server.process`` raise SFTPException.
        """
        path = b'services'

        first_open = sftpcmd(
            SSH2_FXP_OPEN,
            sftpstring(path),
            sftpint(SSH2_FXF_CREAT),
            sftpint(0)
        )
        self.server.input_queue = first_open
        self.server.process()
        file_handle = get_sftphandle(self.server.output_queue)

        # Drop the OPEN response before issuing the next request.
        self.server.output_queue = b''
        self.server.input_queue = sftpcmd(
            SSH2_FXP_CLOSE, sftpstring(file_handle))
        self.server.process()

        exclusive_flags = SSH2_FXF_CREAT | SSH2_FXF_EXCL
        second_open = sftpcmd(
            SSH2_FXP_OPEN,
            sftpstring(path),
            sftpint(exclusive_flags),
            sftpint(0)
        )
        self.server.input_queue = second_open
        self.assertRaises(SFTPException, self.server.process)

        # Clean up the file created by the first OPEN request.
        os.unlink('services')
开发者ID:20tab,项目名称:pysftpserver,代码行数:27,代码来源:test_server_chroot.py


示例11: test_write

 def test_write(self):
     """Write a chunk at an offset and check what the hook recorded.

     The file is opened and the contents of /etc/services are written at
     ``write_offset``. The hook's 'write' result must then expose the
     filename under 'filename' and the offset under 'offset'. The handle is
     closed and the file unlinked afterwards.
     """
     filename = b'services'
     write_offset = 5
     self.server.input_queue = sftpcmd(
         SSH2_FXP_OPEN,
         sftpstring(filename),
         sftpint(SSH2_FXF_CREAT | SSH2_FXF_WRITE | SSH2_FXF_READ),
         sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
         sftpint(0o644),
     )
     self.server.process()
     handle = get_sftphandle(self.server.output_queue)
     self.server.output_queue = b''
     # Use a context manager so the source file is closed deterministically
     # instead of being left to the garbage collector (ResourceWarning).
     with open('/etc/services', 'rb') as source:
         chunk = source.read()
     self.server.input_queue = sftpcmd(
         SSH2_FXP_WRITE,
         sftpstring(handle),
         sftpint64(write_offset),
         sftpstring(chunk),
     )
     self.server.process()
     self.assertEqual(self.hook.get_result('write', 'filename'), filename)
     self.assertEqual(self.hook.get_result('write', 'offset'), write_offset)
     self.server.output_queue = b''
     self.server.input_queue = sftpcmd(SSH2_FXP_CLOSE, sftpstring(handle))
     self.server.process()
     # Remove the file created by the OPEN request.
     os.unlink(filename)
开发者ID:20tab,项目名称:pysftpserver,代码行数:27,代码来源:test_hook.py


示例12: test_rename

 def test_rename(self, mock_request):
     """Rename a file and check the recorded 'rename' request.

     ``mock_request`` is expected to contain two calls of any kind followed
     by a POST to 'test_url/rename' carrying both paths. The renamed file is
     unlinked at the end.
     """
     oldpath = b'services'
     newpath = b'other_services'
     creation_flags = SSH2_FXF_CREAT | SSH2_FXF_WRITE

     self.server.input_queue = sftpcmd(
         SSH2_FXP_OPEN,
         sftpstring(oldpath),
         sftpint(creation_flags),
         sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
         sftpint(0o644)
     )
     self.server.process()
     file_handle = get_sftphandle(self.server.output_queue)

     self.server.output_queue = b''
     self.server.input_queue = sftpcmd(
         SSH2_FXP_CLOSE, sftpstring(file_handle))
     self.server.process()

     self.server.output_queue = b''
     rename_request = sftpcmd(
         SSH2_FXP_RENAME,
         sftpstring(oldpath),
         sftpstring(newpath),
     )
     self.server.input_queue = rename_request
     self.server.process()

     expected_payload = {
         'method': 'rename',
         'oldpath': oldpath,
         'newpath': newpath,
     }
     mock_request.assert_has_calls([
         mock.ANY,  # open
         mock.ANY,  # close
         mock.call(
             'POST', 'test_url/rename', auth=None, data=expected_payload),
     ])
     os.unlink(newpath)
开发者ID:20tab,项目名称:pysftpserver,代码行数:32,代码来源:test_urlrequesthook.py


示例13: test_mkdir_forbidden

    def test_mkdir_forbidden(self):
        """MKDIR for '../foo' and for '/foo' must each raise SFTPForbidden."""
        # Checked in this order: a parent-relative path, then an absolute one.
        for forbidden_path in (b'../foo', b'/foo'):
            mkdir_request = sftpcmd(
                SSH2_FXP_MKDIR, sftpstring(forbidden_path), sftpint(0))
            self.server.input_queue = mkdir_request
            self.assertRaises(SFTPForbidden, self.server.process)
开发者ID:20tab,项目名称:pysftpserver,代码行数:8,代码来源:test_server_chroot.py


示例14: test_rmdir

 def test_rmdir(self):
     """Create and remove a directory, then check the hook's 'rmdir' result.

     The value returned by ``self.hook.get_result('rmdir')`` must equal the
     directory name that was removed.
     """
     dirname = b'foo'

     # The trailing sftpint(0) means no attrs are sent with MKDIR.
     mkdir_request = sftpcmd(
         SSH2_FXP_MKDIR, sftpstring(dirname), sftpint(0))
     self.server.input_queue = mkdir_request
     self.server.process()

     self.server.output_queue = b''
     rmdir_request = sftpcmd(SSH2_FXP_RMDIR, sftpstring(dirname))
     self.server.input_queue = rmdir_request
     self.server.process()

     recorded = self.hook.get_result('rmdir')
     self.assertEqual(recorded, dirname)
开发者ID:20tab,项目名称:pysftpserver,代码行数:10,代码来源:test_hook.py


示例15: test_fsetstat

 def test_fsetstat(self, mock_request):
     """Set attributes on an open handle and check the 'fsetstat' request.

     The file is opened, filled with the contents of /etc/services, and an
     FSETSTAT request carrying size, permissions and access/modification
     times is sent before closing. ``mock_request`` is expected to contain
     two calls of any kind, then a POST to 'test_url/fsetstat' with the
     filename and ``attrs``, then one further call of any kind.

     Args:
         mock_request: mock object whose recorded calls are inspected
             (presumably injected by a patch decorator outside this view).
     """
     filename = b'services'
     attrs = {
         b'size': 10**2,
         b'perm': 0o100600,
         b'atime': 1415626110,
         b'mtime': 1415626120,
     }
     self.server.input_queue = sftpcmd(
         SSH2_FXP_OPEN,
         sftpstring(filename),
         sftpint(SSH2_FXF_CREAT | SSH2_FXF_WRITE),
         sftpint(0)
     )
     self.server.process()
     handle = get_sftphandle(self.server.output_queue)
     self.server.output_queue = b''
     # Use a context manager so the source file is closed deterministically
     # instead of being left to the garbage collector (ResourceWarning).
     with open('/etc/services', 'rb') as source:
         etc_services = source.read()
     self.server.input_queue = sftpcmd(
         SSH2_FXP_WRITE,
         sftpstring(handle),
         sftpint64(0),
         sftpstring(etc_services)
     )
     self.server.process()
     self.server.output_queue = b''
     # The values follow the flag word in the order: size (64-bit),
     # permissions, atime, mtime.
     self.server.input_queue = sftpcmd(
         SSH2_FXP_FSETSTAT,
         sftpstring(handle),
         sftpint(
             SSH2_FILEXFER_ATTR_SIZE |
             SSH2_FILEXFER_ATTR_PERMISSIONS |
             SSH2_FILEXFER_ATTR_ACMODTIME
         ),
         sftpint64(attrs[b'size']),
         sftpint(attrs[b'perm']),
         sftpint(attrs[b'atime']),
         sftpint(attrs[b'mtime']),
     )
     self.server.process()
     self.server.output_queue = b''
     self.server.input_queue = sftpcmd(SSH2_FXP_CLOSE, sftpstring(handle))
     self.server.process()
     mock_request.assert_has_calls([
         mock.ANY,  # open
         mock.ANY,  # write
         mock.call(
             'POST', 'test_url/fsetstat', auth=None,
             data={
                 'method': 'fsetstat', 'filename': filename,
                 'attrs': attrs}),
         mock.ANY,  # close
     ])
     # Remove the file created by the OPEN request.
     os.unlink(filename)
开发者ID:20tab,项目名称:pysftpserver,代码行数:54,代码来源:test_urlrequesthook.py


示例16: test_opendir

 def test_opendir(self):
     """Open a directory and check the hook's 'opendir' result.

     The directory is created on disk first; after OPENDIR is processed the
     hook result must equal the directory name. The handle is then closed
     and the directory tree removed.
     """
     dirname = b'foo'
     os.mkdir(dirname)

     opendir_request = sftpcmd(SSH2_FXP_OPENDIR, sftpstring(dirname))
     self.server.input_queue = opendir_request
     self.server.process()
     recorded = self.hook.get_result('opendir')
     self.assertEqual(recorded, dirname)

     dir_handle = get_sftphandle(self.server.output_queue)
     self.server.output_queue = b''
     close_request = sftpcmd(SSH2_FXP_CLOSE, sftpstring(dir_handle))
     self.server.input_queue = close_request
     self.server.process()
     rmtree(dirname)
开发者ID:20tab,项目名称:pysftpserver,代码行数:12,代码来源:test_hook.py


示例17: test_open_forbidden

    def test_open_forbidden(self):
        """OPEN for '/etc/services' and '../../foo' must raise SFTPForbidden."""
        # Checked in this order: an absolute path, then a parent-relative one.
        for forbidden_path in (b'/etc/services', b'../../foo'):
            open_request = sftpcmd(
                SSH2_FXP_OPEN,
                sftpstring(forbidden_path),
                sftpint(SSH2_FXF_CREAT),
                sftpint(0)
            )
            self.server.input_queue = open_request
            self.assertRaises(SFTPForbidden, self.server.process)
开发者ID:20tab,项目名称:pysftpserver,代码行数:12,代码来源:test_server_chroot.py


示例18: test_rename_forbidden

    def test_rename_forbidden(self):
        """RENAME requests involving '/etc/...' paths must raise SFTPForbidden.

        Two cases are checked in order: only the destination is an absolute
        '/etc' path, then both source and destination are.
        """
        forbidden_pairs = (
            (b'services', b'/etc/other_services'),
            (b'/etc/services', b'/etc/other_services'),
        )
        for source_path, target_path in forbidden_pairs:
            rename_request = sftpcmd(
                SSH2_FXP_RENAME,
                sftpstring(source_path),
                sftpstring(target_path),
            )
            self.server.input_queue = rename_request
            self.assertRaises(SFTPForbidden, self.server.process)
开发者ID:20tab,项目名称:pysftpserver,代码行数:14,代码来源:test_server_chroot.py


示例19: test_close

 def test_close(self):
     """Open and close a file, then check the hook's 'close' result.

     The value returned by ``self.hook.get_result('close')`` must equal the
     filename that was opened. The file is unlinked afterwards.
     """
     filename = b'services'
     creation_flags = SSH2_FXF_CREAT | SSH2_FXF_WRITE

     open_request = sftpcmd(
         SSH2_FXP_OPEN,
         sftpstring(filename),
         sftpint(creation_flags),
         sftpint(0),
     )
     self.server.input_queue = open_request
     self.server.process()
     file_handle = get_sftphandle(self.server.output_queue)

     self.server.output_queue = b''
     close_request = sftpcmd(SSH2_FXP_CLOSE, sftpstring(file_handle))
     self.server.input_queue = close_request
     self.server.process()

     recorded = self.hook.get_result('close')
     self.assertEqual(recorded, filename)
     os.unlink(filename)
开发者ID:20tab,项目名称:pysftpserver,代码行数:15,代码来源:test_hook.py


示例20: test_rmdir

 def test_rmdir(self, mock_request):
     """Create and remove a directory, then check the 'rmdir' request.

     ``mock_request`` is expected to contain one call of any kind followed
     by a POST to 'test_url/rmdir' carrying the directory name.
     """
     dirname = b'foo'

     # The trailing sftpint(0) means no attrs are sent with MKDIR.
     mkdir_request = sftpcmd(
         SSH2_FXP_MKDIR, sftpstring(dirname), sftpint(0))
     self.server.input_queue = mkdir_request
     self.server.process()

     self.server.output_queue = b''
     rmdir_request = sftpcmd(SSH2_FXP_RMDIR, sftpstring(dirname))
     self.server.input_queue = rmdir_request
     self.server.process()

     expected_rmdir = mock.call(
         'POST', 'test_url/rmdir', auth=None,
         data={'method': 'rmdir', 'filename': dirname})
     mock_request.assert_has_calls([
         mock.ANY,  # mkdir
         expected_rmdir,
     ])
开发者ID:20tab,项目名称:pysftpserver,代码行数:15,代码来源:test_urlrequesthook.py



注:本文中的pysftpserver.tests.utils.sftpcmd函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.sftpstring函数代码示例发布时间:2022-05-26
下一篇:
Python sfgerrit.GerritUtils类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap