本文整理汇总了Python中qdb.comm.fmt_msg函数的典型用法代码示例。如果您正苦于以下问题:Python fmt_msg函数的具体用法?Python fmt_msg怎么用?Python fmt_msg使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了fmt_msg函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_eval_state_update
def test_eval_state_update(self):
"""
Tests that eval may update the state of the program.
"""
# We will try to corrupt this variable with a stateful operation.
test_var = 'pure' # NOQA
db = Qdb(
uuid='eval_test',
cmd_manager=self.cmd_manager,
host=self.tracer_host,
port=self.tracer_port,
redirect_output=False,
)
sleep(0.01)
self.server.session_store.send_to_tracer(
uuid=db.uuid,
event=fmt_msg('eval', "test_var = 'mutated'")
)
self.server.session_store.send_to_tracer(
uuid=db.uuid,
event=fmt_msg('continue')
)
db.set_trace(stop=True)
self.server.session_store.slaughter(db.uuid)
self.assertEqual(test_var, 'mutated')
开发者ID:brdfdr,项目名称:qdb,代码行数:27,代码来源:test_cmd_manager.py
示例2: test_stack_transpose_no_skip
def test_stack_transpose_no_skip(self, direction):
"""
Tests moving up the stack.
"""
events = []
def capture_event(self, event, payload):
events.append(fmt_msg(event, payload))
class cmd_manager(type(self.cmd_manager)):
"""
Wrap send_stack by just capturing the output to make assertions on
it.
"""
def send_stack(self, tracer):
with patch.object(cmd_manager, 'send_event', capture_event):
super(cmd_manager, self).send_stack(tracer)
db = Qdb(
uuid='test_' + direction,
cmd_manager=cmd_manager(),
host=self.tracer_host,
port=self.tracer_port,
redirect_output=False,
green=True,
)
gyield()
if direction == 'down':
# We are already located in the bottom frame, let's go up one
# so that we may try going down.
self.server.session_store.send_to_tracer(
uuid=db.uuid,
event=fmt_msg('up')
)
self.server.session_store.send_to_tracer(
uuid=db.uuid,
event=fmt_msg(direction)
)
self.server.session_store.send_to_tracer(
uuid=db.uuid,
event=fmt_msg('disable', 'soft')
)
gyield()
db.set_trace()
start_ind = events[-2]['p']['index']
shift_ind = events[-1]['p']['index']
if direction == 'up':
self.assertEqual(start_ind - shift_ind, 1)
elif direction == 'down':
self.assertEqual(shift_ind - start_ind, 1)
else:
self.fail("direction is not 'up' or 'down'") # wut did u do?
开发者ID:quantopian,项目名称:qdb,代码行数:55,代码来源:test_cmd_manager.py
示例3: test_eval_timeout
def test_eval_timeout(self):
"""
Tests that evaluating user repl commands will raise Timeouts.
"""
def g():
while True:
pass
prints = []
class cmd_manager(type(self.cmd_manager)):
"""
Captures print commands to make assertions on them.
"""
def send_print(self, input_, exc, output):
prints.append({
'input': input_,
'exc': exc,
'output': output
})
to_eval = 'g()'
db = Qdb(
uuid='timeout_test',
cmd_manager=cmd_manager(),
host=self.tracer_host,
port=self.tracer_port,
redirect_output=False,
execution_timeout=1,
green=True,
)
gyield()
self.server.session_store.send_to_tracer(
uuid=db.uuid,
event=fmt_msg('eval', to_eval)
)
self.server.session_store.send_to_tracer(
uuid=db.uuid,
event=fmt_msg('continue')
)
db.set_trace(stop=True)
self.server.session_store.slaughter(db.uuid)
self.assertTrue(prints)
print_ = prints[0]
self.assertEqual(print_['input'], to_eval)
self.assertTrue(print_['exc'])
self.assertEqual(
print_['output'],
db.exception_serializer(QdbExecutionTimeout(to_eval, 1))
)
开发者ID:quantopian,项目名称:qdb,代码行数:53,代码来源:test_cmd_manager.py
示例4: send_tracer_event
def send_tracer_event(sck, event, payload):
"""
Sends an event over the socket.
"""
msg = fmt_msg(event, payload, serial=pickle.dumps)
sck.sendall(pack('>i', len(msg)))
sck.sendall(msg)
开发者ID:ThisGuyCodes,项目名称:qdb,代码行数:7,代码来源:test_server.py
示例5: test_tracer_attach_timeout
def test_tracer_attach_timeout(self, mode):
"""
Tests the case where a tracer attaches but no client does.
"""
with QdbServer(tracer_host='localhost',
tracer_port=0,
client_server=QdbNopServer(),
attach_timeout=0.01,
timeout_disable_mode=mode) as server:
tracer = socket.create_connection(
('localhost', server.tracer_server.server_port)
)
send_tracer_event(tracer, 'start', {
'uuid': 'test',
'auth': '',
'local': (0, 0),
})
disable_event = None
with Timeout(0.1, False):
error_event = recv_tracer_event(tracer)
disable_event = recv_tracer_event(tracer)
error_dict = fmt_err_msg('client', 'No client')
self.assertEqual(error_dict, error_event)
self.assertEqual(fmt_msg('disable', mode), disable_event)
self.assertNotIn('test', server.session_store)
开发者ID:ThisGuyCodes,项目名称:qdb,代码行数:28,代码来源:test_server.py
示例6: test_client_attach_timeout
def test_client_attach_timeout(self, mode):
"""
Tests the case when a client attaches but no tracer does.
"""
with QdbServer(tracer_server=QdbNopServer(),
client_host='localhost',
client_port=0,
attach_timeout=0.01,
timeout_disable_mode=mode) as server:
client = create_connection(
'ws://localhost:%d%s' % (server.client_server.server_port,
DEFAULT_ROUTE_FMT.format(uuid='test'))
)
send_client_event(client, 'start', '')
disable_event = None
with Timeout(0.1, False):
error_event = recv_client_event(client)
disable_event = recv_client_event(client)
error_dict = fmt_err_msg('tracer', 'No tracer')
self.assertEqual(error_dict, error_event)
self.assertEqual(fmt_msg('disable'), disable_event)
self.assertNotIn('test', server.session_store)
开发者ID:ThisGuyCodes,项目名称:qdb,代码行数:25,代码来源:test_server.py
示例7: test_client_auth_timeout
def test_client_auth_timeout(self):
with QdbServer(client_host='localhost',
client_port=0,
auth_timeout=1, # Timeout after 1 second.
tracer_server=QdbNopServer()) as server:
ws = create_connection(
'ws://localhost:%d%s' % (server.client_server.server_port,
DEFAULT_ROUTE_FMT.format(uuid='test'))
)
auth_failed_dict = fmt_err_msg('auth', 'No start event received')
disable_dict = fmt_msg('disable')
auth_failed_msg = ''
disable_msg = ''
with Timeout(2, False):
# The server should time us out in 1 second and send back these
# two messages.
auth_failed_msg = ws.recv()
disable_msg = ws.recv()
self.assertEquals(auth_failed_msg, json.dumps(auth_failed_dict))
self.assertEquals(disable_msg, json.dumps(disable_dict))
self.assertFalse('test' in server.session_store)
开发者ID:ThisGuyCodes,项目名称:qdb,代码行数:25,代码来源:test_server.py
示例8: send_to_tracer
def send_to_tracer(self, uuid, event):
"""
Sends an event the tracer uuid.
"""
if uuid not in self._sessions:
log.warn('send_to_tracer failed: session %s does not exist'
% uuid)
return # Session doesn't exist.
try:
if event['e'] == 'pause' and self.is_local(uuid):
self.pause_tracer(uuid)
log.info('Raising pause signal (%d) in server local session %s'
% (self._sessions[uuid].pause_signal, uuid))
self._update_timestamp(uuid)
return # We 'sent' this event.
msg = fmt_msg(event['e'], event.get('p'), serial=pickle.dumps)
except (pickle.PicklingError, KeyError) as e:
log.warn('send_to_tracer(uuid=%s, event=%s) failed: %s'
% (uuid, event, e))
raise # The event is just wrong, reraise this to the user.
sck = self._sessions[uuid].tracer
if sck:
self._send_to_socket(sck, msg)
else:
log.warn('No client session is alive for %s' % uuid)
self._update_timestamp(uuid)
开发者ID:ThisGuyCodes,项目名称:qdb,代码行数:28,代码来源:session_store.py
示例9: test_pause
def test_pause(self):
"""
Asserts that sending a pause to the process will raise the pause signal
in the tracer process.
"""
pause_called = [False]
def pause_handler(signal, stackframe):
"""
Pause handler that marks that we made it into this function.
"""
pause_called[0] = True
db = Qdb(
cmd_manager=self.cmd_manager,
host=self.tracer_host,
port=self.tracer_port,
)
signal.signal(db.pause_signal, pause_handler)
self.server.session_store.send_to_tracer(
uuid=db.uuid,
event=fmt_msg('pause')
)
self.assertTrue(pause_called)
开发者ID:brdfdr,项目名称:qdb,代码行数:25,代码来源:test_cmd_manager.py
示例10: test_locals
def test_locals(self):
"""
Tests accessing the locals.
"""
tracer = self.MockTracer()
tracer.curframe_locals = {'a': 'a'}
cmd_manager = self.cmd_manager
cmd_manager.start(tracer, '')
gyield()
self.server.session_store.send_to_tracer(
uuid=tracer.uuid,
event=fmt_msg('locals')
)
command_locals_called = NonLocal(False)
def test_command_locals(cmd_manager, tracer, payload):
command_locals_called.value = True
type(self.cmd_manager).command_locals(cmd_manager, tracer, payload)
cmd_locals = partial(test_command_locals, cmd_manager)
with gevent.Timeout(0.1, False), \
patch.object(cmd_manager, 'command_locals', cmd_locals):
cmd_manager.next_command(tracer)
self.assertTrue(command_locals_called.value)
tracer.start.assert_called() # Start always gets called.
self.server.session_store.slaughter(tracer.uuid)
开发者ID:quantopian,项目名称:qdb,代码行数:28,代码来源:test_cmd_manager.py
示例11: user_return
def user_return(self, stackframe, return_value):
stackframe.f_locals['__return__'] = return_value
self.setup_stack(stackframe, None)
self.cmd_manager.send_watchlist()
self.cmd_manager.send_stack()
msg = fmt_msg('return', str(return_value), serial=pickle.dumps)
self.cmd_manager.next_command(msg)
开发者ID:fuyuanwu,项目名称:qdb,代码行数:7,代码来源:tracer.py
示例12: send_to_clients
def send_to_clients(self, uuid, event):
"""
Routes an event to all clients connected to a session.
"""
if uuid not in self._sessions:
log.warn('send_to_clients failed: session %s does not exist'
% uuid)
return # Session doesn't exist.
try:
msg = fmt_msg(event['e'], event.get('p'), serial=json.dumps)
except (KeyError, ValueError) as e:
log.warn('send_to_clients(uuid=%s, event=%s) failed: %s'
% (uuid, event, e))
raise
clients = self._sessions[uuid].clients
with self._lock:
for client in set(clients):
try:
client.send(msg)
except Exception:
log.info('Client was closed for debug session: %s' % uuid)
clients.remove(client)
self._update_timestamp(uuid)
开发者ID:ThisGuyCodes,项目名称:qdb,代码行数:27,代码来源:session_store.py
示例13: test_locals
def test_locals(self):
"""
Tests accessing the locals.
"""
command_locals_called = [False]
def test_command_locals(cmd_manager, payload):
command_locals_called[0] = True
self.cmd_manager.command_locals(cmd_manager, payload)
tracer = self.MockTracer()
tracer.curframe_locals = {'a': 'a'}
cmd_manager = self.cmd_manager(tracer)
cmd_manager.command_locals = partial(test_command_locals, cmd_manager)
tracer.cmd_manager = cmd_manager
cmd_manager.start('')
sleep(0.01)
self.server.session_store.send_to_tracer(
uuid=tracer.uuid,
event=fmt_msg('locals')
)
with Timeout(0.1, False):
cmd_manager.next_command()
self.assertTrue(command_locals_called[0])
tracer.start.assert_called() # Start always gets called.
self.server.session_store.slaughter(tracer.uuid)
开发者ID:brdfdr,项目名称:qdb,代码行数:28,代码来源:test_cmd_manager.py
示例14: user_return
def user_return(self, stackframe, return_value):
stackframe.f_locals['__return__'] = return_value
self.setup_stack(stackframe, None)
bound_cmd_manager = self.bound_cmd_manager
bound_cmd_manager.send_watchlist()
bound_cmd_manager.send_stack()
bound_cmd_manager.next_command(
fmt_msg('return', str(return_value), serial=json.dumps),
)
开发者ID:quantopian,项目名称:qdb,代码行数:9,代码来源:tracer.py
示例15: test_eval_results
def test_eval_results(self, input_, exc, output):
"""
Tests that evaling code returns the proper results.
"""
prints = []
class cmd_manager(type(self.cmd_manager)):
"""
Captures print commands to make assertions on them.
"""
def send_print(self, input_, exc, output):
prints.append({
'input': input_,
'exc': exc,
'output': output
})
db = Qdb(
uuid='eval_test',
cmd_manager=cmd_manager(),
host=self.tracer_host,
port=self.tracer_port,
redirect_output=False,
green=True,
)
gyield()
self.server.session_store.send_to_tracer(
uuid=db.uuid,
event=fmt_msg('eval', input_)
)
self.server.session_store.send_to_tracer(
uuid=db.uuid,
event=fmt_msg('continue')
)
db.set_trace(stop=True)
self.server.session_store.slaughter(db.uuid)
self.assertTrue(prints)
print_ = prints[0]
self.assertEqual(print_['input'], input_)
self.assertEqual(print_['exc'], exc)
self.assertEqual(print_['output'], output)
开发者ID:quantopian,项目名称:qdb,代码行数:43,代码来源:test_cmd_manager.py
示例16: handle_client
def handle_client(self, environ, start_response):
path = environ['PATH_INFO']
ws = environ['wsgi.websocket']
addr = environ['REMOTE_ADDR']
try:
match = self.route.match(path)
if not match:
# This did not match our route.
return
log.info('Client request from %s' % addr)
uuid = match.group(1)
start_event = None
with Timeout(self.auth_timeout, False):
start_event = self.get_event(ws)
failed = False
message = ''
# Fall through the various ways to fail to generate a more helpful
# error message.
if not start_event:
message = 'No start event received'
failed = True
elif start_event['e'] != 'start':
message = "First event must be of type: 'start'"
failed = True
elif not self.auth_fn(start_event.get('p', '')):
log.warn('Client %s failed to authenticate' % addr)
message = 'Authentication failed'
failed = True
if failed:
try:
self.send_error(ws, 'auth', message)
ws.send(fmt_msg('disable', serial=json.dumps))
except WebSocketError:
# We are unable to send the disable message for some
# reason; however, they already failed auth so suppress
# it and close.
pass
return
if not self.session_store.attach_client(uuid, ws):
# We are attaching to a client that does not exist.
return
self.session_store.send_to_tracer(uuid, event=start_event)
for event in self.get_events(ws):
self.session_store.send_to_tracer(uuid, event=event)
finally:
log.info('Closing websocket to client %s' % addr)
ws.close()
开发者ID:OspreyX,项目名称:qdb,代码行数:54,代码来源:client.py
示例17: test_why_are_you_executing_all_these_commands
def test_why_are_you_executing_all_these_commands(self):
db = Qdb(
uuid='send_stack_test',
cmd_manager=self.cmd_manager,
host=self.tracer_host,
port=self.tracer_port,
redirect_output=False,
green=True,
)
gyield()
for n in range(sys.getrecursionlimit()):
self.server.session_store.send_to_tracer(
uuid=db.uuid,
event=fmt_msg('eval', 'None')
)
self.server.session_store.send_to_tracer(
uuid=db.uuid,
event=fmt_msg('continue')
)
with gevent.Timeout(1):
db.set_trace(stop=True)
开发者ID:quantopian,项目名称:qdb,代码行数:21,代码来源:test_cmd_manager.py
示例18: test_send_stack_results
def test_send_stack_results(self, use_skip_fn):
"""
Tests that the results from sending the stack are accurate.
WARNING: This test uses lines of it's own source as string literals,
be sure to edit the source and the string if you make any changes.
"""
def skip_fn(filename):
return not fix_filename(__file__) in filename
events = []
def capture_event(self, event, payload):
events.append(fmt_msg(event, payload))
class cmd_manager(type(self.cmd_manager)):
"""
Wrap send_stack by just capturing the output to make assertions on
it.
"""
def send_stack(self, tracer):
with patch.object(cmd_manager, 'send_event', capture_event):
super(cmd_manager, self).send_stack(tracer)
db = Qdb(
uuid='send_stack_test',
cmd_manager=cmd_manager(),
host=self.tracer_host,
port=self.tracer_port,
redirect_output=False,
skip_fn=skip_fn if use_skip_fn else None,
green=True,
)
gyield()
self.server.session_store.send_to_tracer(
uuid=db.uuid,
event=fmt_msg('continue')
)
db.set_trace(stop=True)
self.assertTrue(events) # EDIT IN BOTH PLACES
event = events[0]
if use_skip_fn:
# Assert that we actually suppressed some frames.
self.assertTrue(len(event['p']['stack']) < len(db.stack))
self.assertEqual(
# I love dictionaries so much!
event['p']['stack'][event['p']['index']]['code'],
' self.assertTrue(events) # EDIT IN BOTH PLACES',
)
self.server.session_store.slaughter(db.uuid)
开发者ID:quantopian,项目名称:qdb,代码行数:52,代码来源:test_cmd_manager.py
示例19: user_exception
def user_exception(self, stackframe, exc_info):
exc_type, exc_value, exc_traceback = exc_info
stackframe.f_locals['__exception__'] = exc_type, exc_value
self.setup_stack(stackframe, exc_traceback)
self.cmd_manager.send_watchlist()
self.cmd_manager.send_stack()
msg = fmt_msg(
'exception', {
'type': exc_type.__name__,
'value': str(exc_value),
'traceback': traceback.format_tb(exc_traceback)
},
serial=pickle.dumps,
)
self.cmd_manager.next_command(msg)
开发者ID:fuyuanwu,项目名称:qdb,代码行数:15,代码来源:tracer.py
示例20: test_commands
def test_commands(self, attrgetter_, event, payload=None):
"""
Tests various commands with or without payloads.
"""
tracer = self.MockTracer()
cmd_manager = self.cmd_manager
cmd_manager.start(tracer, '')
self.server.session_store.send_to_tracer(
uuid=tracer.uuid,
event=fmt_msg(event, payload)
)
with gevent.Timeout(0.1, False):
cmd_manager.next_command(tracer)
tracer.start.assert_called() # Start always gets called.
attrgetter_(tracer).assert_called()
# Kill the session we just created
self.server.session_store.slaughter(tracer.uuid)
开发者ID:quantopian,项目名称:qdb,代码行数:17,代码来源:test_cmd_manager.py
注:本文中的qdb.comm.fmt_msg函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论