• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python producers.simple_producer函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中supervisor.medusa.producers.simple_producer函数的典型用法代码示例。如果您正苦于以下问题:Python simple_producer函数的具体用法?Python simple_producer怎么用?Python simple_producer使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了simple_producer函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: status

 def status(self):
     # Thanks to [email protected] (Mike Meyer)
     r = [
         producers.simple_producer(
             "<li>Authorization Extension : " "<b>Unauthorized requests:</b> %s<ul>" % self.fail_count
         )
     ]
     if hasattr(self.handler, "status"):
         r.append(self.handler.status())
     r.append(producers.simple_producer("</ul>"))
     return producers.composite_producer(r)
开发者ID:GulsahKose,项目名称:supervisor,代码行数:11,代码来源:auth_handler.py


示例2: status

    def status (self):
        from supervisor.medusa.status_handler import english_bytes
        def nice_bytes (n):
            return ''.join(english_bytes (n))

        handler_stats = [_f for _f in map (maybe_status, self.handlers) if _f]

        if self.total_clients:
            ratio = self.total_requests.as_long() / float(self.total_clients.as_long())
        else:
            ratio = 0.0

        return producers.composite_producer (
                [producers.lines_producer (
                        ['<h2>%s</h2>'                                                  % self.SERVER_IDENT,
                        '<br>Listening on: <b>Host:</b> %s'             % self.server_name,
                        '<b>Port:</b> %d'                                               % self.port,
                         '<p><ul>'
                         '<li>Total <b>Clients:</b> %s'                 % self.total_clients,
                         '<b>Requests:</b> %s'                                  % self.total_requests,
                         '<b>Requests/Client:</b> %.1f'                 % ratio,
                         '<li>Total <b>Bytes In:</b> %s'        % (nice_bytes (self.bytes_in.as_long())),
                         '<b>Bytes Out:</b> %s'                         % (nice_bytes (self.bytes_out.as_long())),
                         '<li>Total <b>Exceptions:</b> %s'              % self.exceptions,
                         '</ul><p>'
                         '<b>Extension List</b><ul>',
                         ])] + handler_stats + [producers.simple_producer('</ul>')]
                )
开发者ID:lmcdonough,项目名称:supervisor,代码行数:28,代码来源:http_server.py


示例3: status

 def status (self):
     import supervisor.medusa.producers as producers
     return producers.simple_producer (
             '<li> Redirecting Handler %s => %s <b>Hits</b>: %s' % (
                     self.pattern, self.redirect, self.hits
                     )
             )
开发者ID:GulsahKose,项目名称:supervisor,代码行数:7,代码来源:redirecting_handler.py


示例4: getresponse

    def getresponse(self, body):
        self.request["Content-Type"] = "text/xml"
        self.request["Content-Length"] = len(body)
        self.request.push(body)
        connection = get_header(self.CONNECTION, self.request.header)

        close_it = 0
        wrap_in_chunking = 0

        if self.request.version == "1.0":
            if connection == "keep-alive":
                if not self.request.has_key("Content-Length"):
                    close_it = 1
                else:
                    self.request["Connection"] = "Keep-Alive"
            else:
                close_it = 1
        elif self.request.version == "1.1":
            if connection == "close":
                close_it = 1
            elif not self.request.has_key("Content-Length"):
                if self.request.has_key("Transfer-Encoding"):
                    if not self.request["Transfer-Encoding"] == "chunked":
                        close_it = 1
                elif self.request.use_chunked:
                    self.request["Transfer-Encoding"] = "chunked"
                    wrap_in_chunking = 1
                else:
                    close_it = 1
        elif self.request.version is None:
            close_it = 1

        outgoing_header = producers.simple_producer(self.request.build_reply_header())

        if close_it:
            self.request["Connection"] = "close"

        if wrap_in_chunking:
            outgoing_producer = producers.chunked_producer(producers.composite_producer(self.request.outgoing))
            # prepend the header
            outgoing_producer = producers.composite_producer([outgoing_header, outgoing_producer])
        else:
            # prepend the header
            self.request.outgoing.insert(0, outgoing_header)
            outgoing_producer = producers.composite_producer(self.request.outgoing)

        # apply a few final transformations to the output
        self.request.channel.push_with_producer(
            # globbing gives us large packets
            producers.globbing_producer(
                # hooking lets us log the number of bytes sent
                producers.hooked_producer(outgoing_producer, self.request.log)
            )
        )

        self.request.channel.current_request = None

        if close_it:
            self.request.channel.close_when_done()
开发者ID:Jude7,项目名称:minos,代码行数:59,代码来源:xmlrpc.py


示例5: push

 def push (self, thing):
     # Sometimes, text gets pushed by XMLRPC logic for later
     # processing.
     if isinstance(thing, str):
         thing = as_bytes(thing)
     if isinstance(thing, bytes):
         thing = producers.simple_producer(thing, buffer_size=len(thing))
     self.outgoing.append(thing)
开发者ID:the5fire,项目名称:supervisor,代码行数:8,代码来源:http_server.py


示例6: status

 def status (self):
     return producers.simple_producer (
             '<h2>%s</h2>'                                           % self.SERVER_IDENT
             + '<br><b>Total Sessions:</b> %s'               % self.total_sessions
             + '<br><b>Current Sessions:</b> %d'     % (
                     self.total_sessions.as_long()-self.closed_sessions.as_long()
                     )
             )
开发者ID:lmcdonough,项目名称:supervisor,代码行数:8,代码来源:monitor.py


示例7: status

 def status (self):
     return producers.simple_producer (
             '<li>Server-Side Script Handler'
             + '<ul>'
             + '  <li><b>Hits:</b> %s' % self.hits
             + '  <li><b>Exceptions:</b> %s' % self.exceptions
             + '</ul>'
             )
开发者ID:GulsahKose,项目名称:supervisor,代码行数:8,代码来源:script_handler.py


示例8: status

 def status (self):
     return producers.simple_producer (
             '<li>%s' % status_handler.html_repr (self)
             + '<ul>'
             + '  <li><b>Total Hits:</b> %s'                 % self.hit_counter
             + '  <li><b>Files Delivered:</b> %s'    % self.file_counter
             + '  <li><b>Cache Hits:</b> %s'                 % self.cache_counter
             + '</ul>'
             )
开发者ID:GulsahKose,项目名称:supervisor,代码行数:9,代码来源:default_handler.py


示例9: status

 def status (self):
     import supervisor.medusa.producers as producers
     return producers.simple_producer (
             '<h2>%s</h2>'                                   % self.SERVER_IDENT
             + '<br>Server: %s'                              % self.server
             + '<br>Cache Entries: %d'               % len(self.cache)
             + '<br>Outstanding Requests: %d' % len(self.request_map)
             + '<br>Forward Requests: %s'    % self.forward_requests
             + '<br>Reverse Requests: %s'    % self.reverse_requests
             + '<br>Cache Hits: %s'                  % self.cache_hits
             )
开发者ID:lmcdonough,项目名称:supervisor,代码行数:11,代码来源:resolver.py


示例10: check_chunked

    def check_chunked (self):
        p1 = producers.simple_producer('the quick brown fox', buffer_size = 5)
        p = producers.chunked_producer(p1, footers=['FOOTER'])
        self._check_all(p, """5\r
the q\r
5\r
uick \r
5\r
brown\r
4\r
 fox\r
0\r
FOOTER\r
\r\n""")
开发者ID:WLPhoenix,项目名称:supervisor-py3k,代码行数:14,代码来源:test_producers.py


示例11: getresponse

    def getresponse(self, body):
        self.request['Content-Type'] = 'text/xml'
        self.request['Content-Length'] = len(body)
        self.request.push(body)
        connection = get_header(self.CONNECTION, self.request.header)

        close_it = 0

        if self.request.version == '1.0':
            if connection == 'keep-alive':
                self.request['Connection'] = 'Keep-Alive'
            else:
                close_it = 1
        elif self.request.version == '1.1':
            if connection == 'close':
                close_it = 1
        elif self.request.version is None:
            close_it = 1

        outgoing_header = producers.simple_producer (
            self.request.build_reply_header())

        if close_it:
            self.request['Connection'] = 'close'

        # prepend the header
        self.request.outgoing.insert(0, outgoing_header)
        outgoing_producer = producers.composite_producer(self.request.outgoing)

        # apply a few final transformations to the output
        self.request.channel.push_with_producer (
                # globbing gives us large packets
                producers.globbing_producer (
                        # hooking lets us log the number of bytes sent
                        producers.hooked_producer (
                                outgoing_producer,
                                self.request.log
                                )
                        )
                )

        self.request.channel.current_request = None

        if close_it:
            self.request.channel.close_when_done()
开发者ID:denghp,项目名称:supervisor,代码行数:45,代码来源:xmlrpc.py


示例12: check_hooked

 def check_hooked (self):
     def f (num_bytes):
         self.test_val('num_bytes', len(test_string))
     p1 = producers.simple_producer(test_string, buffer_size = 5)
     p = producers.hooked_producer(p1, f)
     self._check_all(p, test_string)
开发者ID:WLPhoenix,项目名称:supervisor-py3k,代码行数:6,代码来源:test_producers.py


示例13: check_glob

 def check_glob (self):
     p1 = producers.simple_producer(test_string, buffer_size = 5)
     p = producers.globbing_producer(p1, buffer_size = 1024)
     self.test_true('1024 <= len(p.more())')
开发者ID:WLPhoenix,项目名称:supervisor-py3k,代码行数:4,代码来源:test_producers.py


示例14: check_composite

 def check_composite (self):
     p1 = producers.simple_producer('a'*66, buffer_size = 5)
     p2 = producers.lines_producer(['b']*65)
     p = producers.composite_producer([p1, p2])
     self._check_all(p, 'a'*66 + 'b\r\n'*65)
开发者ID:WLPhoenix,项目名称:supervisor-py3k,代码行数:5,代码来源:test_producers.py


示例15: done

    def done (self):
        """finalize this transaction - send output to the http channel"""

        # ----------------------------------------
        # persistent connection management
        # ----------------------------------------

        #  --- BUCKLE UP! ----

        connection = get_header(CONNECTION, self.header).lower()

        close_it = 0
        wrap_in_chunking = 0

        if self.version == '1.0':
            if connection == 'keep-alive':
                if 'Content-Length' not in self:
                    close_it = 1
                else:
                    self['Connection'] = 'Keep-Alive'
            else:
                close_it = 1
        elif self.version == '1.1':
            if connection == 'close':
                close_it = 1
            elif 'Content-Length' not in self:
                if 'Transfer-Encoding' in self:
                    if not self['Transfer-Encoding'] == 'chunked':
                        close_it = 1
                elif self.use_chunked:
                    self['Transfer-Encoding'] = 'chunked'
                    wrap_in_chunking = 1
                else:
                    close_it = 1
        elif self.version is None:
            # Although we don't *really* support http/0.9 (because we'd have to
            # use \r\n as a terminator, and it would just yuck up a lot of stuff)
            # it's very common for developers to not want to type a version number
            # when using telnet to debug a server.
            close_it = 1

        outgoing_header = producers.simple_producer(self.get_reply_header_text())

        if close_it:
            self['Connection'] = 'close'

        if wrap_in_chunking:
            outgoing_producer = producers.chunked_producer (
                    producers.composite_producer (self.outgoing)
                    )
            # prepend the header
            outgoing_producer = producers.composite_producer(
                [outgoing_header, outgoing_producer]
                )
        else:
            # prepend the header
            self.outgoing.insert(0, outgoing_header)
            outgoing_producer = producers.composite_producer (self.outgoing)

        # apply a few final transformations to the output
        self.channel.push_with_producer (
                # globbing gives us large packets
                producers.globbing_producer (
                        # hooking lets us log the number of bytes sent
                        producers.hooked_producer (
                                outgoing_producer,
                                self.log
                                )
                        )
                )

        self.channel.current_request = None

        if close_it:
            self.channel.close_when_done()
开发者ID:lmcdonough,项目名称:supervisor,代码行数:75,代码来源:http_server.py


示例16: sendresponse

    def sendresponse(self, response):

        headers = response.get('headers', {})
        for header in headers:
            self.request[header] = headers[header]

        if not self.request.has_key('Content-Type'):
            self.request['Content-Type'] = 'text/plain'

        if headers.get('Location'):
            self.request['Content-Length'] = 0
            self.request.error(301)
            return

        body = response.get('body', '')
        self.request['Content-Length'] = len(body)

        self.request.push(body)

        connection = get_header(self.CONNECTION, self.request.header)

        close_it = 0
        wrap_in_chunking = 0

        if self.request.version == '1.0':
            if connection == 'keep-alive':
                if not self.request.has_key('Content-Length'):
                    close_it = 1
                else:
                    self.request['Connection'] = 'Keep-Alive'
            else:
                close_it = 1
        elif self.request.version == '1.1':
            if connection == 'close':
                close_it = 1
            elif not self.request.has_key ('Content-Length'):
                if self.request.has_key ('Transfer-Encoding'):
                    if not self.request['Transfer-Encoding'] == 'chunked':
                        close_it = 1
                elif self.request.use_chunked:
                    self.request['Transfer-Encoding'] = 'chunked'
                    wrap_in_chunking = 1
                else:
                    close_it = 1
        elif self.request.version is None:
            close_it = 1

        outgoing_header = producers.simple_producer (
            self.request.build_reply_header())

        if close_it:
            self.request['Connection'] = 'close'

        if wrap_in_chunking:
            outgoing_producer = producers.chunked_producer (
                    producers.composite_producer (self.request.outgoing)
                    )
            # prepend the header
            outgoing_producer = producers.composite_producer(
                [outgoing_header, outgoing_producer]
                )
        else:
            # prepend the header
            self.request.outgoing.insert(0, outgoing_header)
            outgoing_producer = producers.composite_producer (
                self.request.outgoing)

        # apply a few final transformations to the output
        self.request.channel.push_with_producer (
                # globbing gives us large packets
                producers.globbing_producer (
                        # hooking lets us log the number of bytes sent
                        producers.hooked_producer (
                                outgoing_producer,
                                self.request.log
                                )
                        )
                )

        self.request.channel.current_request = None

        if close_it:
            self.request.channel.close_when_done()
开发者ID:B-Rich,项目名称:supervisor,代码行数:83,代码来源:web.py


示例17: sendresponse

    def sendresponse(self, response):

        headers = response.get("headers", {})
        for header in headers:
            self.request[header] = headers[header]

        if "Content-Type" not in self.request:
            self.request["Content-Type"] = "text/plain"

        if headers.get("Location"):
            self.request["Content-Length"] = 0
            self.request.error(301)
            return

        body = response.get("body", "")
        self.request["Content-Length"] = len(body)

        self.request.push(body)

        connection = get_header(self.CONNECTION, self.request.header)

        close_it = 0
        wrap_in_chunking = 0

        if self.request.version == "1.0":
            if connection == "keep-alive":
                if not self.request.has_key("Content-Length"):
                    close_it = 1
                else:
                    self.request["Connection"] = "Keep-Alive"
            else:
                close_it = 1
        elif self.request.version == "1.1":
            if connection == "close":
                close_it = 1
            elif "Content-Length" not in self.request:
                if "Transfer-Encoding" in self.request:
                    if not self.request["Transfer-Encoding"] == "chunked":
                        close_it = 1
                elif self.request.use_chunked:
                    self.request["Transfer-Encoding"] = "chunked"
                    wrap_in_chunking = 1
                else:
                    close_it = 1
        elif self.request.version is None:
            close_it = 1

        outgoing_header = producers.simple_producer(self.request.build_reply_header())

        if close_it:
            self.request["Connection"] = "close"

        if wrap_in_chunking:
            outgoing_producer = producers.chunked_producer(producers.composite_producer(self.request.outgoing))
            # prepend the header
            outgoing_producer = producers.composite_producer([outgoing_header, outgoing_producer])
        else:
            # fix AttributeError: 'unicode' object has no attribute 'more'
            if (not PY3) and (len(self.request.outgoing) > 0):
                body = self.request.outgoing[0]
                if isinstance(body, unicode):
                    self.request.outgoing[0] = producers.simple_producer(body)

            # prepend the header
            self.request.outgoing.insert(0, outgoing_header)
            outgoing_producer = producers.composite_producer(self.request.outgoing)

        # apply a few final transformations to the output
        self.request.channel.push_with_producer(
            # globbing gives us large packets
            producers.globbing_producer(
                # hooking lets us log the number of bytes sent
                producers.hooked_producer(outgoing_producer, self.request.log)
            )
        )

        self.request.channel.current_request = None

        if close_it:
            self.request.channel.close_when_done()
开发者ID:dvarrazzo,项目名称:supervisor,代码行数:80,代码来源:web.py


示例18: status

 def status (self):
     return producers.simple_producer (
             '<li>Status Extension <b>Hits</b> : %s' % self.hit_counter
             )
开发者ID:GulsahKose,项目名称:supervisor,代码行数:4,代码来源:status_handler.py


示例19: check_simple

    def check_simple (self):
        p = producers.simple_producer(test_string)
        self.test_val('p.more()', test_string[:1024])

        p = producers.simple_producer(test_string, buffer_size = 5)
        self._check_all(p, test_string)
开发者ID:WLPhoenix,项目名称:supervisor-py3k,代码行数:6,代码来源:test_producers.py


示例20: check_compressed

 def check_compressed (self):
     p1 = producers.simple_producer(test_string, buffer_size = 5)
     p = producers.compressed_producer(p1)
     compr_data = self._check_all(p, zlib.compress(test_string, 5))
     self.test_val('zlib.decompress(compr_data)', test_string)
开发者ID:WLPhoenix,项目名称:supervisor-py3k,代码行数:5,代码来源:test_producers.py



注:本文中的supervisor.medusa.producers.simple_producer函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python options.make_namespec函数代码示例发布时间:2022-05-27
下一篇:
Python producers.composite_producer函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap