• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python io.IO类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中slimta.smtp.io.IO的典型用法代码示例。如果您正苦于以下问题:Python IO类的具体用法?Python IO怎么用?Python IO使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了IO类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_recv_utf8

 def test_recv_utf8(self):
     self.sock.recv(IsA(int)).AndReturn(b'250 \xc3\xbf\r\n')
     self.mox.ReplayAll()
     io = IO(self.sock)
     code, message = io.recv_reply()
     self.assertEqual('250', code)
     self.assertEqual(u'\xff', message)
开发者ID:madhugb,项目名称:python-slimta,代码行数:7,代码来源:test_slimta_smtp_io.py


示例2: test_recv_command_arg

 def test_recv_command_arg(self):
     self.sock.recv(IsA(int)).AndReturn(b'cmd arg \r\n')
     self.mox.ReplayAll()
     io = IO(self.sock)
     command, arg = io.recv_command()
     self.assertEqual(b'CMD', command)
     self.assertEqual(b'arg', arg)
开发者ID:madhugb,项目名称:python-slimta,代码行数:7,代码来源:test_slimta_smtp_io.py


示例3: test_recv_command_nonutf8

 def test_recv_command_nonutf8(self):
     self.sock.recv(IsA(int)).AndReturn(b'cmd\xffr\n')
     self.mox.ReplayAll()
     io = IO(self.sock)
     command, arg = io.recv_command()
     self.assertEqual(None, command)
     self.assertEqual(None, arg)
开发者ID:madhugb,项目名称:python-slimta,代码行数:7,代码来源:test_slimta_smtp_io.py


示例4: test_recv_reply_multiline

 def test_recv_reply_multiline(self):
     self.sock.recv(IsA(int)).AndReturn(b'250-One\r\n250 Two\r\n')
     self.mox.ReplayAll()
     io = IO(self.sock)
     code, message = io.recv_reply()
     self.assertEqual('250', code)
     self.assertEqual('One\r\nTwo', message)
开发者ID:madhugb,项目名称:python-slimta,代码行数:7,代码来源:test_slimta_smtp_io.py


示例5: test_recv_command

 def test_recv_command(self):
     self.sock.recv(IsA(int)).AndReturn('CMD\r\n')
     self.mox.ReplayAll()
     io = IO(self.sock)
     command, arg = io.recv_command()
     assert_equal('CMD', command)
     assert_equal(None, arg)
开发者ID:drewlander,项目名称:python-slimta,代码行数:7,代码来源:test_slimta_smtp_io.py


示例6: test_recv_reply

 def test_recv_reply(self):
     self.sock.recv(IsA(int)).AndReturn(b'250 Ok\r\n')
     self.mox.ReplayAll()
     io = IO(self.sock)
     code, message = io.recv_reply()
     self.assertEqual('250', code)
     self.assertEqual('Ok', message)
开发者ID:madhugb,项目名称:python-slimta,代码行数:7,代码来源:test_slimta_smtp_io.py


示例7: test_recv_reply_multipart

 def test_recv_reply_multipart(self):
     self.sock.recv(IsA(int)).AndReturn('250 ')
     self.sock.recv(IsA(int)).AndReturn('Ok\r\n')
     self.mox.ReplayAll()
     io = IO(self.sock)
     code, message = io.recv_reply()
     assert_equal('250', code)
     assert_equal('Ok', message)
开发者ID:drewlander,项目名称:python-slimta,代码行数:8,代码来源:test_slimta_smtp_io.py


示例8: test_recv

 def test_recv(self):
     self.sock.recv(IsA(int)).AndReturn("\r\nthree\r\n")
     self.sock.recv(IsA(int)).AndReturn(".\r\nstuff\r\n")
     self.mox.ReplayAll()
     io = IO(self.sock)
     io.recv_buffer = "one\r\ntwo"
     dr = DataReader(io)
     self.assertEqual("one\r\ntwo\r\nthree\r\n", dr.recv())
开发者ID:MichaelEvanchik,项目名称:python-slimta,代码行数:8,代码来源:test_slimta_smtp_datareader.py


示例9: test_recv_line

 def test_recv_line(self):
     self.sock.recv(IsA(int)).AndReturn(b'one')
     self.sock.recv(IsA(int)).AndReturn(b'\r\ntwo')
     self.mox.ReplayAll()
     io = IO(self.sock)
     line = io.recv_line()
     self.assertEqual(b'one', line)
     self.assertEqual(b'two', io.recv_buffer)
开发者ID:madhugb,项目名称:python-slimta,代码行数:8,代码来源:test_slimta_smtp_io.py


示例10: test_recv

 def test_recv(self):
     self.sock.recv(IsA(int)).AndReturn(b'\r\nthree\r\n')
     self.sock.recv(IsA(int)).AndReturn(b'.\r\nstuff\r\n')
     self.mox.ReplayAll()
     io = IO(self.sock)
     io.recv_buffer = b'one\r\ntwo'
     dr = DataReader(io)
     self.assertEqual(b'one\r\ntwo\r\nthree\r\n', dr.recv())
开发者ID:madhugb,项目名称:python-slimta,代码行数:8,代码来源:test_slimta_smtp_datareader.py


示例11: __init__

    def __init__(self, socket, handlers, auth_class=None,
                       tls=None, tls_immediately=False, tls_wrapper=None,
                       command_timeout=None, data_timeout=None):
        self.handlers = handlers
        self.extensions = Extensions()

        self.have_mailfrom = None
        self.have_rcptto = None
        self.ehlo_as = None
        self.auth_result = None
        self.encrypted = False

        self.extensions.add('8BITMIME')
        self.extensions.add('PIPELINING')
        self.extensions.add('ENHANCEDSTATUSCODES')
        if tls and not tls_immediately:
            self.extensions.add('STARTTLS')
        if auth_class:
            self.extensions.add('AUTH', auth_class(self))

        self.io = IO(socket, tls_wrapper)

        if tls:
            self.tls = tls.copy()
            self.tls.setdefault('server_side', True)
        else:
            self.tls = None
        self.tls_immediately = tls_immediately

        self.command_timeout = command_timeout
        self.data_timeout = data_timeout or command_timeout
开发者ID:RoboticCheese,项目名称:python-slimta,代码行数:31,代码来源:server.py


示例12: test_send_reply_multiline

 def test_send_reply_multiline(self):
     self.mox.ReplayAll()
     io = IO(self.sock)
     io.send_reply(Reply('100', 'One\r\nTwo'))
     self.assertEqual(b'100-One\r\n100 Two\r\n', io.send_buffer.getvalue())
开发者ID:madhugb,项目名称:python-slimta,代码行数:5,代码来源:test_slimta_smtp_io.py


示例13: test_from_recv_buffer

 def test_from_recv_buffer(self):
     io = IO(None)
     io.recv_buffer = "test\r\ndata"
     dr = DataReader(io)
     dr.from_recv_buffer()
     self.assertEqual(["test\r\n", "data"], dr.lines)
开发者ID:MichaelEvanchik,项目名称:python-slimta,代码行数:6,代码来源:test_slimta_smtp_datareader.py


示例14: Server

class Server(object):
    """Class that implements an SMTP server given a connected socket. This
    object has an ``extensions`` attribute that is an |Extensions| object that
    contains the SMTP extensions the server supports.

    :param socket: Connected socket for the session.
    :type socket: :class:`gevent.socket.socket`
    :param handlers: Object with methods that will be called when
                     corresponding SMTP commands are received. These methods
                     can modify the |Reply| before the command response is sent.
    :param auth_class: Optional |Auth| sub-class to enable authentication.
    :param tls: Optional dictionary of TLS settings passed directly as
                keyword arguments to :class:`gevent.ssl.SSLSocket`.
    :param tls_immediately: If True, the socket will be encrypted
                            immediately.
    :param tls_wrapper: Optional function that takes a socket and the ``tls``
                        dictionary, creates a new encrypted socket, performs
                        the TLS handshake, and returns it. The default uses
                        :class:`~gevent.ssl.SSLSocket`.
    :type tls_immediately: True or False
    :param command_timeout: Optional timeout waiting for a command to be
                            sent from the client.
    :param data_timeout: Optional timeout waiting for data to be sent from
                         the client.

    """

    def __init__(self, socket, handlers, auth_class=None,
                       tls=None, tls_immediately=False, tls_wrapper=None,
                       command_timeout=None, data_timeout=None):
        self.handlers = handlers
        self.extensions = Extensions()

        self.have_mailfrom = None
        self.have_rcptto = None
        self.ehlo_as = None
        self.auth_result = None
        self.encrypted = False

        self.extensions.add('8BITMIME')
        self.extensions.add('PIPELINING')
        self.extensions.add('ENHANCEDSTATUSCODES')
        if tls and not tls_immediately:
            self.extensions.add('STARTTLS')
        if auth_class:
            self.extensions.add('AUTH', auth_class(self))

        self.io = IO(socket, tls_wrapper)

        if tls:
            self.tls = tls.copy()
            self.tls.setdefault('server_side', True)
        else:
            self.tls = None
        self.tls_immediately = tls_immediately

        self.command_timeout = command_timeout
        self.data_timeout = data_timeout or command_timeout

    def _recv_command(self):
        timeout = Timeout(self.command_timeout)
        timeout.start()
        try:
            return self.io.recv_command()
        finally:
            timeout.cancel()

    def _get_message_data(self):
        max_size = self.extensions.getparam('SIZE', filter=int)
        reader = DataReader(self.io, max_size)

        err = None
        timeout = Timeout(self.data_timeout)
        timeout.start()
        try:
            data = reader.recv()
        except ConnectionLost:
            raise
        except SmtpError, e:
            data = None
            err = e
        finally:
开发者ID:RoboticCheese,项目名称:python-slimta,代码行数:82,代码来源:server.py


示例15: test_send_reply

 def test_send_reply(self):
     self.mox.ReplayAll()
     io = IO(self.sock)
     io.send_reply(Reply('100', 'Ok'))
     self.assertEqual(b'100 Ok\r\n', io.send_buffer.getvalue())
开发者ID:madhugb,项目名称:python-slimta,代码行数:5,代码来源:test_slimta_smtp_io.py


示例16: test_send_command

 def test_send_command(self):
     self.mox.ReplayAll()
     io = IO(self.sock)
     io.send_command('CMD')
     assert_equal('CMD\r\n', io.send_buffer.getvalue())
开发者ID:drewlander,项目名称:python-slimta,代码行数:5,代码来源:test_slimta_smtp_io.py


示例17: test_flush_send_empty

 def test_flush_send_empty(self):
     self.mox.ReplayAll()
     io = IO(self.sock)
     io.flush_send()
开发者ID:madhugb,项目名称:python-slimta,代码行数:4,代码来源:test_slimta_smtp_io.py


示例18: test_send_reply_nonascii

 def test_send_reply_nonascii(self):
     self.mox.ReplayAll()
     io = IO(self.sock)
     io.send_reply(Reply('100', u'Ok\xff'))
     self.assertEqual(b'100 Ok\xc3\xbf\r\n', io.send_buffer.getvalue())
开发者ID:madhugb,项目名称:python-slimta,代码行数:5,代码来源:test_slimta_smtp_io.py


示例19: test_flush_send

 def test_flush_send(self):
     self.sock.sendall(b'some data')
     self.mox.ReplayAll()
     io = IO(self.sock)
     io.buffered_send(b'some data')
     io.flush_send()
开发者ID:madhugb,项目名称:python-slimta,代码行数:6,代码来源:test_slimta_smtp_io.py


示例20: test_buffered_send

 def test_buffered_send(self):
     self.mox.ReplayAll()
     io = IO(self.sock)
     io.buffered_send(b'some data')
     self.assertEqual(b'some data', io.send_buffer.getvalue())
开发者ID:madhugb,项目名称:python-slimta,代码行数:5,代码来源:test_slimta_smtp_io.py



注:本文中的slimta.smtp.io.IO类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python reply.Reply类代码示例发布时间:2022-05-27
下一篇:
Python datareader.DataReader类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap