• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python comm.fmt_msg函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 qdb.comm.fmt_msg 函数的典型用法代码示例。如果您正苦于以下问题:Python fmt_msg 函数的具体用法是什么?Python fmt_msg 怎么用?Python fmt_msg 有哪些使用示例?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



下文一共展示了 fmt_msg 函数的 20 个代码示例,这些示例默认按受欢迎程度排序。您可以为喜欢或者觉得有用的代码点赞,您的评价将有助于我们的系统推荐出更好的 Python 代码示例。

示例1: test_eval_state_update

    def test_eval_state_update(self):
        """
        Tests that eval may update the state of the program.

        An 'eval' command that rebinds ``test_var`` and a 'continue' command
        are sent to the tracer before ``set_trace`` is called; afterwards the
        local is expected to hold the new value.
        """
        # We will try to corrupt this variable with a stateful operation.
        # The eval string below refers to it by name, so keep the two in sync.
        test_var = 'pure'  # NOQA

        db = Qdb(
            uuid='eval_test',
            cmd_manager=self.cmd_manager,
            host=self.tracer_host,
            port=self.tracer_port,
            redirect_output=False,
        )
        # NOTE(review): presumably gives the server time to register the
        # session before events are sent to it -- confirm.
        sleep(0.01)
        self.server.session_store.send_to_tracer(
            uuid=db.uuid,
            event=fmt_msg('eval', "test_var = 'mutated'")
        )
        # Sent after the eval; presumably lets set_trace return instead of
        # waiting for more commands.
        self.server.session_store.send_to_tracer(
            uuid=db.uuid,
            event=fmt_msg('continue')
        )
        db.set_trace(stop=True)
        self.server.session_store.slaughter(db.uuid)

        self.assertEqual(test_var, 'mutated')
开发者ID:brdfdr,项目名称:qdb,代码行数:27,代码来源:test_cmd_manager.py


示例2: test_stack_transpose_no_skip

    def test_stack_transpose_no_skip(self, direction):
        """
        Checks that an 'up' or 'down' command shifts the reported stack index
        by exactly one frame.
        """
        captured = []

        def record_event(self, event, payload):
            captured.append(fmt_msg(event, payload))

        class cmd_manager(type(self.cmd_manager)):
            """
            Command manager whose send_stack runs with send_event patched to
            record the events, so that the test can inspect them.
            """
            def send_stack(self, tracer):
                with patch.object(cmd_manager, 'send_event', record_event):
                    super(cmd_manager, self).send_stack(tracer)

        db = Qdb(
            uuid='test_' + direction,
            cmd_manager=cmd_manager(),
            host=self.tracer_host,
            port=self.tracer_port,
            redirect_output=False,
            green=True,
        )
        gyield()

        # Arguments for each fmt_msg call, in the order they must be sent.
        pending = [(direction,), ('disable', 'soft')]
        if direction == 'down':
            # We start in the bottom frame, so step up once first to have
            # somewhere to go down to.
            pending.insert(0, ('up',))

        send_to_tracer = self.server.session_store.send_to_tracer
        for msg_args in pending:
            send_to_tracer(uuid=db.uuid, event=fmt_msg(*msg_args))

        gyield()
        db.set_trace()

        before = captured[-2]['p']['index']
        after = captured[-1]['p']['index']

        if direction == 'up':
            moved = before - after
        elif direction == 'down':
            moved = after - before
        else:
            self.fail("direction is not 'up' or 'down'")  # wut did u do?
        self.assertEqual(moved, 1)
开发者ID:quantopian,项目名称:qdb,代码行数:55,代码来源:test_cmd_manager.py


示例3: test_eval_timeout

    def test_eval_timeout(self):
        """
        Checks that evaluating a user command that never finishes is reported
        as a QdbExecutionTimeout.
        """
        def g():
            # Never returns on its own; referenced by name in the eval string.
            while True:
                pass

        recorded = []

        class cmd_manager(type(self.cmd_manager)):
            """
            Records the arguments of send_print so the test can assert on them.
            """
            def send_print(self, input_, exc, output):
                recorded.append(dict(input=input_, exc=exc, output=output))

        expr = 'g()'
        timeout = 1

        db = Qdb(
            uuid='timeout_test',
            cmd_manager=cmd_manager(),
            host=self.tracer_host,
            port=self.tracer_port,
            redirect_output=False,
            execution_timeout=timeout,
            green=True,
        )
        gyield()

        send_to_tracer = self.server.session_store.send_to_tracer
        send_to_tracer(uuid=db.uuid, event=fmt_msg('eval', expr))
        send_to_tracer(uuid=db.uuid, event=fmt_msg('continue'))
        db.set_trace(stop=True)
        self.server.session_store.slaughter(db.uuid)

        self.assertTrue(recorded)
        first = recorded[0]

        self.assertEqual(first['input'], expr)
        self.assertTrue(first['exc'])
        expected = db.exception_serializer(QdbExecutionTimeout(expr, timeout))
        self.assertEqual(first['output'], expected)
开发者ID:quantopian,项目名称:qdb,代码行数:53,代码来源:test_cmd_manager.py


示例4: send_tracer_event

def send_tracer_event(sck, event, payload):
    """
    Formats an event with ``serial=pickle.dumps`` and writes it to the
    socket, preceded by its length packed with the '>i' format.
    """
    body = fmt_msg(event, payload, serial=pickle.dumps)
    length_prefix = pack('>i', len(body))
    # Length first, then the message itself, as two separate writes.
    for chunk in (length_prefix, body):
        sck.sendall(chunk)
开发者ID:ThisGuyCodes,项目名称:qdb,代码行数:7,代码来源:test_server.py


示例5: test_tracer_attach_timeout

    def test_tracer_attach_timeout(self, mode):
        """
        Tests the case where a tracer attaches but no client does.

        The tracer is expected to receive a 'client' error followed by a
        'disable' event carrying ``mode``, and the session must not remain in
        the session store.
        """
        with QdbServer(tracer_host='localhost',
                       tracer_port=0,
                       client_server=QdbNopServer(),
                       attach_timeout=0.01,
                       timeout_disable_mode=mode) as server:

            tracer = socket.create_connection(
                ('localhost', server.tracer_server.server_port)
            )
            send_tracer_event(tracer, 'start', {
                'uuid': 'test',
                'auth': '',
                'local': (0, 0),
            })
            # Bind both results up front. Timeout(0.1, False) presumably
            # leaves the block silently when it expires; without a default,
            # an early timeout would surface as a NameError on error_event
            # rather than as a readable assertion failure.
            error_event = None
            disable_event = None
            with Timeout(0.1, False):
                error_event = recv_tracer_event(tracer)
                disable_event = recv_tracer_event(tracer)

            error_dict = fmt_err_msg('client', 'No client')

            self.assertEqual(error_dict, error_event)
            self.assertEqual(fmt_msg('disable', mode), disable_event)
            self.assertNotIn('test', server.session_store)
开发者ID:ThisGuyCodes,项目名称:qdb,代码行数:28,代码来源:test_server.py


示例6: test_client_attach_timeout

    def test_client_attach_timeout(self, mode):
        """
        Tests the case when a client attaches but no tracer does.

        The client is expected to receive a 'tracer' error followed by a
        'disable' event, and the session must not remain in the session store.
        """
        with QdbServer(tracer_server=QdbNopServer(),
                       client_host='localhost',
                       client_port=0,
                       attach_timeout=0.01,
                       timeout_disable_mode=mode) as server:

            client = create_connection(
                'ws://localhost:%d%s' % (server.client_server.server_port,
                                         DEFAULT_ROUTE_FMT.format(uuid='test'))
            )
            send_client_event(client, 'start', '')
            # Bind both results up front. Timeout(0.1, False) presumably
            # leaves the block silently when it expires; without a default,
            # an early timeout would surface as a NameError on error_event
            # rather than as a readable assertion failure.
            error_event = None
            disable_event = None
            with Timeout(0.1, False):
                error_event = recv_client_event(client)
                disable_event = recv_client_event(client)

            error_dict = fmt_err_msg('tracer', 'No tracer')

            self.assertEqual(error_dict, error_event)
            self.assertEqual(fmt_msg('disable'), disable_event)
            self.assertNotIn('test', server.session_store)
开发者ID:ThisGuyCodes,项目名称:qdb,代码行数:25,代码来源:test_server.py


