本文整理汇总了Python中pysftpserver.tests.utils.sftpcmd函数的典型用法代码示例。如果您正苦于以下问题:Python sftpcmd函数的具体用法?Python sftpcmd怎么用?Python sftpcmd使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了sftpcmd函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_rename
def test_rename(self):
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN,
sftpstring(b'services'),
sftpint(SSH2_FXF_CREAT | SSH2_FXF_WRITE),
sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
sftpint(0o644)
)
self.server.process()
handle = get_sftphandle(self.server.output_queue)
# reset output queue
self.server.output_queue = b''
self.server.input_queue = sftpcmd(
SSH2_FXP_CLOSE,
sftpstring(handle),
)
self.server.process()
self.server.input_queue = sftpcmd(
SSH2_FXP_RENAME,
sftpstring(b'services'),
sftpstring(b'other_services'),
)
self.server.process()
self.assertIn('other_services', os.listdir('.'))
开发者ID:20tab,项目名称:pysftpserver,代码行数:26,代码来源:test_server_chroot.py
示例2: test_remove
def test_remove(self):
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN,
sftpstring(b'services'),
sftpint(SSH2_FXF_CREAT | SSH2_FXF_WRITE),
sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
sftpint(0o644)
)
self.server.process()
handle = get_sftphandle(self.server.output_queue)
# reset output queue
self.server.output_queue = b''
self.server.input_queue = sftpcmd(
SSH2_FXP_CLOSE,
sftpstring(handle)
)
self.server.process()
self.server.input_queue = sftpcmd(
SSH2_FXP_REMOVE,
sftpstring(b'services'),
sftpint(0)
)
self.server.process()
开发者ID:20tab,项目名称:pysftpserver,代码行数:25,代码来源:test_server_chroot.py
示例3: test_write
def test_write(self, mock_request):
filename = b'services'
write_offset = 5
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN,
sftpstring(filename),
sftpint(SSH2_FXF_CREAT | SSH2_FXF_WRITE | SSH2_FXF_READ),
sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
sftpint(0o644),
)
self.server.process()
handle = get_sftphandle(self.server.output_queue)
self.server.output_queue = b''
chunk = open('/etc/services', 'rb').read()
self.server.input_queue = sftpcmd(
SSH2_FXP_WRITE,
sftpstring(handle),
sftpint64(write_offset),
sftpstring(chunk),
)
self.server.process()
self.server.output_queue = b''
self.server.input_queue = sftpcmd(SSH2_FXP_CLOSE, sftpstring(handle))
self.server.process()
mock_request.assert_has_calls([
mock.ANY, # open
mock.call(
'POST', 'test_url/write', auth=None,
data={
'method': 'write', 'filename': filename,
'offset': write_offset}),
mock.ANY, # close
])
os.unlink(filename)
开发者ID:20tab,项目名称:pysftpserver,代码行数:34,代码来源:test_urlrequesthook.py
示例4: test_fstat
def test_fstat(self):
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN,
sftpstring(b'services'),
sftpint(SSH2_FXF_CREAT),
sftpint(0)
)
self.server.process()
handle = get_sftphandle(self.server.output_queue)
self.server.output_queue = b''
self.server.input_queue = sftpcmd(
SSH2_FXP_FSTAT,
sftpstring(handle)
)
self.server.process()
stat = get_sftpstat(self.server.output_queue)
self.assertEqual(stat['size'], 0)
self.assertEqual(stat['uid'], os.getuid())
self.server.output_queue = b''
self.server.input_queue = sftpcmd(
SSH2_FXP_CLOSE,
sftpstring(handle)
)
self.server.process()
os.unlink('services')
开发者ID:20tab,项目名称:pysftpserver,代码行数:28,代码来源:test_server_chroot.py
示例5: test_open
def test_open(self, mock_request):
filename = b'services'
flags = SSH2_FXF_CREAT | SSH2_FXF_WRITE
perm = 0o100600
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN,
sftpstring(filename),
sftpint(flags),
sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
sftpint(perm),
)
self.server.process()
handle = get_sftphandle(self.server.output_queue)
self.server.output_queue = b''
self.server.input_queue = sftpcmd(SSH2_FXP_CLOSE, sftpstring(handle))
self.server.process()
mock_request.assert_has_calls([
mock.call(
'POST', 'test_url/open', auth=None,
data={
'method': 'open', 'filename': filename,
'flags': self.server.get_explicit_flags(flags),
'attrs': {b'perm': perm}}),
mock.ANY, # close
])
os.unlink(filename)
开发者ID:20tab,项目名称:pysftpserver,代码行数:26,代码来源:test_urlrequesthook.py
示例6: test_rm
def test_rm(self, mock_request):
filename = b'services'
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN,
sftpstring(filename),
sftpint(SSH2_FXF_CREAT | SSH2_FXF_WRITE),
sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
sftpint(0o644)
)
self.server.process()
handle = get_sftphandle(self.server.output_queue)
self.server.output_queue = b''
self.server.input_queue = sftpcmd(SSH2_FXP_CLOSE, sftpstring(handle))
self.server.process()
self.server.output_queue = b''
self.server.input_queue = sftpcmd(
SSH2_FXP_REMOVE,
sftpstring(filename),
sftpint(0)
)
self.server.process()
mock_request.assert_has_calls([
mock.ANY, # open
mock.ANY, # close
mock.call(
'POST', 'test_url/rm', auth=None,
data={'method': 'rm', 'filename': filename}),
])
开发者ID:20tab,项目名称:pysftpserver,代码行数:28,代码来源:test_urlrequesthook.py
示例7: test_rename
def test_rename(self):
oldpath = b'services'
newpath = b'other_services'
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN,
sftpstring(oldpath),
sftpint(SSH2_FXF_CREAT | SSH2_FXF_WRITE),
sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
sftpint(0o644)
)
self.server.process()
handle = get_sftphandle(self.server.output_queue)
self.server.output_queue = b''
self.server.input_queue = sftpcmd(SSH2_FXP_CLOSE, sftpstring(handle))
self.server.process()
self.server.output_queue = b''
self.server.input_queue = sftpcmd(
SSH2_FXP_RENAME,
sftpstring(oldpath),
sftpstring(newpath),
)
self.server.process()
self.assertEqual(self.hook.get_result('rename', 'oldpath'), oldpath)
self.assertEqual(self.hook.get_result('rename', 'newpath'), newpath)
os.unlink(newpath)
开发者ID:20tab,项目名称:pysftpserver,代码行数:25,代码来源:test_hook.py
示例8: test_realpath
def test_realpath(self, mock_request):
"""Additionally tests multiple urls and no path."""
self.server.hook = UrlRequestHook(
'test_url',
urls_mapping={
'realpath': ['test_url_1', 'test_url_2']},
paths_mapping={
'realpath': ''})
filename = b'services'
flags = SSH2_FXF_CREAT | SSH2_FXF_WRITE
perm = 0o100600
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN,
sftpstring(filename),
sftpint(flags),
sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
sftpint(perm),
)
self.server.process()
self.server.output_queue = b''
self.server.input_queue = sftpcmd(SSH2_FXP_REALPATH,
sftpstring(filename))
self.server.process()
mock_request.assert_has_calls([
mock.ANY, # open
mock.call(
'POST', 'test_url_1/', auth=None,
data={'method': 'realpath', 'filename': filename}),
mock.call(
'POST', 'test_url_2/', auth=None,
data={'method': 'realpath', 'filename': filename}),
])
os.unlink(filename)
开发者ID:20tab,项目名称:pysftpserver,代码行数:33,代码来源:test_urlrequesthook.py
示例9: test_opendir
def test_opendir(self, mock_request):
"""Additionally tests single url and multiple paths."""
self.server.hook = UrlRequestHook(
'test_url',
paths_mapping={
'opendir': ['test_path_1', 'test_path_2', 'test_path_3']})
dirname = b'foo'
os.mkdir(dirname)
self.server.input_queue = sftpcmd(SSH2_FXP_OPENDIR,
sftpstring(dirname))
self.server.process()
handle = get_sftphandle(self.server.output_queue)
self.server.output_queue = b''
self.server.input_queue = sftpcmd(SSH2_FXP_CLOSE, sftpstring(handle))
self.server.process()
mock_request.assert_has_calls([
mock.call(
'POST', 'test_url/test_path_1', auth=None,
data={'method': 'opendir', 'filename': dirname}),
mock.call(
'POST', 'test_url/test_path_2', auth=None,
data={'method': 'opendir', 'filename': dirname}),
mock.call(
'POST', 'test_url/test_path_3', auth=None,
data={'method': 'opendir', 'filename': dirname}),
mock.ANY, # close
])
rmtree(dirname)
开发者ID:20tab,项目名称:pysftpserver,代码行数:28,代码来源:test_urlrequesthook.py
示例10: test_open_already_existing
def test_open_already_existing(self):
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN,
sftpstring(b'services'),
sftpint(SSH2_FXF_CREAT),
sftpint(0)
)
self.server.process()
handle = get_sftphandle(self.server.output_queue)
# reset output queue
self.server.output_queue = b''
self.server.input_queue = sftpcmd(
SSH2_FXP_CLOSE,
sftpstring(handle)
)
self.server.process()
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN,
sftpstring(b'services'),
sftpint(SSH2_FXF_CREAT | SSH2_FXF_EXCL),
sftpint(0)
)
self.assertRaises(SFTPException, self.server.process)
os.unlink('services')
开发者ID:20tab,项目名称:pysftpserver,代码行数:27,代码来源:test_server_chroot.py
示例11: test_write
def test_write(self):
filename = b'services'
write_offset = 5
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN,
sftpstring(filename),
sftpint(SSH2_FXF_CREAT | SSH2_FXF_WRITE | SSH2_FXF_READ),
sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
sftpint(0o644),
)
self.server.process()
handle = get_sftphandle(self.server.output_queue)
self.server.output_queue = b''
chunk = open('/etc/services', 'rb').read()
self.server.input_queue = sftpcmd(
SSH2_FXP_WRITE,
sftpstring(handle),
sftpint64(write_offset),
sftpstring(chunk),
)
self.server.process()
self.assertEqual(self.hook.get_result('write', 'filename'), filename)
self.assertEqual(self.hook.get_result('write', 'offset'), write_offset)
self.server.output_queue = b''
self.server.input_queue = sftpcmd(SSH2_FXP_CLOSE, sftpstring(handle))
self.server.process()
os.unlink(filename)
开发者ID:20tab,项目名称:pysftpserver,代码行数:27,代码来源:test_hook.py
示例12: test_rename
def test_rename(self, mock_request):
oldpath = b'services'
newpath = b'other_services'
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN,
sftpstring(oldpath),
sftpint(SSH2_FXF_CREAT | SSH2_FXF_WRITE),
sftpint(SSH2_FILEXFER_ATTR_PERMISSIONS),
sftpint(0o644)
)
self.server.process()
handle = get_sftphandle(self.server.output_queue)
self.server.output_queue = b''
self.server.input_queue = sftpcmd(SSH2_FXP_CLOSE, sftpstring(handle))
self.server.process()
self.server.output_queue = b''
self.server.input_queue = sftpcmd(
SSH2_FXP_RENAME,
sftpstring(oldpath),
sftpstring(newpath),
)
self.server.process()
mock_request.assert_has_calls([
mock.ANY, # open
mock.ANY, # close
mock.call(
'POST', 'test_url/rename', auth=None,
data={
'method': 'rename', 'oldpath': oldpath,
'newpath': newpath}),
])
os.unlink(newpath)
开发者ID:20tab,项目名称:pysftpserver,代码行数:32,代码来源:test_urlrequesthook.py
示例13: test_mkdir_forbidden
def test_mkdir_forbidden(self):
self.server.input_queue = sftpcmd(
SSH2_FXP_MKDIR, sftpstring(b'../foo'), sftpint(0))
self.assertRaises(SFTPForbidden, self.server.process)
self.server.input_queue = sftpcmd(
SSH2_FXP_MKDIR, sftpstring(b'/foo'), sftpint(0))
self.assertRaises(SFTPForbidden, self.server.process)
开发者ID:20tab,项目名称:pysftpserver,代码行数:8,代码来源:test_server_chroot.py
示例14: test_rmdir
def test_rmdir(self):
dirname = b'foo'
# sftpint(0) means no attrs
self.server.input_queue = sftpcmd(
SSH2_FXP_MKDIR, sftpstring(dirname), sftpint(0))
self.server.process()
self.server.output_queue = b''
self.server.input_queue = sftpcmd(SSH2_FXP_RMDIR, sftpstring(dirname))
self.server.process()
self.assertEqual(self.hook.get_result('rmdir'), dirname)
开发者ID:20tab,项目名称:pysftpserver,代码行数:10,代码来源:test_hook.py
示例15: test_fsetstat
def test_fsetstat(self, mock_request):
filename = b'services'
attrs = {
b'size': 10**2,
b'perm': 0o100600,
b'atime': 1415626110,
b'mtime': 1415626120,
}
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN,
sftpstring(filename),
sftpint(SSH2_FXF_CREAT | SSH2_FXF_WRITE),
sftpint(0)
)
self.server.process()
handle = get_sftphandle(self.server.output_queue)
self.server.output_queue = b''
etc_services = open('/etc/services', 'rb').read()
self.server.input_queue = sftpcmd(
SSH2_FXP_WRITE,
sftpstring(handle),
sftpint64(0),
sftpstring(etc_services)
)
self.server.process()
self.server.output_queue = b''
self.server.input_queue = sftpcmd(
SSH2_FXP_FSETSTAT,
sftpstring(handle),
sftpint(
SSH2_FILEXFER_ATTR_SIZE |
SSH2_FILEXFER_ATTR_PERMISSIONS |
SSH2_FILEXFER_ATTR_ACMODTIME
),
sftpint64(attrs[b'size']),
sftpint(attrs[b'perm']),
sftpint(attrs[b'atime']),
sftpint(attrs[b'mtime']),
)
self.server.process()
self.server.output_queue = b''
self.server.input_queue = sftpcmd(SSH2_FXP_CLOSE, sftpstring(handle))
self.server.process()
mock_request.assert_has_calls([
mock.ANY, # open
mock.ANY, # write
mock.call(
'POST', 'test_url/fsetstat', auth=None,
data={
'method': 'fsetstat', 'filename': filename,
'attrs': attrs}),
mock.ANY, # close
])
os.unlink(filename)
开发者ID:20tab,项目名称:pysftpserver,代码行数:54,代码来源:test_urlrequesthook.py
示例16: test_opendir
def test_opendir(self):
dirname = b'foo'
os.mkdir(dirname)
self.server.input_queue = sftpcmd(SSH2_FXP_OPENDIR,
sftpstring(dirname))
self.server.process()
self.assertEqual(self.hook.get_result('opendir'), dirname)
handle = get_sftphandle(self.server.output_queue)
self.server.output_queue = b''
self.server.input_queue = sftpcmd(SSH2_FXP_CLOSE, sftpstring(handle))
self.server.process()
rmtree(dirname)
开发者ID:20tab,项目名称:pysftpserver,代码行数:12,代码来源:test_hook.py
示例17: test_open_forbidden
def test_open_forbidden(self):
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN, sftpstring(
b'/etc/services'), sftpint(SSH2_FXF_CREAT), sftpint(0)
)
self.assertRaises(SFTPForbidden, self.server.process)
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN, sftpstring(
b'../../foo'), sftpint(SSH2_FXF_CREAT), sftpint(0)
)
self.assertRaises(SFTPForbidden, self.server.process)
开发者ID:20tab,项目名称:pysftpserver,代码行数:12,代码来源:test_server_chroot.py
示例18: test_rename_forbidden
def test_rename_forbidden(self):
self.server.input_queue = sftpcmd(
SSH2_FXP_RENAME,
sftpstring(b'services'),
sftpstring(b'/etc/other_services'),
)
self.assertRaises(SFTPForbidden, self.server.process)
self.server.input_queue = sftpcmd(
SSH2_FXP_RENAME,
sftpstring(b'/etc/services'),
sftpstring(b'/etc/other_services'),
)
self.assertRaises(SFTPForbidden, self.server.process)
开发者ID:20tab,项目名称:pysftpserver,代码行数:14,代码来源:test_server_chroot.py
示例19: test_close
def test_close(self):
filename = b'services'
self.server.input_queue = sftpcmd(
SSH2_FXP_OPEN,
sftpstring(filename),
sftpint(SSH2_FXF_CREAT | SSH2_FXF_WRITE),
sftpint(0),
)
self.server.process()
handle = get_sftphandle(self.server.output_queue)
self.server.output_queue = b''
self.server.input_queue = sftpcmd(SSH2_FXP_CLOSE, sftpstring(handle))
self.server.process()
self.assertEqual(self.hook.get_result('close'), filename)
os.unlink(filename)
开发者ID:20tab,项目名称:pysftpserver,代码行数:15,代码来源:test_hook.py
示例20: test_rmdir
def test_rmdir(self, mock_request):
dirname = b'foo'
# sftpint(0) means no attrs
self.server.input_queue = sftpcmd(
SSH2_FXP_MKDIR, sftpstring(dirname), sftpint(0))
self.server.process()
self.server.output_queue = b''
self.server.input_queue = sftpcmd(SSH2_FXP_RMDIR, sftpstring(dirname))
self.server.process()
mock_request.assert_has_calls([
mock.ANY, # mkdir
mock.call(
'POST', 'test_url/rmdir', auth=None,
data={'method': 'rmdir', 'filename': dirname}),
])
开发者ID:20tab,项目名称:pysftpserver,代码行数:15,代码来源:test_urlrequesthook.py
注:本文中的pysftpserver.tests.utils.sftpcmd函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论