本文整理汇总了Python中qutebrowser.misc.ipc.send_to_running_instance函数的典型用法代码示例。如果您正苦于以下问题:Python send_to_running_instance函数的具体用法?Python send_to_running_instance怎么用?Python send_to_running_instance使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了send_to_running_instance函数的17个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_socket_error
def test_socket_error(self):
socket = FakeSocket(error=QLocalSocket.ConnectionError)
with pytest.raises(ipc.Error) as excinfo:
ipc.send_to_running_instance('qute-test', [], None, socket=socket)
msg = "Error while writing to running instance: Error string (error 7)"
assert str(excinfo.value) == msg
开发者ID:axs221,项目名称:qutebrowser,代码行数:7,代码来源:test_ipc.py
示例2: send_cmd
def send_cmd(self, command, count=None, invalid=False, *, escape=True):
"""Send a command to the running qutebrowser instance.
Args:
count: The count to pass to the command.
invalid: If True, we don't wait for "command called: ..." in the
log
escape: Escape backslashes in the command
"""
summary = command
if count is not None:
summary += ' (count {})'.format(count)
self.log_summary(summary)
assert self._ipc_socket is not None
time.sleep(self._delay / 1000)
if escape:
command = command.replace('\\', r'\\')
if count is not None:
command = ':{}:{}'.format(count, command.lstrip(':'))
ipc.send_to_running_instance(self._ipc_socket, [command],
target_arg='')
if not invalid:
self.wait_for(category='commands', module='command',
function='run', message='command called: *')
开发者ID:addictedtoflames,项目名称:qutebrowser,代码行数:29,代码来源:quteprocess.py
示例3: send_cmd
def send_cmd(self, command):
assert self._ipc_socket is not None
ipc.send_to_running_instance(self._ipc_socket, [command],
target_arg='')
self.wait_for(category='commands', module='command', function='run',
message='command called: *')
开发者ID:LeonZZhou,项目名称:qutebrowser,代码行数:7,代码来源:quteprocess.py
示例4: run
def run(args):
"""Initialize everthing and run the application."""
# pylint: disable=too-many-statements
if args.version:
print(version.version())
print()
print()
print(qutebrowser.__copyright__)
print()
print(version.GPL_BOILERPLATE.strip())
sys.exit(usertypes.Exit.ok)
if args.temp_basedir:
args.basedir = tempfile.mkdtemp(prefix='qutebrowser-basedir-')
quitter = Quitter(args)
objreg.register('quitter', quitter)
global qApp
qApp = Application(args)
qApp.lastWindowClosed.connect(quitter.on_last_window_closed)
crash_handler = crashsignal.CrashHandler(
app=qApp, quitter=quitter, args=args, parent=qApp)
crash_handler.activate()
objreg.register('crash-handler', crash_handler)
signal_handler = crashsignal.SignalHandler(app=qApp, quitter=quitter,
parent=qApp)
signal_handler.activate()
objreg.register('signal-handler', signal_handler)
try:
sent = ipc.send_to_running_instance(args)
if sent:
sys.exit(usertypes.Exit.ok)
log.init.debug("Starting IPC server...")
server = ipc.IPCServer(args, qApp)
objreg.register('ipc-server', server)
server.got_args.connect(lambda args, cwd:
process_pos_args(args, cwd=cwd, via_ipc=True))
except ipc.AddressInUseError as e:
# This could be a race condition...
log.init.debug("Got AddressInUseError, trying again.")
time.sleep(500)
sent = ipc.send_to_running_instance(args)
if sent:
sys.exit(usertypes.Exit.ok)
else:
ipc.display_error(e, args)
sys.exit(usertypes.Exit.err_ipc)
except ipc.Error as e:
ipc.display_error(e, args)
# We didn't really initialize much so far, so we just quit hard.
sys.exit(usertypes.Exit.err_ipc)
init(args, crash_handler)
ret = qt_mainloop()
return ret
开发者ID:xetch,项目名称:qutebrowser,代码行数:59,代码来源:app.py
示例5: test_socket_error_no_server
def test_socket_error_no_server(self):
socket = FakeSocket(error=QLocalSocket.ConnectionError,
connect_successful=False)
with pytest.raises(ipc.Error) as excinfo:
ipc.send_to_running_instance('qute-test', [], None, socket=socket)
msg = ("Error while connecting to running instance: Error string "
"(error 7)")
assert str(excinfo.value) == msg
开发者ID:axs221,项目名称:qutebrowser,代码行数:9,代码来源:test_ipc.py
示例6: send_ipc
def send_ipc(self, commands, target_arg=''):
"""Send a raw command to the running IPC socket."""
delay = self.request.config.getoption('--qute-delay')
time.sleep(delay / 1000)
assert self._ipc_socket is not None
ipc.send_to_running_instance(self._ipc_socket, commands, target_arg)
self.wait_for(category='ipc', module='ipc', function='on_ready_read',
message='Read from socket *')
开发者ID:Dietr1ch,项目名称:qutebrowser,代码行数:9,代码来源:quteprocess.py
示例7: send_cmd
def send_cmd(self, command, count=None):
"""Send a command to the running qutebrowser instance."""
assert self._ipc_socket is not None
time.sleep(self._delay / 1000)
if count is not None:
command = ":{}:{}".format(count, command.lstrip(":"))
ipc.send_to_running_instance(self._ipc_socket, [command], target_arg="")
self.wait_for(category="commands", module="command", function="run", message="command called: *")
开发者ID:Link-Satonaka,项目名称:qutebrowser,代码行数:11,代码来源:quteprocess.py
示例8: send_cmd
def send_cmd(self, command, count=None):
"""Send a command to the running qutebrowser instance."""
assert self._ipc_socket is not None
time.sleep(self._delay / 1000)
if count is not None:
command = ':{}:{}'.format(count, command.lstrip(':'))
ipc.send_to_running_instance(self._ipc_socket, [command],
target_arg='')
self.wait_for(category='commands', module='command', function='run',
message='command called: *')
开发者ID:IsSuEat,项目名称:qutebrowser,代码行数:13,代码来源:quteprocess.py
示例9: test_normal
def test_normal(self, qtbot, tmpdir, ipc_server, mocker, has_cwd):
ipc_server.listen()
spy = QSignalSpy(ipc_server.got_args)
raw_spy = QSignalSpy(ipc_server.got_raw)
error_spy = QSignalSpy(ipc_server.got_invalid_data)
with qtbot.waitSignal(ipc_server.got_args, raising=True, timeout=5000):
with tmpdir.as_cwd():
if not has_cwd:
m = mocker.patch('qutebrowser.misc.ipc.os')
m.getcwd.side_effect = OSError
sent = ipc.send_to_running_instance('qute-test', ['foo'], None)
assert sent
assert not error_spy
expected_cwd = str(tmpdir) if has_cwd else ''
assert len(spy) == 1
assert spy[0] == [['foo'], '', expected_cwd]
assert len(raw_spy) == 1
assert len(raw_spy[0]) == 1
raw_expected = {'args': ['foo'], 'target_arg': None,
'version': qutebrowser.__version__,
'protocol_version': ipc.PROTOCOL_VERSION}
if has_cwd:
raw_expected['cwd'] = str(tmpdir)
parsed = json.loads(raw_spy[0][0].decode('utf-8'))
assert parsed == raw_expected
开发者ID:axs221,项目名称:qutebrowser,代码行数:30,代码来源:test_ipc.py
示例10: test_normal
def test_normal(self, qtbot, tmpdir, ipc_server, mocker, has_cwd):
ipc_server.listen()
with qtbot.assertNotEmitted(ipc_server.got_invalid_data):
with qtbot.waitSignal(ipc_server.got_args,
timeout=5000) as blocker:
with qtbot.waitSignal(ipc_server.got_raw,
timeout=5000) as raw_blocker:
with tmpdir.as_cwd():
if not has_cwd:
m = mocker.patch('qutebrowser.misc.ipc.os')
m.getcwd.side_effect = OSError
sent = ipc.send_to_running_instance(
'qute-test', ['foo'], None)
assert sent
expected_cwd = str(tmpdir) if has_cwd else ''
assert blocker.args == [['foo'], '', expected_cwd]
raw_expected = {'args': ['foo'], 'target_arg': None,
'version': qutebrowser.__version__,
'protocol_version': ipc.PROTOCOL_VERSION}
if has_cwd:
raw_expected['cwd'] = str(tmpdir)
assert len(raw_blocker.args) == 1
parsed = json.loads(raw_blocker.args[0].decode('utf-8'))
assert parsed == raw_expected
开发者ID:michaelbeaumont,项目名称:qutebrowser,代码行数:30,代码来源:test_ipc.py
示例11: test_not_disconnected_immediately
def test_not_disconnected_immediately(self):
socket = FakeSocket()
ipc.send_to_running_instance('qute-test', [], None, socket=socket)
开发者ID:axs221,项目名称:qutebrowser,代码行数:3,代码来源:test_ipc.py
示例12: test_no_server
def test_no_server(self, caplog):
sent = ipc.send_to_running_instance('qute-test', [], None)
assert not sent
msg = caplog.records[-1].message
assert msg == "No existing instance present (error 2)"
开发者ID:axs221,项目名称:qutebrowser,代码行数:5,代码来源:test_ipc.py
示例13: __init__
def __init__(self, args):
"""Constructor.
Args:
Argument namespace from argparse.
"""
# pylint: disable=too-many-statements
self._quit_status = {
'crash': True,
'tabs': False,
'main': False,
}
self.geometry = None
self._shutting_down = False
self._crashdlg = None
self._crashlogfile = None
qt_args = qtutils.get_args(args)
log.init.debug("Qt arguments: {}, based on {}".format(qt_args, args))
super().__init__(qt_args)
sys.excepthook = self._exception_hook
self._args = args
objreg.register('args', args)
objreg.register('app', self)
if self._args.version:
print(version.version())
print()
print()
print(qutebrowser.__copyright__)
print()
print(version.GPL_BOILERPLATE.strip())
sys.exit(0)
try:
sent = ipc.send_to_running_instance(self._args.command)
if sent:
sys.exit(0)
log.init.debug("Starting IPC server...")
ipc.init()
except ipc.IPCError as e:
text = ('{}\n\nMaybe another instance is running but '
'frozen?'.format(e))
msgbox = QMessageBox(QMessageBox.Critical, "Error while "
"connecting to running instance!", text)
msgbox.exec_()
# We didn't really initialize much so far, so we just quit hard.
sys.exit(1)
log.init.debug("Starting init...")
self.setQuitOnLastWindowClosed(False)
self.setOrganizationName("qutebrowser")
self.setApplicationName("qutebrowser")
self.setApplicationVersion(qutebrowser.__version__)
self._init_icon()
utils.actute_warning()
try:
self._init_modules()
except (OSError, UnicodeDecodeError) as e:
msgbox = QMessageBox(
QMessageBox.Critical, "Error while initializing!",
"Error while initializing: {}".format(e))
msgbox.exec_()
sys.exit(1)
QTimer.singleShot(0, self._process_args)
log.init.debug("Initializing eventfilter...")
self._event_filter = modeman.EventFilter(self)
self.installEventFilter(self._event_filter)
log.init.debug("Connecting signals...")
self._connect_signals()
log.init.debug("Setting up signal handlers...")
self._setup_signals()
QDesktopServices.setUrlHandler('http', self.open_desktopservices_url)
QDesktopServices.setUrlHandler('https', self.open_desktopservices_url)
QDesktopServices.setUrlHandler('qute', self.open_desktopservices_url)
log.init.debug("Init done!")
if self._crashdlg is not None:
self._crashdlg.raise_()
开发者ID:JIVS,项目名称:qutebrowser,代码行数:86,代码来源:app.py
示例14: test_socket_error
def test_socket_error(self):
socket = FakeSocket(error=QLocalSocket.ConnectionError)
with pytest.raises(ipc.Error, match=r"Error while writing to running "
r"instance: Error string \(error 7\)"):
ipc.send_to_running_instance('qute-test', [], None, socket=socket)
开发者ID:michaelbeaumont,项目名称:qutebrowser,代码行数:5,代码来源:test_ipc.py
示例15: qute_reload
def qute_reload():
"""Send config-source command to qutebrowsers ipc server."""
args = qutebrowser.get_argparser().parse_args()
app.standarddir.init(args)
socket = ipc._get_socketname(args.basedir)
ipc.send_to_running_instance(socket, [":config-source"], args.target)
开发者ID:pcalves,项目名称:dotfiles,代码行数:6,代码来源:qutebrowser_reload.py
示例16: test_socket_error_no_server
def test_socket_error_no_server(self):
socket = FakeSocket(error=QLocalSocket.ConnectionError,
connect_successful=False)
with pytest.raises(ipc.Error, match=r"Error while connecting to "
r"running instance: Error string \(error 7\)"):
ipc.send_to_running_instance('qute-test', [], None, socket=socket)
开发者ID:michaelbeaumont,项目名称:qutebrowser,代码行数:6,代码来源:test_ipc.py
示例17: __init__
def __init__(self, args):
"""Constructor.
Args:
Argument namespace from argparse.
"""
# pylint: disable=too-many-statements
self._quit_status = {
'crash': True,
'tabs': False,
'main': False,
}
self.geometry = None
self._shutting_down = False
self._crashdlg = None
self._crashlogfile = None
if args.debug:
# We don't enable this earlier because some imports trigger
# warnings (which are not our fault).
warnings.simplefilter('default')
qt_args = qtutils.get_args(args)
log.init.debug("Qt arguments: {}, based on {}".format(qt_args, args))
super().__init__(qt_args)
sys.excepthook = self._exception_hook
self._args = args
objreg.register('args', args)
objreg.register('app', self)
if self._args.version:
print(version.version())
print()
print()
print(qutebrowser.__copyright__)
print()
print(version.GPL_BOILERPLATE.strip())
sys.exit(0)
sent = ipc.send_to_running_instance(self._args.command)
if sent:
sys.exit(0)
log.init.debug("Starting init...")
self.setQuitOnLastWindowClosed(False)
self.setOrganizationName("qutebrowser")
self.setApplicationName("qutebrowser")
self.setApplicationVersion(qutebrowser.__version__)
utils.actute_warning()
try:
self._init_modules()
except OSError as e:
msgbox = QMessageBox(
QMessageBox.Critical, "Error while initializing!",
"Error while initializing: {}".format(e))
msgbox.exec_()
sys.exit(1)
QTimer.singleShot(0, self._open_pages)
log.init.debug("Initializing eventfilter...")
self._event_filter = modeman.EventFilter(self)
self.installEventFilter(self._event_filter)
log.init.debug("Connecting signals...")
self._connect_signals()
log.init.debug("Applying python hacks...")
self._python_hacks()
log.init.debug("Starting IPC server...")
ipc.init()
QDesktopServices.setUrlHandler('http', self.open_desktopservices_url)
QDesktopServices.setUrlHandler('https', self.open_desktopservices_url)
QDesktopServices.setUrlHandler('qute', self.open_desktopservices_url)
log.init.debug("Init done!")
if self._crashdlg is not None:
self._crashdlg.raise_()
开发者ID:helenst,项目名称:qutebrowser,代码行数:82,代码来源:app.py
注:本文中的qutebrowser.misc.ipc.send_to_running_instance函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论