示例7: test_client_auth_timeout

    def test_client_auth_timeout(self):
        """
        Tests that a client which connects but never sends a start event is
        timed out: it should receive an 'auth' error followed by a 'disable'
        event, and no session should be left in the session store.
        """
        with QdbServer(client_host='localhost',
                       client_port=0,
                       auth_timeout=1,  # Timeout after 1 second.
                       tracer_server=QdbNopServer()) as server:
            ws = create_connection(
                'ws://localhost:%d%s' % (server.client_server.server_port,
                                         DEFAULT_ROUTE_FMT.format(uuid='test'))
            )

            auth_failed_dict = fmt_err_msg('auth', 'No start event received')
            disable_dict = fmt_msg('disable')

            # Defaults so that a timeout below fails the assertions instead
            # of leaving the names unbound.
            auth_failed_msg = ''
            disable_msg = ''

            with Timeout(2, False):
                # The server should time us out in 1 second and send back these
                # two messages.
                auth_failed_msg = ws.recv()
                disable_msg = ws.recv()

            # assertEqual / assertNotIn replace the deprecated assertEquals
            # alias and the less informative assertFalse(x in y).
            self.assertEqual(auth_failed_msg, json.dumps(auth_failed_dict))
            self.assertEqual(disable_msg, json.dumps(disable_dict))
            self.assertNotIn('test', server.session_store)
开发者ID:ThisGuyCodes,项目名称:qdb,代码行数:25,代码来源:test_server.py


示例8: send_to_tracer

    def send_to_tracer(self, uuid, event):
        """
        Sends an event to the tracer of the session ``uuid``.

        Unknown sessions are logged and ignored. A 'pause' event for a local
        session is delivered through pause_tracer instead of the socket.
        A KeyError or pickling error raised while preparing the event is
        logged and re-raised to the caller.
        """
        if uuid not in self._sessions:
            log.warn('send_to_tracer failed: session %s does not exist'
                     % uuid)
            return  # Session doesn't exist.

        try:
            kind = event['e']
            if kind == 'pause' and self.is_local(uuid):
                self.pause_tracer(uuid)
                pause_signal = self._sessions[uuid].pause_signal
                log.info('Raising pause signal (%d) in server local session %s'
                         % (pause_signal, uuid))
                self._update_timestamp(uuid)
                return  # We 'sent' this event.
            serialized = fmt_msg(kind, event.get('p'), serial=pickle.dumps)
        except (pickle.PicklingError, KeyError) as err:
            log.warn('send_to_tracer(uuid=%s, event=%s) failed: %s'
                     % (uuid, event, err))
            raise  # The event is just wrong, reraise this to the user.

        tracer_sock = self._sessions[uuid].tracer
        if not tracer_sock:
            log.warn('No client session is alive for %s' % uuid)
        else:
            self._send_to_socket(tracer_sock, serialized)
        self._update_timestamp(uuid)
开发者ID:ThisGuyCodes,项目名称:qdb,代码行数:28,代码来源:session_store.py


示例9: test_pause

    def test_pause(self):
        """
        Asserts that sending a pause to the process will raise the pause signal
        in the tracer process.
        """
        # A one-element list lets the nested handler flip the flag without
        # rebinding a name in the enclosing scope.
        pause_called = [False]

        def pause_handler(signum, stackframe):
            """
            Pause handler that marks that we made it into this function.
            """
            # The first parameter is named signum so that it does not shadow
            # the signal module inside the handler.
            pause_called[0] = True

        db = Qdb(
            cmd_manager=self.cmd_manager,
            host=self.tracer_host,
            port=self.tracer_port,
        )
        signal.signal(db.pause_signal, pause_handler)
        self.server.session_store.send_to_tracer(
            uuid=db.uuid,
            event=fmt_msg('pause')
        )

        # Check the flag itself: the list is never empty and therefore always
        # truthy, so asserting on the list would pass even if the handler
        # never ran.
        self.assertTrue(pause_called[0])
开发者ID:brdfdr,项目名称:qdb,代码行数:25,代码来源:test_cmd_manager.py


示例10: test_locals

    def test_locals(self):
        """
        Checks that a 'locals' event results in command_locals being called.
        """
        tracer = self.MockTracer()
        tracer.curframe_locals = {'a': 'a'}
        manager = self.cmd_manager
        manager.start(tracer, '')
        gyield()
        self.server.session_store.send_to_tracer(
            uuid=tracer.uuid,
            event=fmt_msg('locals')
        )

        was_called = NonLocal(False)

        def spying_command_locals(tracer, payload):
            # Note the call, then delegate to the real implementation.
            was_called.value = True
            type(self.cmd_manager).command_locals(manager, tracer, payload)

        with gevent.Timeout(0.1, False):
            with patch.object(manager, 'command_locals',
                              spying_command_locals):
                manager.next_command(tracer)
        self.assertTrue(was_called.value)

        tracer.start.assert_called()  # Start always gets called.
        self.server.session_store.slaughter(tracer.uuid)
开发者ID:quantopian,项目名称:qdb,代码行数:28,代码来源:test_cmd_manager.py


示例11: user_return

 def user_return(self, stackframe, return_value):
     """
     Stores the return value in the frame's locals under '__return__', sets
     up the stack, calls send_watchlist and send_stack, and then passes a
     'return' message to the command manager's next_command.
     """
     stackframe.f_locals['__return__'] = return_value
     self.setup_stack(stackframe, None)
     manager = self.cmd_manager
     manager.send_watchlist()
     manager.send_stack()
     manager.next_command(
         fmt_msg('return', str(return_value), serial=pickle.dumps),
     )
开发者ID:fuyuanwu,项目名称:qdb,代码行数:7,代码来源:tracer.py


示例12: send_to_clients

    def send_to_clients(self, uuid, event):
        """
        Sends an event, formatted with ``serial=json.dumps``, to every client
        of the session ``uuid``.

        Unknown sessions are logged and ignored. A KeyError or ValueError
        raised while formatting the event is logged and re-raised. Clients
        whose send raises are removed from the session.
        """
        if uuid not in self._sessions:
            log.warn('send_to_clients failed: session %s does not exist'
                     % uuid)
            return  # Session doesn't exist.

        try:
            serialized = fmt_msg(
                event['e'],
                event.get('p'),
                serial=json.dumps,
            )
        except (KeyError, ValueError) as err:
            log.warn('send_to_clients(uuid=%s, event=%s) failed: %s'
                     % (uuid, event, err))
            raise

        attached = self._sessions[uuid].clients

        with self._lock:
            # Loop over a snapshot: failing clients are removed from the
            # live collection while we iterate.
            for recipient in set(attached):
                try:
                    recipient.send(serialized)
                except Exception:
                    log.info('Client was closed for debug session: %s' % uuid)
                    attached.remove(recipient)

        self._update_timestamp(uuid)
开发者ID:ThisGuyCodes,项目名称:qdb,代码行数:27,代码来源:session_store.py


示例13: test_locals

    def test_locals(self):
        """
        Checks that a 'locals' event results in command_locals being called.
        """
        tracer = self.MockTracer()
        tracer.curframe_locals = {'a': 'a'}
        manager = self.cmd_manager(tracer)

        # One-element list so the nested function can flip the flag.
        seen = [False]

        def spying_command_locals(payload):
            # Note the call, then delegate to the real implementation.
            seen[0] = True
            self.cmd_manager.command_locals(manager, payload)

        manager.command_locals = spying_command_locals
        tracer.cmd_manager = manager
        manager.start('')
        sleep(0.01)
        self.server.session_store.send_to_tracer(
            uuid=tracer.uuid,
            event=fmt_msg('locals')
        )

        with Timeout(0.1, False):
            manager.next_command()
        self.assertTrue(seen[0])

        tracer.start.assert_called()  # Start always gets called.
        self.server.session_store.slaughter(tracer.uuid)
开发者ID:brdfdr,项目名称:qdb,代码行数:28,代码来源:test_cmd_manager.py


示例14: user_return

 def user_return(self, stackframe, return_value):
     """
     Stores the return value in the frame's locals under '__return__', sets
     up the stack, calls send_watchlist and send_stack, and then passes a
     'return' message to the bound command manager's next_command.
     """
     stackframe.f_locals['__return__'] = return_value
     self.setup_stack(stackframe, None)
     manager = self.bound_cmd_manager
     manager.send_watchlist()
     manager.send_stack()
     return_msg = fmt_msg('return', str(return_value), serial=json.dumps)
     manager.next_command(return_msg)
开发者ID:quantopian,项目名称:qdb,代码行数:9,代码来源:tracer.py


示例15: test_eval_results

    def test_eval_results(self, input_, exc, output):
        """
        Checks that an 'eval' of ``input_`` is reported through send_print
        with the expected ``exc`` and ``output`` values.
        """
        recorded = []

        class cmd_manager(type(self.cmd_manager)):
            """
            Records the arguments of send_print so the test can assert on them.
            """
            def send_print(self, input_, exc, output):
                recorded.append(dict(input=input_, exc=exc, output=output))

        db = Qdb(
            uuid='eval_test',
            cmd_manager=cmd_manager(),
            host=self.tracer_host,
            port=self.tracer_port,
            redirect_output=False,
            green=True,
        )
        gyield()

        send_to_tracer = self.server.session_store.send_to_tracer
        send_to_tracer(uuid=db.uuid, event=fmt_msg('eval', input_))
        send_to_tracer(uuid=db.uuid, event=fmt_msg('continue'))
        db.set_trace(stop=True)
        self.server.session_store.slaughter(db.uuid)

        self.assertTrue(recorded)
        first = recorded[0]

        expected = (('input', input_), ('exc', exc), ('output', output))
        for key, value in expected:
            self.assertEqual(first[key], value)
开发者ID:quantopian,项目名称:qdb,代码行数:43,代码来源:test_cmd_manager.py


示例16: handle_client

    def handle_client(self, environ, start_response):
        """
        Handles one client websocket request.

        Matches the request path against the route, waits up to auth_timeout
        for a 'start' event that passes auth_fn, attaches the client to the
        session and relays the start event and every later event to the
        tracer. On a failed start the client is sent an 'auth' error and a
        'disable' message. The websocket is always closed on exit.
        """
        path = environ['PATH_INFO']
        ws = environ['wsgi.websocket']
        addr = environ['REMOTE_ADDR']

        try:
            match = self.route.match(path)
            if not match:
                # This did not match our route.
                return
            log.info('Client request from %s' % addr)
            uuid = match.group(1)

            # Stays None if the timeout expires before an event arrives.
            start_event = None
            with Timeout(self.auth_timeout, False):
                start_event = self.get_event(ws)

            # Work through the ways the start can fail, keeping the most
            # helpful message; None means the client is accepted.
            rejection = None
            if not start_event:
                rejection = 'No start event received'
            elif start_event['e'] != 'start':
                rejection = "First event must be of type: 'start'"
            elif not self.auth_fn(start_event.get('p', '')):
                log.warn('Client %s failed to authenticate' % addr)
                rejection = 'Authentication failed'

            if rejection is not None:
                try:
                    self.send_error(ws, 'auth', rejection)
                    ws.send(fmt_msg('disable', serial=json.dumps))
                except WebSocketError:
                    # The disable message could not be delivered; the client
                    # already failed auth, so suppress this and just close.
                    pass
                return

            if not self.session_store.attach_client(uuid, ws):
                # attach_client returned a falsy value; nothing to relay to.
                return

            relay = self.session_store.send_to_tracer
            relay(uuid, event=start_event)
            for incoming in self.get_events(ws):
                relay(uuid, event=incoming)

        finally:
            log.info('Closing websocket to client %s' % addr)
            ws.close()
开发者ID:OspreyX,项目名称:qdb,代码行数:54,代码来源:client.py


示例17: test_why_are_you_executing_all_these_commands

 def test_why_are_you_executing_all_these_commands(self):
     """
     Sends as many 'eval' commands as the interpreter's recursion limit,
     followed by a 'continue', and requires set_trace to finish before the
     one second gevent timeout fires.
     """
     db = Qdb(
         uuid='send_stack_test',
         cmd_manager=self.cmd_manager,
         host=self.tracer_host,
         port=self.tracer_port,
         redirect_output=False,
         green=True,
     )
     gyield()

     send_to_tracer = self.server.session_store.send_to_tracer
     for _ in range(sys.getrecursionlimit()):
         send_to_tracer(uuid=db.uuid, event=fmt_msg('eval', 'None'))
     send_to_tracer(uuid=db.uuid, event=fmt_msg('continue'))

     with gevent.Timeout(1):
         db.set_trace(stop=True)
开发者ID:quantopian,项目名称:qdb,代码行数:21,代码来源:test_cmd_manager.py


示例18: test_send_stack_results

    def test_send_stack_results(self, use_skip_fn):
        """
        Tests that the results from sending the stack are accurate.

        When ``use_skip_fn`` is true, a skip function that skips every file
        other than this one is passed to Qdb, and the reported stack is
        expected to be shorter than ``db.stack``.

        WARNING: This test uses lines of its own source as string literals,
        be sure to edit the source and the string if you make any changes.
        """
        def skip_fn(filename):
            # True (skip) for any filename that does not contain this file's
            # fixed-up path.
            return not fix_filename(__file__) in filename

        events = []

        def capture_event(self, event, payload):
            # Stand-in for send_event: store the formatted message instead.
            events.append(fmt_msg(event, payload))

        class cmd_manager(type(self.cmd_manager)):
            """
            Wrap send_stack by just capturing the output to make assertions on
            it.
            """
            def send_stack(self, tracer):
                with patch.object(cmd_manager, 'send_event', capture_event):
                    super(cmd_manager, self).send_stack(tracer)

        db = Qdb(
            uuid='send_stack_test',
            cmd_manager=cmd_manager(),
            host=self.tracer_host,
            port=self.tracer_port,
            redirect_output=False,
            skip_fn=skip_fn if use_skip_fn else None,
            green=True,
        )
        gyield()
        self.server.session_store.send_to_tracer(
            uuid=db.uuid,
            event=fmt_msg('continue')
        )
        db.set_trace(stop=True)
        # The next line, including its indentation and trailing comment, is
        # compared verbatim against the string literal further down.
        self.assertTrue(events)  # EDIT IN BOTH PLACES

        event = events[0]
        if use_skip_fn:
            # Assert that we actually suppressed some frames.
            self.assertTrue(len(event['p']['stack']) < len(db.stack))

        self.assertEqual(
            # I love dictionaries so much!
            # Looks up the 'code' entry of the stack frame at the reported
            # index.
            event['p']['stack'][event['p']['index']]['code'],
            '        self.assertTrue(events)  # EDIT IN BOTH PLACES',
        )

        self.server.session_store.slaughter(db.uuid)
开发者ID:quantopian,项目名称:qdb,代码行数:52,代码来源:test_cmd_manager.py


示例19: user_exception

 def user_exception(self, stackframe, exc_info):
     """
     Stores the exception type and value in the frame's locals under
     '__exception__', sets up the stack from the traceback, calls
     send_watchlist and send_stack, and then passes an 'exception' message
     (type name, value string and formatted traceback) to the command
     manager's next_command.
     """
     exc_type, exc_value, exc_traceback = exc_info
     stackframe.f_locals['__exception__'] = (exc_type, exc_value)
     self.setup_stack(stackframe, exc_traceback)
     manager = self.cmd_manager
     manager.send_watchlist()
     manager.send_stack()
     details = {
         'type': exc_type.__name__,
         'value': str(exc_value),
         'traceback': traceback.format_tb(exc_traceback),
     }
     manager.next_command(
         fmt_msg('exception', details, serial=pickle.dumps),
     )
开发者ID:fuyuanwu,项目名称:qdb,代码行数:15,代码来源:tracer.py


示例20: test_commands

 def test_commands(self, attrgetter_, event, payload=None):
     """
     Sends ``event`` (with an optional payload) to the tracer and checks
     that the mock attribute selected by ``attrgetter_`` was called.
     """
     tracer = self.MockTracer()
     manager = self.cmd_manager
     manager.start(tracer, '')

     store = self.server.session_store
     store.send_to_tracer(uuid=tracer.uuid, event=fmt_msg(event, payload))

     with gevent.Timeout(0.1, False):
         manager.next_command(tracer)

     tracer.start.assert_called()  # Start always gets called.
     attrgetter_(tracer).assert_called()

     # Kill the session we just created
     store.slaughter(tracer.uuid)
开发者ID:quantopian,项目名称:qdb,代码行数:17,代码来源:test_cmd_manager.py



注:本文中的qdb.comm.fmt_msg函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python qds.main函数代码示例发布时间:2022-05-26
下一篇:
Python qdb.Qdb类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap