• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python strports.parse函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中twisted.application.strports.parse函数的典型用法代码示例。如果您正苦于以下问题：Python parse函数的具体用法？Python parse怎么用？Python parse使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了parse函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: makeService

def makeService(config):
    """
    Assemble the parent service hosting the HTTP site and the Minerva
    socket listeners described by C{config}.

    C{config} is indexed by C{'domain'}, C{'closure-library'},
    C{'minerva'}, C{'http'} and C{'no-tracebacks'}; the C{'minerva'} and
    C{'http'} values are iterated as collections of strports descriptions.

    @return: the L{service.MultiService} to which one child service per
        C{'http'} description and one per C{'minerva'} description has
        been attached.
    """
    from twisted.internet import reactor

    parent = service.MultiService()

    domain = config['domain']
    mutils.maybeWarnAboutDomain(reactor, domain)

    closureLibrary = FilePath(config['closure-library'])
    mutils.maybeWarnAboutClosureLibrary(reactor, closureLibrary)

    # Only the first positional argument of each parsed description is
    # kept (presumably the port or socket address -- confirm against
    # strports); a throwaway object() stands in for the factory.
    socketPorts = [
        strports.parse(description, object())[1][0]
        for description in config['minerva']
    ]

    fileCache = FileCache(lambda: reactor.seconds(), -1)
    stf, httpSite = site.setupMinerva(
        reactor, fileCache, socketPorts, domain, closureLibrary)
    httpSite.displayTracebacks = not config["no-tracebacks"]

    # Every HTTP description serves the web site ...
    for description in config['http']:
        strports.service(description, httpSite).setServiceParent(parent)

    # ... and every Minerva description serves the socket factory.
    for description in config['minerva']:
        strports.service(description, stf).setServiceParent(parent)

    return parent
开发者ID:Julian,项目名称:cardboard,代码行数:31,代码来源:config.py


示例2: makeService

def makeService(config):
    """
    Assemble the parent service hosting the HTTP site and the Minerva
    socket listeners described by C{config}.

    C{config} is indexed by C{'domain'}, C{'closure-library'},
    C{'minerva'}, C{'http'} and C{'no-tracebacks'}.  When the
    C{PYRELOADING} environment variable holds a non-zero integer, the
    file cache is created with C{0.1} instead of C{-1} and
    C{mutils.enablePyquitter} is called with the reactor.

    @return: the L{service.MultiService} to which one child service per
        C{'http'} description and one per C{'minerva'} description has
        been attached.
    """
    from twisted.internet import reactor, task

    parent = service.MultiService()

    domain = config['domain']
    mutils.maybeWarnAboutDomain(reactor, domain)

    closureLibrary = FilePath(config['closure-library'])
    mutils.maybeWarnAboutClosureLibrary(reactor, closureLibrary)

    # Only the first positional argument of each parsed description is
    # kept; a throwaway object() stands in for the factory.
    socketPorts = [
        strports.parse(description, object())[1][0]
        for description in config['minerva']
    ]

    # PYRELOADING is read as an integer flag; unset counts as "0".
    doReloading = bool(int(os.environ.get('PYRELOADING', '0')))
    if doReloading:
        cacheArgument = 0.1
    else:
        cacheArgument = -1
    fileCache = FileCache(lambda: reactor.seconds(), cacheArgument)

    stf, httpSite = demosminerva_site.makeMinervaAndHttp(
        reactor, fileCache, socketPorts, domain, closureLibrary)
    httpSite.displayTracebacks = not config["no-tracebacks"]

    for description in config['http']:
        strports.service(description, httpSite).setServiceParent(parent)

    for description in config['minerva']:
        strports.service(description, stf).setServiceParent(parent)

    if doReloading:
        mutils.enablePyquitter(reactor)

    return parent
开发者ID:ludios,项目名称:DemosMinerva,代码行数:34,代码来源:tap.py


示例3: test_modeUNIX

 def test_modeUNIX(self):
     """
     C{mode} can be set by including C{"mode=<some integer>"}.

     The description carries C{mode=0660}; the parsed keyword arguments
     are expected to hold that value as the octal integer C{0o660}.
     """
     # 0o660 is the octal spelling accepted by Python 2.6+ and Python 3;
     # the bare 0660 form is a SyntaxError on Python 3.
     self.assertEqual(
         strports.parse('unix:/var/run/finger:mode=0660', self.f),
         ('UNIX', ('/var/run/finger', self.f),
          {'mode': 0o660, 'backlog': 50, 'wantPID': True}))
开发者ID:Almad,项目名称:twisted,代码行数:8,代码来源:test_strports.py


示例4: test_wantPIDUNIX

 def test_wantPIDUNIX(self):
     """
     C{wantPID} can be set to false by including C{"lockfile=0"}.

     C{mode} and C{backlog} are expected to be C{0o666} and C{50}.
     """
     # 0o666 is the octal spelling accepted by Python 2.6+ and Python 3;
     # the bare 0666 form is a SyntaxError on Python 3.
     self.assertEqual(
         strports.parse('unix:/var/run/finger:lockfile=0', self.f),
         ('UNIX', ('/var/run/finger', self.f),
          {'mode': 0o666, 'backlog': 50, 'wantPID': False}))
开发者ID:Almad,项目名称:twisted,代码行数:8,代码来源:test_strports.py


示例5: test_defaultPort

 def test_defaultPort(self):
     """
     Without an explicit I{--port} option, L{Options} falls back to a
     port description that parses as TCP port C{8080}.
     """
     options = Options()
     options.parseOptions([])
     # Only the port type and positional arguments are compared; the
     # keyword arguments of the parsed description are ignored.
     parsed = strports.parse(options["port"], None)
     self.assertEqual(parsed[:2], ("TCP", (8080, None)))
开发者ID:jsober,项目名称:twisted,代码行数:8,代码来源:test_tap.py


示例6: parse

 def parse(self, *a, **kw):
     """
     Call L{strports.parse} with the given arguments, check that exactly
     one warning carrying the expected deprecation message was flushed
     for C{self.parse}, and return what L{strports.parse} returned.
     """
     parsed = strports.parse(*a, **kw)
     flushed = self.flushWarnings([self.parse])
     expectedMessage = (
         "twisted.application.strports.parse was deprecated "
         "in Twisted 10.2.0: in favor of twisted.internet.endpoints.serverFromString")
     self.assertEqual(len(flushed), 1)
     self.assertEqual(flushed[0]['message'], expectedMessage)
     return parsed
开发者ID:12019,项目名称:OpenWrt_Luci_Lua,代码行数:9,代码来源:test_strports.py


示例7: test_simpleUNIX

 def test_simpleUNIX(self):
     """
     L{strports.parse} returns a C{'UNIX'} port description with defaults
     for C{'mode'}, C{'backlog'}, and C{'wantPID'} when passed a string with
     the C{'unix:'} prefix and no other parameter values.
     """
     # 0o666 is the octal spelling accepted by Python 2.6+ and Python 3;
     # the bare 0666 form is a SyntaxError on Python 3.
     self.assertEqual(
         strports.parse('unix:/var/run/finger', self.f),
         ('UNIX', ('/var/run/finger', self.f),
          {'mode': 0o666, 'backlog': 50, 'wantPID': True}))
开发者ID:Almad,项目名称:twisted,代码行数:10,代码来源:test_strports.py


示例8: test_defaultPersonalPath

 def test_defaultPersonalPath(self):
     """
     When I{--personal} is given without I{--port}, L{Options} defaults
     the port to a UNIX socket named C{UserDirectory.userSocketName} in
     the user's home directory.
     """
     options = Options()
     options.parseOptions(["--personal"])
     socketName = os.path.join("~", UserDirectory.userSocketName)
     path = os.path.expanduser(socketName)
     # Only the port type and positional arguments are compared.
     parsed = strports.parse(options["port"], None)
     self.assertEqual(parsed[:2], ("UNIX", (path, None)))
开发者ID:jsober,项目名称:twisted,代码行数:10,代码来源:test_tap.py


示例9: _strport_to_url

def _strport_to_url(desc, scheme='http', path='/'):
    '''Build a localhost URL from a twisted.application.strports string.

    For a 'TCP' description the URL uses scheme as given; for 'SSL' an
    's' is appended to scheme.  Any other port type yields '???'.
    '''
    # TODO: need to know canonical domain name, not localhost; can we extract from the ssl cert?
    # TODO: strports.parse is deprecated
    method, args, _unused_kwargs = strports.parse(desc, None)
    if method not in ('TCP', 'SSL'):
        # TODO better error return
        return '???'
    effective_scheme = scheme if method == 'TCP' else scheme + 's'
    return effective_scheme + '://localhost:' + str(args[0]) + path
开发者ID:LucaBongiorni,项目名称:shinysdr,代码行数:12,代码来源:web.py


示例10: helper

        def helper(name, config, transport_names):
            # Closure handed to L{_loadItemsFromConfig}.  Records every
            # configured transport name in C{transport_names} and sets the
            # transport up unless one with that name is already registered.
            #
            # Returns C{None} for an already registered name, the Twisted
            # service for an L{IServerTransport}, and the transport itself
            # otherwise.
            transport_names.add(name)

            # An already registered name (possibly a duplicate, which would
            # be a configuration error) is silently skipped.
            if name in self.registered_transports:
                return None

            log.msg('[CONTROLLER] Configuring transport %s' % name)
            # Build the L{ITransport} from its configuration
            transport = self.loadTransport(name, config)

            if not IServerTransport.providedBy(transport):
                # Not a server transport (most likely the 'mail'
                # transport): record its L{ITransportInfo} and hand the
                # transport back, since nothing more can be done with it
                # here.
                self.transport_info_map[name] = \
                    transport.getTransportInfo(name)
                return transport

            # Server transport: wrap it in a
            # L{twisted.application.service.IService} built from C{config}.
            # This is a Twisted service listening on a socket, *not* an
            # applicationserver service.
            transport_service = self.hookTransport(transport, config)
            transport_service.setName(name)
            # Attach it to our MultiTransport
            transport_service.setServiceParent(self.transports)

            # Parse the transport settings a second time to obtain the
            # port information needed for the L{ITransportInfo} object.
            port_info = strports.parse(config['transport'], transport)
            self.transport_info_map[name] = \
                transport.getTransportInfo(name, port_info)

            return transport_service
开发者ID:racktivity,项目名称:ext-pylabs-core,代码行数:47,代码来源:controller.py


示例11: postOptions

    def postOptions(self):
        strport = self['strport']
        factoryIdentifier = self['factory-identifier']
        if strport is None or factoryIdentifier is None:
            self.opt_help()

        store = self.parent.parent.getStore()
        storeID = int(factoryIdentifier)
        try:
            factory = store.getItemByID(storeID)
        except KeyError:
            print "%d does not identify an item." % (storeID,)
            raise SystemExit(1)
        else:
            if not IProtocolFactoryFactory.providedBy(factory):
                print "%d does not identify a factory." % (storeID,)
                raise SystemExit(1)
            else:
                try:
                    kind, args, kwargs = parse(strport, factory)
                except ValueError:
                    print "%r is not a valid port description." % (strport,)
                    raise SystemExit(1)
                except KeyError:
                    print "Unrecognized port type."
                    raise SystemExit(1)
                except SSL.Error, e:
                    if self._pemFormatError in e.args[0]:
                        print 'Certificate file must use PEM format.'
                        raise SystemExit(1)
                    elif self._noSuchFileError in e.args[0]:
                        if self._certFileError in e.args[0]:
                            print "Specified certificate file does not exist."
                            raise SystemExit(1)
                        elif self._keyFileError in e.args[0]:
                            print "Specified private key file does not exist."
                            raise SystemExit(1)
                        else:
                            # Note, no test coverage.
                            raise
                    else:
                        # Note, no test coverage.
                        raise
                else:
                    try:
开发者ID:rcarmo,项目名称:divmod.org,代码行数:45,代码来源:port.py


示例12: testNonstandardDefault

 def testNonstandardDefault(self):
     """
     Passing C{'unix'} as the third argument makes the bare description
     C{'filename'} parse as a C{'UNIX'} port description.
     """
     # 0o666 is the octal spelling accepted by Python 2.6+ and Python 3;
     # the bare 0666 form is a SyntaxError on Python 3.
     self.assertEqual(
         strports.parse('filename', self.f, 'unix'),
         ('UNIX', ('filename', self.f),
          {'mode': 0o666, 'backlog': 50, 'wantPID': True}))
开发者ID:Almad,项目名称:twisted,代码行数:5,代码来源:test_strports.py


示例13: testImpliedEscape

 def testImpliedEscape(self):
     """
     In C{'unix:address=foo=bar'} the second C{=} needs no escaping: the
     address is expected to parse as C{'foo=bar'}.
     """
     # 0o666 is the octal spelling accepted by Python 2.6+ and Python 3;
     # the bare 0666 form is a SyntaxError on Python 3.
     self.assertEqual(
         strports.parse(r'unix:address=foo=bar', self.f),
         ('UNIX', ('foo=bar', self.f),
          {'mode': 0o666, 'backlog': 50, 'wantPID': True}))
开发者ID:Almad,项目名称:twisted,代码行数:5,代码来源:test_strports.py


示例14: testEscape

 def testEscape(self):
     """
     Backslash-escaped C{:}, C{=} and C{\\} in the description are
     expected to appear unescaped in the parsed address.
     """
     # 0o666 is the octal spelling accepted by Python 2.6+ and Python 3;
     # the bare 0666 form is a SyntaxError on Python 3.
     self.assertEqual(
         strports.parse(r'unix:foo\:bar\=baz\:qux\\', self.f),
         ('UNIX', ('foo:bar=baz:qux\\', self.f),
          {'mode': 0o666, 'backlog': 50, 'wantPID': True}))
开发者ID:Almad,项目名称:twisted,代码行数:5,代码来源:test_strports.py


示例15: testAllKeywords

 def testAllKeywords(self):
     """
     The keyword-only description C{'port=80'} is expected to parse as a
     C{'TCP'} description with C{interface} C{''} and C{backlog} C{50}.
     """
     parsed = strports.parse('port=80', self.f)
     expected = ('TCP', (80, self.f), {'interface': '', 'backlog': 50})
     self.assertEqual(parsed, expected)
开发者ID:Almad,项目名称:twisted,代码行数:3,代码来源:test_strports.py


示例16: testBacklogTCP

 def testBacklogTCP(self):
     """
     C{backlog=6} in a TCP description is expected to show up as the
     integer C{6} in the parsed keyword arguments.
     """
     parsed = strports.parse('tcp:80:backlog=6', self.f)
     expected = ('TCP', (80, self.f), {'interface': '', 'backlog': 6})
     self.assertEqual(parsed, expected)
开发者ID:Almad,项目名称:twisted,代码行数:4,代码来源:test_strports.py


示例17: testInterfaceTCP

 def testInterfaceTCP(self):
     """
     C{interface=127.0.0.1} in a TCP description is expected to show up
     as the string C{'127.0.0.1'} in the parsed keyword arguments.
     """
     parsed = strports.parse('tcp:80:interface=127.0.0.1', self.f)
     expected = (
         'TCP', (80, self.f), {'interface': '127.0.0.1', 'backlog': 50})
     self.assertEqual(parsed, expected)
开发者ID:Almad,项目名称:twisted,代码行数:4,代码来源:test_strports.py


示例18: testSimpleTCP

 def testSimpleTCP(self):
     """
     The description C{'tcp:80'} is expected to parse as a C{'TCP'}
     description with C{interface} C{''} and C{backlog} C{50}.
     """
     parsed = strports.parse('tcp:80', self.f)
     expected = ('TCP', (80, self.f), {'interface': '', 'backlog': 50})
     self.assertEqual(parsed, expected)
开发者ID:Almad,项目名称:twisted,代码行数:3,代码来源:test_strports.py


示例19: valid_strport

def valid_strport(value):
    """Check that *value* is a description ``strports.parse`` accepts.

    Returns:
        *value*, unchanged, when parsing succeeds.

    Raises:
        argparse.ArgumentTypeError: if ``strports.parse`` raises
            ``ValueError`` or ``KeyError``; the arguments of the original
            exception are forwarded.
    """
    try:
        # strports.parse() takes the factory as a required second
        # positional argument; omitting it raises TypeError before any
        # parsing happens.  Only validation is wanted here and the result
        # is discarded, so None is passed as the factory.
        # NOTE(review): Twisted documents ``default`` as a port *type*
        # (e.g. 'tcp'); 'tcp:5775' is kept as written -- confirm intent.
        strports.parse(value, None, default='tcp:5775')
    except (ValueError, KeyError) as err:
        raise argparse.ArgumentTypeError(*err.args)
    return value
开发者ID:mcfletch,项目名称:ssws,代码行数:6,代码来源:service.py



注:本文中的twisted.application.strports.parse函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python strports.service函数代码示例发布时间:2022-05-27
下一篇:
Python service.Service类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap