• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python signal.signal函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中signal.signal函数的典型用法代码示例。如果您正苦于以下问题:Python signal函数的具体用法?Python signal怎么用?Python signal使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了signal函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: save

def save(model, timings, post_fix=""):
    """Write the model, its state dict and the timing arrays to disk.

    Three files are created, all sharing the path prefix
    ``<save_dir>/<run_id>_<prefix><post_fix>`` built from ``model.state``:
    ``model.npz`` (via ``model.save``), ``state.pkl`` (pickled
    ``model.state``) and ``timing.npz`` (via ``numpy.savez``).

    SIGINT is ignored while the files are written so that Ctrl-C cannot
    interrupt the save half-way.  The previous handler is put back afterwards,
    even when one of the writes raises.

    Args:
        model: object exposing ``state`` (a mapping with the keys
            ``save_dir``, ``run_id`` and ``prefix``) and ``save(path)``.
        timings: mapping of array names to arrays, expanded into
            ``numpy.savez`` keyword arguments.
        post_fix: optional string inserted right before each file name.
    """
    print("Saving the model...")

    start = time.time()
    base_path = (
        model.state["save_dir"] + "/" + model.state["run_id"] + "_" + model.state["prefix"] + post_fix
    )

    # ignore keyboard interrupt while saving
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        model.save(base_path + "model.npz")
        # The context manager closes the pickle file even if dump() fails.
        with open(base_path + "state.pkl", "w") as state_file:
            cPickle.dump(model.state, state_file)
        numpy.savez(base_path + "timing.npz", **timings)
    finally:
        # Always hand SIGINT back, otherwise a failed save leaves Ctrl-C dead.
        signal.signal(signal.SIGINT, previous_handler)

    print("Model saved, took {}".format(time.time() - start))
开发者ID:pcolonna,项目名称:hed-dlg-truncated,代码行数:30,代码来源:train.py


示例2: embeded_window

def embeded_window():
	"""Assemble the blivet-gui widgets inside a Gtk.Plug and return the plug.

	The UI description is loaded from ``data/ui/blivet-gui.ui`` under
	``dirname``, the "vbox" widget is reparented into the plug, and the views
	supplied by ``ListDevices`` are added to their viewports.  SIGINT is
	reset to the default action along the way.
	"""

	plug = Gtk.Plug(0)

	#FIXME
	print(plug.get_id())

	ui = Gtk.Builder()
	ui.add_from_file(dirname + '/data/ui/blivet-gui.ui')

	signal.signal(signal.SIGINT, signal.SIG_DFL)

	ui.get_object("vbox").reparent(plug)

	devices = ListDevices(ui)

	# (container id in the .ui file, callable producing the child widget)
	placements = (
		("disks_viewport", devices.get_disks_view),
		("partitions_viewport", devices.return_partitions_view),
		("actions_viewport", devices.return_actions_view),
		("image_window", devices.return_partitions_image),
	)
	for container_id, make_child in placements:
		ui.get_object(container_id).add(make_child())

	# NOTE(review): get_toolbar is passed without being called, exactly as in
	# the original code - presumably it is a property; confirm.
	ui.get_object("vbox").add(devices.get_partions_list().get_toolbar)

	return plug
开发者ID:vpodzime,项目名称:blivet-gui,代码行数:31,代码来源:main_window.py


示例3: register_signal

 def register_signal(self, signum, callback):
     """Add ``callback`` to the callbacks kept for ``signum``.

     The first registration for a signal also installs
     ``self._generic_signal_handler`` as the process-level handler; later
     registrations only extend the callback list.

     callback signature: (signum)
     """
     registry = self._signal_handlers
     if signum in registry:
         registry[signum].append(callback)
         return
     registry[signum] = [callback]
     signal.signal(signum, self._generic_signal_handler)
开发者ID:pombreda,项目名称:microactor,代码行数:7,代码来源:base.py


示例4: build

def build(args):
    """Entry point for all build subcommand tasks.

    Reads the manifest file named by ``args.manifest``, creates a builder for
    it, runs the build and cleans up.  While the build runs, SIGINT triggers
    a cleanup followed by ``sys.exit()``.

    Args:
        args: parsed command-line namespace; the attributes ``manifest``,
            ``version``, ``type``, ``parallel`` and ``projects`` are read.

    Raises:
        SystemExit: always.  The exit status is -1 when an exception was
            caught, otherwise whatever ``builder.build()`` returned.
    """
    builder = None
    rc = None
    fail = False
    try:
        with open(args.manifest, 'r') as fh:
            manifest = fh.read()

        buildspec = BuildSpec(manifest, args.version, args.type, args.parallel,
                              args.projects)
        builder = BuilderFactory.create_builder(args.type, buildspec)

        def _signal_handler(*_signal_args):
            LOG.info("Process interrrupted. Cleaning up.")
            builder.cleanup()
            sys.exit()
        signal.signal(signal.SIGINT, _signal_handler)

        rc = builder.build()
    except Exception as e:
        LOG.exception("Oops something went wrong: %s", e)
        fail = True

    # The builder does not exist yet when reading the manifest or creating
    # the builder failed; calling cleanup() on None would replace the real
    # error with an AttributeError.
    if builder is not None:
        builder.cleanup()
    if fail:
        sys.exit(-1)
    sys.exit(rc)
开发者ID:blueboxgroup,项目名称:giftwrap,代码行数:29,代码来源:shell.py


示例5: daemonize

def daemonize(errfile):
    """
    Detach process and become a daemon.

    Forks twice (each parent leaves through ``os._exit(0)``), starts a new
    session in between, ignores SIGHUP, clears the umask, moves to ``/``,
    closes file descriptors 0-19 and finally rebinds ``sys.stdin`` and
    ``sys.stdout`` to ``/dev/null`` and ``sys.stderr`` to ``ErrorLog(errfile)``.
    """
    # First fork: only the child carries on.
    if os.fork():
        os._exit(0)

    os.setsid()
    signal.signal(signal.SIGHUP, signal.SIG_IGN)
    os.umask(0)

    # Second fork: again only the child carries on.
    if os.fork():
        os._exit(0)

    os.chdir("/")

    # Descriptors that are not open raise OSError, which is simply skipped.
    for descriptor in range(20):
        try:
            os.close(descriptor)
        except OSError:
            continue

    sys.stdin = open("/dev/null","r")
    sys.stdout = open("/dev/null","w")
    sys.stderr = ErrorLog(errfile)
开发者ID:AnyMesh,项目名称:anyMesh-Python,代码行数:26,代码来源:web_display.py


示例6: __init__

    def __init__(self, prompt="> ", stdin=None, stdout=None):
        """Initialise both base classes, install a SIGINT handler and start.

        ``prompt`` is only used when ``sys.__stdin__`` is a terminal;
        otherwise the prompt is the empty string.  ``stdin`` and ``stdout``
        are handed to ``cmd.Cmd.__init__`` unchanged.
        """
        if _debug:
            ConsoleCmd._debug("__init__")

        cmd.Cmd.__init__(self, stdin=stdin, stdout=stdout)
        Thread.__init__(self, name="ConsoleCmd")

        # interactive sessions keep the prompt, everything else stays quiet
        self.interactive = sys.__stdin__.isatty()
        self.prompt = prompt if self.interactive else ''

        # gc counters
        self.type2count = {}
        self.type2all = {}

        # logging handlers
        self.handlers = {}

        # ^C is only delivered to the main thread and cannot break the
        # readline call made by this thread, hence a dedicated INT handler
        # (installed only where the platform defines SIGINT)
        sigint = getattr(signal, 'SIGINT', None)
        if sigint is not None:
            signal.signal(sigint, console_interrupt)

        # start the thread
        self.start()
开发者ID:DemandLogic,项目名称:bacpypes,代码行数:29,代码来源:consolecmd.py


示例7: serve_forever

    def serve_forever(self):
        """Accept connections from multiple clients and queue them for workers.

        A listening TCP socket is bound to ``self.addr``, the worker
        processes are spawned, and every accepted connection is serialised
        with ``reduce_handle`` and put on ``self.conn_queue``.  SIGINT calls
        ``self.stop_server()``; a ``socket.error`` raised inside the accept
        loop does the same.  The listening socket is closed on every exit
        path, including failures during bind/listen/worker start-up.
        """
        # register a SIGINT handler for graceful exit.
        def stop_gracefully(signum, frame):
            self.stop_server()

        signal.signal(signal.SIGINT, stop_gracefully)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(self.addr)
            log.debug('%s listening on %s' % (type(self).__name__, self.addr))

            # start listening on this port
            sock.listen(5)

            # spawn worker processes
            self.spawn_worker_processes()

            try:
                while True:
                    # start accepting connections
                    log.debug('Main: Waiting for incoming connections')
                    connection, address = sock.accept()

                    serialized_socket = reduce_handle(connection.fileno())
                    self.conn_queue.put(serialized_socket)
            except socket.error:
                log.warning('Interrupt: Stopping main process')
                self.stop_server()
        finally:
            # Also reached when set-up fails, so the descriptor never leaks.
            sock.close()
开发者ID:elisbyberi,项目名称:utils,代码行数:35,代码来源:prefork_server.py


示例8: run

    def run(self):
        """Runs the handler, flushes the streams, and ends the request.

        When ``self._timeout`` is set, ``self.timeout_handler`` is installed
        for SIGALRM and an alarm is armed for that many seconds around the
        handler call; the alarm is cancelled and the previous SIGALRM handler
        restored afterwards, even if reporting an error fails.

        Anything the handler raises is printed to ``self.stderr`` and the
        request is completed with application status 0.  A broken pipe
        (EPIPE) while flushing or ending the request is ignored; any other
        ``socket.error`` propagates.
        """
        # Read once so arming and disarming always agree with each other.
        timeout = self._timeout
        if timeout:
            old_alarm = signal.signal(signal.SIGALRM, self.timeout_handler)
            signal.alarm(timeout)

        try:
            try:
                protocolStatus, appStatus = self.server.handler(self)
            except:
                # Deliberately broad: whatever the handler raises is reported
                # on the request's stderr instead of propagating.
                traceback.print_exc(file=self.stderr)
                self.stderr.flush()
                if not self.stdout.dataWritten:
                    self.server.error(self)

                protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0
        finally:
            # Restore old handler if timeout was given
            if timeout:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_alarm)

        if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' %
                             (protocolStatus, appStatus))

        try:
            self._flush()
            self._end(appStatus, protocolStatus)
        except socket.error as e:
            # ``as`` and ``.errno`` work on Python 2.6+ and Python 3, unlike
            # the old ``except socket.error, e`` / ``e[0]`` spelling.
            if e.errno != errno.EPIPE:
                raise
开发者ID:Blue0ctober,项目名称:microblog,代码行数:31,代码来源:fcgi_base.py


示例9: _install_signal_handlers

    def _install_signal_handlers(self):
        """Installs signal handlers for handling SIGINT and SIGTERM
        gracefully.
        """

        signal.signal(signal.SIGINT, self.request_stop)
        signal.signal(signal.SIGTERM, self.request_stop)
开发者ID:Kisioj,项目名称:rq,代码行数:7,代码来源:worker.py


示例10: __init__

  def __init__(self, signum, handler_fn=None, wakeup=None):
    """Constructs a new SignalHandler instance.

    @type signum: int or list of ints
    @param signum: Single signal number or set of signal numbers
    @type handler_fn: callable
    @param handler_fn: Signal handling function

    """
    assert handler_fn is None or callable(handler_fn)

    self.signum = set(signum)
    self.called = False

    self._handler_fn = handler_fn
    self._wakeup = wakeup

    self._previous = {}
    try:
      for signum in self.signum:
        # Setup handler
        prev_handler = signal.signal(signum, self._HandleSignal)
        try:
          self._previous[signum] = prev_handler
        except:
          # Restore previous handler
          signal.signal(signum, prev_handler)
          raise
    except:
      # Reset all handlers
      self.Reset()
      # Here we have a race condition: a handler may have already been called,
      # but there's not much we can do about it at this point.
      raise
开发者ID:skycover,项目名称:ganeti-sci,代码行数:34,代码来源:__init__.py


示例11: _paramiko_tunnel

def _paramiko_tunnel(lport, rport, server, remoteip, keyfile=None, password=None):
    """Connect to ``server`` with paramiko and forward ``lport`` through it.

    Intended as the ``target`` of a ``multiprocessing.Process``, not for
    direct calls.  The process exits with status 1 when the SSH connection
    cannot be established, 0 when forwarding ends with KeyboardInterrupt and
    255 when it ends with any other exception.
    """
    username, server, port = _split_server(server)
    ssh = paramiko.SSHClient()
    ssh.load_system_host_keys()
    ssh.set_missing_host_key_policy(paramiko.WarningPolicy())

    try:
        ssh.connect(server, port, username=username, key_filename=keyfile,
                    look_for_keys=True, password=password)
# Disabled interactive password fallback, kept for reference:
#    except paramiko.AuthenticationException:
#        if password is None:
#            password = getpass("%s@%s's password: "%(username, server))
#            client.connect(server, port, username=username, password=password)
#        else:
#            raise
    except Exception as exc:
        print ('*** Failed to connect to %s:%d: %r' % (server, port, exc))
        sys.exit(1)

    # The tunnel subprocess must survive a SIGINT, so ignore it from here on
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    try:
        forward_tunnel(lport, remoteip, rport, ssh.get_transport())
    except KeyboardInterrupt:
        print ('SIGINT: Port forwarding stopped cleanly')
        sys.exit(0)
    except Exception as exc:
        print ("Port forwarding stopped uncleanly: %s"%exc)
        sys.exit(255)
开发者ID:dvska,项目名称:ipython,代码行数:33,代码来源:tunnel.py


示例12: _daemonize

 def _daemonize(self):
     if not self.config.NODETACH:
         # fork so the parent can exist
         if os.fork():
             return -1
         # deconnect from tty and create a new session
         os.setsid()
         # fork again so the parent, (the session group leader), can exit.
         # as a non-session group leader, we can never regain a controlling
         # terminal.
         if os.fork():
             return -1
         # move to the root to avoit mount pb
         os.chdir("/")
         # set paranoid umask
         os.umask(0o77)
         # write pid in a file
         f = open(self._pid_file, "w")
         f.write(str(os.getpid()))
         f.close()
         # close standard descriptors
         sys.stdin.close()
         sys.stdout.close()
         sys.stderr.close()
         # put signal handler
         signal.signal(signal.SIGTERM, self.signal_handler)
         signal.signal(signal.SIGHUP, self.signal_handler)
开发者ID:shaleh,项目名称:clonedigger,代码行数:27,代码来源:daemon.py


示例13: main

def main(argv):
    """Parse the command line, register the exit hooks and start the run.

    Recognised options:
      -s, --sort        set the global ``command`` to "sort"
      -a, --rand        set the global ``command`` to "rand"
      -r, --rev         set the global ``command`` to "rev"
      -i N, --item N    integer passed to ``run`` (default 1)
      -t X, --test X    test name passed to ``run`` (default "100")

    Anything else is reported through ``err()``.  ``exit`` is registered both
    as an atexit hook (called with the test name) and as the SIGTERM handler.

    Args:
        argv: list of command-line arguments, without the program name.
    """
    global command
    try:
        # "item=" / "test=": the trailing "=" tells getopt that these long
        # options take a value, matching the short forms "i:" and "t:".
        opts, args = getopt.getopt(argv, "sari:t:", ["sort", "rand", "rev", "item=", "test="])
    except getopt.GetoptError:
        err()
        # Only reached if err() returns; there is nothing to parse then.
        return

    i = 1
    test = "100"
    for opt, arg in opts:
        # getopt reports long options with their leading "--", so the
        # comparisons must include it or the long forms never match.
        if opt == "-u":
            err()
        elif opt in ("-s", "--sort"):
            command = "sort"
        elif opt in ("-a", "--rand"):
            command = "rand"
        elif opt in ("-r", "--rev"):
            command = "rev"
        elif opt in ("-i", "--item"):
            i = int(arg)
        elif opt in ("-t", "--test"):
            test = arg
        else:
            err()
    atexit.register(exit, (test))
    signal.signal(signal.SIGTERM,exit)
    run(i, test)
开发者ID:paolo215,项目名称:Sorting-Algorithms,代码行数:28,代码来源:item.py


示例14: remove_signal_handler

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        After removal SIGINT goes back to ``signal.default_int_handler`` and
        every other signal to ``signal.SIG_DFL``.  Once no handlers remain,
        the wakeup fd is reset with ``signal.set_wakeup_fd(-1)``.

        Raises RuntimeError when the OS rejects the signal with EINVAL.
        """
        self._check_signal(sig)
        if sig not in self._signal_handlers:
            return False
        del self._signal_handlers[sig]

        restored = signal.default_int_handler if sig == signal.SIGINT else signal.SIG_DFL

        try:
            signal.signal(sig, restored)
        except OSError as exc:
            if exc.errno != errno.EINVAL:
                raise
            raise RuntimeError('sig {} cannot be caught'.format(sig))

        if not self._signal_handlers:
            # nothing left to dispatch, so the wakeup fd is no longer needed
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True
开发者ID:LPRD,项目名称:build_tools,代码行数:31,代码来源:unix_events.py


示例15: main

def main():
    """Poll a TMP006 sensor on I2C bus 1 every half second until interrupted.

    Ctrl-C raises SystemExit instead of producing a stack trace, and an
    atexit hook prints "Exiting" on the way out.
    """
    # Infrared-thermopile sensor on I2C bus 1
    sensor = TMP006.TMP006(1)

    def on_sigint(signum, frame):
        # keeps python from printing a stacktrace on control-C
        raise SystemExit

    def on_exit():
        # runs on interpreter exit; sensor functions could be called here
        print("Exiting")
        sys.exit(0)

    atexit.register(on_exit)
    signal.signal(signal.SIGINT, on_sigint)

    # activate periodic measurements
    sensor.setActive()

    # Read the temperature and the config register every 0.5 seconds
    while True:
        sensor.getTemperature(True)
        # NOTE(review): the hex string is computed but not printed, exactly
        # as in the original loop - confirm whether output was intended.
        hex(sensor.getConfig())

        time.sleep(.5)
开发者ID:chihchun,项目名称:upm,代码行数:28,代码来源:tmp006.py


示例16: __init__

	def __init__(self, configuration):
		"""Read the bind/port/speed settings, create the helper objects,
		reset the internal flags and install the signal handlers.

		``configuration`` is wrapped in ``Configuration`` and stored as
		``self.configuration``.  SIGTERM, SIGHUP, SIGALRM, SIGUSR1 and
		SIGUSR2 are routed to the methods of the same (lower-case) name.
		"""
		self.ip = environment.settings().tcp.bind
		self.port = environment.settings().tcp.port

		self.max_loop_time = environment.settings().reactor.speed
		self.half_loop_time = self.max_loop_time / 2

		self.logger = Logger()
		self.daemon = Daemon(self)
		self.processes = None
		self.listener = None
		self.configuration = Configuration(configuration)

		# bookkeeping
		self._peers = {}
		self._commands = []
		self._pending = []

		# state flags, all cleared at start-up
		self._shutdown = False
		self._reload = False
		self._reload_processes = False
		self._restart = False
		self._route_update = False
		self._saved_pid = False

		for signum, handler in (
			(signal.SIGTERM, self.sigterm),
			(signal.SIGHUP, self.sighup),
			(signal.SIGALRM, self.sigalrm),
			(signal.SIGUSR1, self.sigusr1),
			(signal.SIGUSR2, self.sigusr2),
		):
			signal.signal(signum, handler)
开发者ID:bpd1069,项目名称:exabgp,代码行数:28,代码来源:__init__.py


示例17: main

def main():
  """Build one argparse sub-command per entry of VALID_COMMANDS and run it.

  SIGUSR1 is bound to DumpThreadStacks first. Returns the value of
  RunTestsCommand, or one of the exit codes from ``constants`` when an
  exception is caught: INFRA_EXIT_CODE for a BaseError flagged as an infra
  error, ERROR_EXIT_CODE for everything else.
  """
  signal.signal(signal.SIGUSR1, DumpThreadStacks)

  parser = argparse.ArgumentParser()
  command_parsers = parser.add_subparsers(title='test types',
                                          dest='command')

  # Sub-commands are registered in sorted order of their names.
  for test_type in sorted(VALID_COMMANDS):
    config = VALID_COMMANDS[test_type]
    subparser = command_parsers.add_parser(
        test_type, usage='%(prog)s [options]', help=config.help_txt)
    config.add_options_func(subparser)

  args = parser.parse_args()

  try:
    return RunTestsCommand(args, parser)
  except base_error.BaseError as e:
    logging.exception('Error occurred.')
    exit_code = constants.ERROR_EXIT_CODE
    if e.is_infra_error:
      exit_code = constants.INFRA_EXIT_CODE
    return exit_code
  except: # pylint: disable=W0702
    logging.exception('Unrecognized error occurred.')
    return constants.ERROR_EXIT_CODE
开发者ID:Hepolise,项目名称:Chromium-Snapdragon-caf,代码行数:25,代码来源:test_runner.py


示例18: wait_for_keyboard

 def wait_for_keyboard(self):
     """Block until the user asks to quit.

     Interactive mode reads single-letter commands from standard input:
     'q' quits, 'r' reloads the configuration via
     ``start_threads_from_config`` and 'l' lists the configured ports.
     Ctrl-C ends the loop as well.  When standard input cannot be read
     (EOFError/OSError), SIGINT and SIGTERM are routed to
     ``self.exit_by_signal`` and the method polls ``self.is_interrupt`` once
     per second until it becomes true.
     """
     try:
         while True:
             time.sleep(2)   # reduce the chance of a deadlock
             print("enter 'q' to quit, 'r' to reload, 'l' to list ports.")
             command = input().lower().strip()
             if command == 'q':
                 break
             if command == 'r':
                 log.info('main: reload config.')
                 self.start_threads_from_config()
                 continue
             if command != 'l':
                 continue
             try:
                 print('name, station_port, dispatch_port, control_port')
                 for name, config in sorted(self.configs['entry'].items()):
                     if 'controlPort' in config:
                         control_port = str(config['controlPort'])
                     else:
                         control_port = None
                     print('%s, %d, %d, %s'
                           % (name, config['stationPort'], config['listenPort'], control_port))
             except Exception as e:
                 print('Error when list config: %s' % e)
     except KeyboardInterrupt:
         pass
     except (EOFError, OSError):
         # no input available: wait for a signal instead
         for signum in (signal.SIGINT, signal.SIGTERM):
             signal.signal(signum, self.exit_by_signal)
         while not self.is_interrupt:
             time.sleep(1)
开发者ID:bssthu,项目名称:rtk_trans,代码行数:29,代码来源:rtk_trans.py


示例19: ignore_sigint

def ignore_sigint():
    """Generator that keeps SIGINT ignored between its start and its end.

    SIGINT is set to ``SIG_IGN`` when the generator is first advanced and the
    handler that was active before is reinstalled when it finishes or is
    closed.  NOTE(review): presumably meant to be wrapped by
    ``contextlib.contextmanager``; no decorator is visible here - confirm.
    """
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        # put back whatever was handling SIGINT before
        signal.signal(signal.SIGINT, previous_handler)
开发者ID:evanunderscore,项目名称:pygnurl,代码行数:7,代码来源:interface.py


示例20: main

def main():
    """Set up the default subnet, start a waypoint monitor and wait for SIGINT.

    The user is asked to press enter once mininet is ready.  Per-protocol
    flows are then added (a message is printed if that fails), a
    ``WaypointMonitor`` for subnet 10.0.0.1/32 is started with waypoint
    10.0.0.2, and the process pauses until a signal arrives before joining
    the monitor.
    """

    # The host tracker only works when a default subnet exists.
    subnet_control = SubnetControl()
    subnet_control.list()
    subnet_control.add_subnet("defaultSubnet", "10.0.0.254/8")

    raw_input("[Press enter when mininet is ready] ")
    print("-------------------------")

    # Per-protocol flows let us monitor stats per protocol
    flows_added = analytics.add_protocol_flows()
    if not flows_added:
        print("Unable to add per-protocol flows")

    monitor = WaypointMonitor(Stats.TYPE_SUBNET, subnet="10.0.0.1/32")
    monitor.set_waypoint("10.0.0.2")
    monitor.set_large_flow_threshold(2000) # 2000 bytes
    monitor.start()

    # Install the SIGINT handler, then sleep until a signal arrives
    signal.signal(signal.SIGINT, signal_handler)
    signal.pause()

    # join() won't return until SIGINT has been captured
    monitor.join()
开发者ID:inocybe,项目名称:odl-affinity,代码行数:26,代码来源:demo.py



注:本文中的signal.signal函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python signals.emit函数代码示例发布时间:2022-05-27
下一篇:
Python signal.siginterrupt函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap