• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python defer.succeed函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中twisted.internet.defer.succeed函数的典型用法代码示例。如果您正苦于以下问题:Python succeed函数的具体用法?Python succeed怎么用?Python succeed使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了succeed函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: fetch_user

 def fetch_user((consumer_key, timestamp, nonce, signature)):
     
     if consumer_key == "root":
         return defer.succeed((self.root,
                               timestamp,
                               nonce,
                               signature))
         
     if consumer_key == "guest":
         return defer.succeed((self.guest,
                               timestamp,
                               nonce,
                               signature))
     
     def complete(results):
         
         if not len(results):
             return defer.fail(Exception("Unauthorized."))
         
         return defer.succeed((results[0],
                               timestamp,
                               nonce,
                               signature))
     
     d = userRepository.where(login = consumer_key)
     d.addCallback(complete)
     return d
开发者ID:AnthonyNystrom,项目名称:YoGoMee,代码行数:27,代码来源:auth.py


示例2: getChange

    def getChange(self, changeid):
        try:
            row = self.changes[changeid]
        except KeyError:
            return defer.succeed(None)

        return defer.succeed(self._chdict(row))
开发者ID:Acidburn0zzz,项目名称:buildbot,代码行数:7,代码来源:fakedb.py


示例3: side_effect

 def side_effect(*args, **kwargs):
     if 'getSlaveInfo' in args:
         return defer.succeed({'info': 'test'})
     if 'getCommands' in args:
         return defer.succeed({'x': 1, 'y': 2})
     if 'getVersion' in args:
         return defer.succeed('TheVersion')
开发者ID:BenjamenMeyer,项目名称:buildbot,代码行数:7,代码来源:test_buildslave_protocols_pb.py


示例4: after_reply

        def after_reply(reply, proto, fetched=0):
            documents = reply.documents
            docs_count = len(documents)
            if limit > 0:
                docs_count = min(docs_count, limit - fetched)
            fetched += docs_count

            options = bson.codec_options.CodecOptions(document_class=as_class)
            out = [document.decode(codec_options=options) for document in documents[:docs_count]]

            if reply.cursor_id:
                if limit == 0:
                    to_fetch = 0  # no limit
                elif limit < 0:
                    # We won't actually get here because MongoDB won't
                    # create cursor when limit < 0
                    to_fetch = None
                else:
                    to_fetch = limit - fetched
                    if to_fetch <= 0:
                        to_fetch = None  # close cursor

                if to_fetch is None:
                    proto.send_KILL_CURSORS(KillCursors(cursors=[reply.cursor_id]))
                    return out, defer.succeed(([], None))

                next_reply = proto.send_GETMORE(
                    Getmore(collection=str(self), cursor_id=reply.cursor_id, n_to_return=to_fetch)
                )
                next_reply.addCallback(after_reply, proto, fetched)
                return out, next_reply

            return out, defer.succeed(([], None))
开发者ID:hawkowl,项目名称:txmongo,代码行数:33,代码来源:collection.py


示例5: test_error_sending

    def test_error_sending(self, logger):
        """
        An error sending to one agent does not prevent others from being
        notified.
        """
        control_amp_service = build_control_amp_service(self)
        self.patch(control_amp_service, 'logger', logger)

        connected_protocol = ControlAMP(control_amp_service)
        # Patching is bad.
        # https://clusterhq.atlassian.net/browse/FLOC-1603
        connected_protocol.callRemote = lambda *args, **kwargs: succeed({})

        error = ConnectionLost()
        disconnected_protocol = ControlAMP(control_amp_service)
        results = [succeed({}), fail(error)]
        # Patching is bad.
        # https://clusterhq.atlassian.net/browse/FLOC-1603
        disconnected_protocol.callRemote = (
            lambda *args, **kwargs: results.pop(0))

        control_amp_service.connected(disconnected_protocol)
        control_amp_service.connected(connected_protocol)
        control_amp_service.node_changed(NodeState(hostname=u"1.2.3.4"))

        actions = LoggedAction.ofType(logger.messages, LOG_SEND_TO_AGENT)
        self.assertEqual(
            [action.end_message["exception"] for action in actions
             if not action.succeeded],
            [u"twisted.internet.error.ConnectionLost"])
开发者ID:aminembarki,项目名称:flocker,代码行数:30,代码来源:test_protocol.py


示例6: test_bookmarklet

    def test_bookmarklet(self):
        """
        Does api/bookmarklet fetch, save, and return a response for the recipe? 
        """
        fromTest = fromdir(__file__)
        loc = fromTest('recipe_page_source.html')
        pageSource = open(loc).read()

        pGet = patch.object(treq, 'get', return_value=defer.succeed(None), autospec=True)
        pTreqContent = patch.object(treq, 'content', return_value=defer.succeed(pageSource), autospec=True)
        
        with pGet, pTreqContent:  
            # normal bookmarketing 
            u = self._users()[0]
            req = self.requestJSON([], session_user=u) 
            req.args['uri'] = ['http://www.foodandwine.com/recipes/poutine-style-twice-baked-potatoes']
            ret = yield self.handler('bookmarklet', req)
            self.assertEqual(len(recipe.Recipe.objects()), 1)
            expectedResults = '{"status": "ok", "recipes": [{"name": "Delicious Meatless Meatballs", "urlKey": "weirdo-gmail-com-delicious-meatless-meatballs-"}], "message": ""}'
            assert ret == expectedResults  

            # # not signed in to noms; bookmarketing should not be allowed 
            req = self.requestJSON([])
            req.args['uri'] = ['http://www.foodandwine.com/recipes/poutine-style-twice-baked-potatoes']
            ret = yield self.handler('bookmarklet', req)
            expectedResults = '{"status": "error", "recipes": [], "message": "User was not logged in."}'
            assert ret == expectedResults
开发者ID:corydodt,项目名称:Noms,代码行数:27,代码来源:test_server.py


示例7: _initEvent

    def _initEvent(self):
        if not self._client.started:
            return succeed(None)

        # If it already exists, don't re-create
        calendar = self._calendarsOfType(caldavxml.calendar, "VEVENT")[0]
        if calendar.events:
            events = [event for event in calendar.events.values() if event.url.endswith("event_to_update.ics")]
            if events:
                return succeed(None)

        # Copy the template event and fill in some of its fields
        # to make a new event to create on the calendar.
        vcalendar = self._eventTemplate.duplicate()
        vevent = vcalendar.mainComponent()
        uid = str(uuid4())
        dtstart = self._eventStartDistribution.sample()
        dtend = dtstart + Duration(seconds=self._eventDurationDistribution.sample())
        vevent.replaceProperty(Property("CREATED", DateTime.getNowUTC()))
        vevent.replaceProperty(Property("DTSTAMP", DateTime.getNowUTC()))
        vevent.replaceProperty(Property("DTSTART", dtstart))
        vevent.replaceProperty(Property("DTEND", dtend))
        vevent.replaceProperty(Property("UID", uid))

        rrule = self._recurrenceDistribution.sample()
        if rrule is not None:
            vevent.addProperty(Property(None, None, None, pycalendar=rrule))

        href = '%s%s' % (calendar.url, "event_to_update.ics")
        d = self._client.addEvent(href, vcalendar)
        return self._newOperation("create", d)
开发者ID:eventable,项目名称:CalendarServer,代码行数:31,代码来源:profiles.py


示例8: getMapper

def getMapper():
    # We prefer UPnP when available, as it's more robust
    global _installedShutdownHook
    if not _installedShutdownHook:
        from twisted.internet import reactor
        t = reactor.addSystemEventTrigger('after',
                                          'shutdown',
                                          clearCache)
        _installedShutdownHook = True
    try:
        from __main__ import app
    except:
        app = None
    natPref = 'both'
    if app is not None:
        print "app is", app
        natPref = app.getPref('nat')
        log.msg('NAT preference says to use %s'%(natPref))
    if _forcedMapper is not None:
        return defer.succeed(_forcedMapper)
    from xshtoom.stun import getSTUN
    if natPref == 'stun':
        ud = getSTUN()
        d = defer.DeferredList([defer.succeed(None), ud])
    else:
        nm = NullMapper()
        d = defer.DeferredList([defer.succeed(None),
                                defer.succeed(None)])
    d.addCallback(cb_getMapper).addErrback(log.err)
    return d
开发者ID:ViktorNova,项目名称:rtpmidi,代码行数:30,代码来源:nat.py


示例9: resolve

    def resolve(self, guid):
        """
        Given a guid return a `Node` object containing its ip and port or none if it's
        not found.

        Args:
            guid: the 20 raw bytes representing the guid.
        """
        node_to_find = Node(guid)

        def check_for_node(nodes):
            for node in nodes:
                if node.id == node_to_find.id:
                    return node
            return None
        index = self.protocol.router.getBucketFor(node_to_find)
        nodes = self.protocol.router.buckets[index].getNodes()
        for node in nodes:
            if node.id == node_to_find.id:
                return defer.succeed(node)
        nearest = self.protocol.router.findNeighbors(node_to_find)
        if len(nearest) == 0:
            self.log.warning("there are no known neighbors to find node %s" % node_to_find.id.encode("hex"))
            return defer.succeed(None)
        spider = NodeSpiderCrawl(self.protocol, node_to_find, nearest, self.ksize, self.alpha)
        return spider.find().addCallback(check_for_node)
开发者ID:gasull,项目名称:OpenBazaar-Server,代码行数:26,代码来源:network.py


示例10: test_convergence_sent_state_fail_resends

    def test_convergence_sent_state_fail_resends(self):
        """
        If sending state to the control node fails the next iteration will send
        state even if the state hasn't changed.
        """
        local_state = NodeState(hostname=u'192.0.2.123')
        configuration = Deployment(nodes=[to_node(local_state)])
        state = DeploymentState(nodes=[local_state])
        deployer = ControllableDeployer(
            local_state.hostname,
            [succeed(local_state), succeed(local_state.copy())],
            [no_action(), no_action()])
        client = self.make_amp_client(
            [local_state, local_state.copy()], succeed=False
        )
        reactor = Clock()
        loop = build_convergence_loop_fsm(reactor, deployer)
        loop.receive(_ClientStatusUpdate(
            client=client, configuration=configuration, state=state))
        reactor.advance(1.0)

        # Calculating actions happened, result was run... and then we did
        # whole thing again:
        self.assertTupleEqual(
            (deployer.calculate_inputs, client.calls),
            (
                # Check that the loop has run twice
                [(local_state, configuration, state),
                 (local_state, configuration, state)],
                # And that state was re-sent even though it remained unchanged
                [(NodeStateCommand, dict(state_changes=(local_state,))),
                 (NodeStateCommand, dict(state_changes=(local_state,)))],
            )
        )
开发者ID:punalpatel,项目名称:flocker,代码行数:34,代码来源:test_loop.py


示例11: test_convergence_done_delays_new_iteration

    def test_convergence_done_delays_new_iteration(self, logger):
        """
        An FSM completing the changes from one convergence iteration doesn't
        instantly start another iteration.
        """
        self.local_state = local_state = NodeState(hostname=u'192.0.2.123')
        self.configuration = configuration = Deployment()
        self.cluster_state = received_state = DeploymentState(nodes=[])
        self.action = action = ControllableAction(result=succeed(None))
        deployer = ControllableDeployer(
            local_state.hostname, [succeed(local_state)], [action]
        )
        client = self.make_amp_client([local_state])
        reactor = Clock()
        loop = build_convergence_loop_fsm(reactor, deployer)
        self.patch(loop, "logger", logger)
        loop.receive(_ClientStatusUpdate(
            client=client, configuration=configuration, state=received_state))

        expected_cluster_state = DeploymentState(
            nodes=[local_state])

        # Calculating actions happened and the result was run.
        self.assertTupleEqual(
            (deployer.calculate_inputs, client.calls),
            ([(local_state, configuration, expected_cluster_state)],
             [(NodeStateCommand, dict(state_changes=(local_state,)))])
        )
开发者ID:punalpatel,项目名称:flocker,代码行数:28,代码来源:test_loop.py


示例12: test_convergence_done_unchanged_notify

    def test_convergence_done_unchanged_notify(self):
        """
        An FSM doing convergence that discovers state unchanged from the last
        state acknowledged by the control service does not re-send that state.
        """
        local_state = NodeState(hostname=u'192.0.2.123')
        configuration = Deployment(nodes=[to_node(local_state)])
        state = DeploymentState(nodes=[local_state])
        deployer = ControllableDeployer(
            local_state.hostname,
            [succeed(local_state), succeed(local_state.copy())],
            [no_action(), no_action()]
        )
        client = self.make_amp_client([local_state])
        reactor = Clock()
        loop = build_convergence_loop_fsm(reactor, deployer)
        loop.receive(_ClientStatusUpdate(
            client=client, configuration=configuration, state=state))
        reactor.advance(1.0)

        # Calculating actions happened, result was run... and then we did
        # whole thing again:
        self.assertEqual(
            (deployer.calculate_inputs, client.calls),
            (
                # Check that the loop has run twice
                [(local_state, configuration, state),
                 (local_state, configuration, state)],
                # But that state was only sent once.
                [(NodeStateCommand, dict(state_changes=(local_state,)))],
            )
        )
开发者ID:punalpatel,项目名称:flocker,代码行数:32,代码来源:test_loop.py


示例13: test_parse_relative_path

    def test_parse_relative_path(self):
        # this makes sure we convert a relative path to absolute
        # hiddenServiceDir args. see Issue #77

        # make sure we have a valid thing from get_global_tor without
        # actually launching tor
        config = TorConfig()
        config.post_bootstrap = defer.succeed(config)
        from txtorcon import torconfig
        torconfig._global_tor_config = None
        get_global_tor(
            self.reactor,
            _tor_launcher=lambda react, config, prog: defer.succeed(config)
        )

        orig = os.path.realpath('.')
        try:
            with util.TempDir() as t:
                t = str(t)
                os.chdir(t)
                os.mkdir(os.path.join(t, 'foo'))
                hsdir = os.path.join(t, 'foo', 'blam')
                os.mkdir(hsdir)

                ep = serverFromString(
                    self.reactor,
                    'onion:88:localPort=1234:hiddenServiceDir=foo/blam'
                )
                self.assertEqual(
                    os.path.realpath(hsdir),
                    ep.hidden_service_dir
                )

        finally:
            os.chdir(orig)
开发者ID:coffeemakr,项目名称:txtorcon,代码行数:35,代码来源:test_endpoints.py


示例14: test_parse_user_path

    def test_parse_user_path(self):
        # this makes sure we expand users and symlinks in
        # hiddenServiceDir args. see Issue #77

        # make sure we have a valid thing from get_global_tor without
        # actually launching tor
        config = TorConfig()
        config.post_bootstrap = defer.succeed(config)
        from txtorcon import torconfig
        torconfig._global_tor_config = None
        get_global_tor(
            self.reactor,
            _tor_launcher=lambda react, config, prog: defer.succeed(config)
        )
        ep = serverFromString(
            self.reactor,
            'onion:88:localPort=1234:hiddenServiceDir=~/blam/blarg'
        )
        # would be nice to have a fixed path here, but then would have
        # to run as a known user :/
        # maybe using the docker stuff to run integration tests better here?
        self.assertEqual(
            os.path.expanduser('~/blam/blarg'),
            ep.hidden_service_dir
        )
开发者ID:coffeemakr,项目名称:txtorcon,代码行数:25,代码来源:test_endpoints.py


示例15: on_qq_login

 def on_qq_login(self,message):
     '''
     这个也是登陆过程中自动处理的一部分,当登陆成功后,self.qq.login这个属性为1,否则则没有登陆成功。
     调用了lib后可以通过这个属性来判断是否登陆成功。
     '''
     if message.body.fields['status'][0]==1:
         #self.transport.connect(util.ip2string(message.body.fields['ip']),8000)
         self.qq.server=(util.ip2string(message.body.fields['ip']),8000)
         defer.succeed(self.pre_login())
     else:
         if message.body.fields['status'][0]==5:
             print message.body.fields['data'][0]
         elif message.body.fields['status'][0]==6:
             self.printl('您的号码[' + str(self.qq.id) + ']可能存在异常情况,已受到限制登录保护,需激活后才能正常登录。\
                         激活地址是:\
                         电信或网通用户 :im.qq.com/jh或activate.qq.com\
                         教育网用户: activateedu.qq.com') 
         else:
             self.printl('登陆成功')
             self.qq.login = 1
             self.qq.session=message.body.fields['session']
             message = qqmsg.outqqMessage(self.qq)
             message.setMsgName('qq_chang_status')
             message.body.setField('online',basic.QQ_status['online'])
             message.body.setField('video',basic.QQ_video)
             self.sendDataToQueue(message)
开发者ID:Javacym,项目名称:python-qq,代码行数:26,代码来源:qqlib.py


示例16: _try_to_connect

 def _try_to_connect(reactor, port, stdout, txtorcon):
     tried.append( (reactor, port, stdout, txtorcon) )
     if not reachable:
         return defer.succeed(None)
     if port == expected_port: # second one on the list
         return defer.succeed(tor_state)
     return defer.succeed(None)
开发者ID:tahoe-lafs,项目名称:tahoe-lafs,代码行数:7,代码来源:test_tor_provider.py


示例17: on_qq_chang_status

 def on_qq_chang_status(self, message):
     '''
     到这里登陆过程的自动处理完成,开始循环发送活动包,表示自己在线。
     '''
     self.qq.log.info("您当前的状态为:在线")
     #开始每隔1分钟发送一次在线包
     defer.succeed(self.alive())
开发者ID:Javacym,项目名称:python-qq,代码行数:7,代码来源:qqlib.py


示例18: cb

 def cb(res):
     stdout, stderr, exit = res
     if errortoo:
         return defer.succeed(stdout + stderr)
     if stderr:
         return defer.fail(IOError("got stderr: %r" % (stderr,)))
     return defer.succeed(stdout)
开发者ID:Cray,项目名称:buildbot,代码行数:7,代码来源:gpo.py


示例19: _test_nested_change

def _test_nested_change(case, outer_factory, inner_factory):
    """
    Assert that ``IChangeState`` providers wrapped inside ``inner_factory``
    wrapped inside ``outer_factory`` are run with the same deployer argument as
    is passed to ``run_state_change``.

    :param TestCase case: A running test.
    :param outer_factory: Either ``sequentially`` or ``in_parallel`` to
        construct the top-level change to pass to ``run_state_change``.
    :param inner_factory: Either ``sequentially`` or ``in_parallel`` to
        construct a change to include the top-level change passed to
        ``run_state_change``.

    :raise: A test failure if the inner change is not run with the same
        deployer as is passed to ``run_state_change``.
    """
    inner_action = ControllableAction(result=succeed(None))
    subchanges = [
        ControllableAction(result=succeed(None)),
        inner_factory(changes=[inner_action]),
        ControllableAction(result=succeed(None))
    ]
    change = outer_factory(changes=subchanges)
    run_state_change(change, DEPLOYER)
    case.assertEqual(
        (True, DEPLOYER),
        (inner_action.called, inner_action.deployer)
    )
开发者ID:Kaffa-MY,项目名称:flocker,代码行数:28,代码来源:test_change.py


示例20: verifyHostKey

    def verifyHostKey(self, pubKey, fingerprint):
        goodKey = self.isInKnownHosts(options['host'], pubKey)
        if goodKey == 1: # good key
            return defer.succeed(1)
        elif goodKey == 2: # AAHHHHH changed
            return defer.fail(ConchError('changed host key'))
        else:
            oldout, oldin = sys.stdout, sys.stdin
            sys.stdin = sys.stdout = open('/dev/tty','r+')
            if options['host'] == self.transport.getPeer()[1]:
                host = options['host']
                khHost = options['host']
            else:
                host = '%s (%s)' % (options['host'],
                                    self.transport.getPeer()[1])
                khHost = '%s,%s' % (options['host'],
                                    self.transport.getPeer()[1])
            keyType = common.getNS(pubKey)[0]
            print """The authenticity of host '%s' can't be extablished.
%s key fingerprint is %s.""" % (host,
                                {'ssh-dss':'DSA', 'ssh-rsa':'RSA'}[keyType],
                                fingerprint)
            ans = raw_input('Are you sure you want to continue connecting (yes/no)? ')
            while ans.lower() not in ('yes', 'no'):
                ans = raw_input("Please type 'yes' or 'no': ")
            sys.stdout,sys.stdin=oldout,oldin
            if ans == 'no':
                print 'Host key verification failed.'
                return defer.fail(ConchError('bad host key'))
            print "Warning: Permanently added '%s' (%s) to the list of known hosts." % (khHost, {'ssh-dss':'DSA', 'ssh-rsa':'RSA'}[keyType])
            known_hosts = open(os.path.expanduser('~/.ssh/known_hosts'), 'a')
            encodedKey = base64.encodestring(pubKey).replace('\n', '')
            known_hosts.write('\n%s %s %s' % (khHost, keyType, encodedKey))
            known_hosts.close()
            return defer.succeed(1)
开发者ID:fxia22,项目名称:ASM_xf,代码行数:35,代码来源:conch.py



注:本文中的twisted.internet.defer.succeed函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python defer.wFD函数代码示例发布时间:2022-05-27
下一篇:
Python defer.setDebugging函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap