• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python globalLogPublisher.addObserver函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中twisted.logger.globalLogPublisher.addObserver函数的典型用法代码示例。如果您正苦于以下问题:Python addObserver函数的具体用法?Python addObserver怎么用?Python addObserver使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了addObserver函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: init

def init(outFile):
    level = levels[config.LOG_LEVEL]
    predicate = LogLevelFilterPredicate(defaultLogLevel=level)
    observer = FilteringLogObserver(textFileLogObserver(outFile=outFile), [predicate])
    observer._encoding = "utf-8"
    globalLogPublisher.addObserver(observer)
    log.info("Start logging with {l}", l=level)
开发者ID:melnikk,项目名称:worker,代码行数:7,代码来源:logs.py


示例2: test_verbose_logging

    def test_verbose_logging(self):
        """
        If verbose logging is turned on, the full request and response is
        logged.
        """
        self.patch(helpers, 'get_site', partial(get_site, logging=True))
        logged_events = []
        addObserver(logged_events.append)
        self.addCleanup(removeObserver, logged_events.append)

        response, url = self.make_request_to_site()

        self.assertEqual(2, len(logged_events))
        self.assertTrue(all([not event['isError'] for event in logged_events]))

        messages = [get_log_message(event) for event in logged_events]
        request_match = re.compile(
            "^Received request: GET (?P<url>.+)\n"
            "Headers: (?P<headers>\{.+\})\n\s*$"
        ).match(messages[0])
        self.assertNotEqual(None, request_match)
        self.assertEqual(url, request_match.group('url'))
        headers = json.loads(request_match.group('headers'))
        self.assertEqual(['two'], headers.get('One'))

        response_match = re.compile(
            "^Responding with 200 for: GET (?P<url>.+)\n"
            "Headers: (?P<headers>\{.+\})\n"
            "\nresponse\!\n\s*$"
        ).match(messages[1])
        self.assertNotEqual(None, response_match)
        self.assertEqual(url, response_match.group('url'))
        headers = json.loads(response_match.group('headers'))
        self.assertEqual(['application/json'], headers.get('Content-Type'))
开发者ID:derwolfe,项目名称:mimic,代码行数:34,代码来源:test_resource.py


示例3: start_upload_server

def start_upload_server():
	import argparse

	from twisted.internet import reactor
	from twisted.logger import Logger, globalLogPublisher, STDLibLogObserver
	from twisted.web.server import Site
	from twisted.web.resource import Resource

	from cheesepi.server.upload import UploadHandler

	# Argument parsing
	parser = argparse.ArgumentParser()
	parser.add_argument('--port', type=int, default=18090,
	                    help='Port to listen on')
	args = parser.parse_args()

	init_logging()

	# Make twisted logging write to pythons logging module
	globalLogPublisher.addObserver(STDLibLogObserver(name="cheesepi.server.upload"))

	# Use twisted logger when in twisted
	log = Logger()

	root = Resource()
	root.putChild("upload", UploadHandler())
	upload_server = Site(root)

	reactor.listenTCP(args.port, upload_server)
	log.info("Starting upload server on port %d..." % args.port)
	reactor.run()
开发者ID:haf,项目名称:cheesepi,代码行数:31,代码来源:utils.py


示例4: start_control_server

def start_control_server():
	import argparse

	from twisted.internet import reactor
	from twisted.logger import Logger, globalLogPublisher, STDLibLogObserver

	from cheesepi.server.control import (CheeseRPCServerFactory,
	                                     CheeseRPCServer)
	from cheesepi.server.storage.mongo import MongoDAO

	# Argument parsing
	parser = argparse.ArgumentParser()
	parser.add_argument('--port', type=int, default=18080,
	                    help='Port to listen on')
	args = parser.parse_args()

	init_logging()

	# Make twisted logging write to pythons logging module
	globalLogPublisher.addObserver(STDLibLogObserver(name="cheesepi.server.control"))

	# Use twisted logger when in twisted
	log = Logger()

	# Logging
	#log = Logger()
	#globalLogPublisher.addObserver(PrintingObserver())

	#dao = MongoDAO()
	dao = MongoDAO('localhost', 27017)
	control_server = CheeseRPCServer(dao).getStreamFactory(CheeseRPCServerFactory)

	reactor.listenTCP(args.port, control_server)
	log.info("Starting control server on port %d..." % args.port)
	reactor.run()
开发者ID:haf,项目名称:cheesepi,代码行数:35,代码来源:utils.py


示例5: startService

    def startService(self):
        self.stdlib_cleanup = stdlib_logging_to_eliot_configuration(getLogger())
        self.twisted_observer = TwistedLoggerToEliotObserver()
        globalLogPublisher.addObserver(self.twisted_observer)

        for dest in self.destinations:
            add_destination(dest)
开发者ID:LeastAuthority,项目名称:leastauthority.com,代码行数:7,代码来源:eliot_destination.py


示例6: noiseControl

def noiseControl(options):
    # terminal noise/info logic
    # allows the specification of the log file location
    if not options["loud"]:
        log_path = options["log"]
        globalLogPublisher.addObserver(hendrixObserver(log_path))
    return None
开发者ID:pombredanne,项目名称:hendrix,代码行数:7,代码来源:ux.py


示例7: __init__

 def __init__(self, reactor, config_filename):
     self._network = None
     self._proc = None
     self._reactor = reactor
     self._config_filename = config_filename
     self.connections = dict()
     with open(config_filename) as f:
         self.config = json.load(f)
     f = open(self.core_config["log_file"], "a")
     globalLogPublisher.addObserver(textFileLogObserver(f))
     self.api = ApiProxy(self._reactor)
     self.server_factory = pb.PBServerFactory(self.api)
开发者ID:nanonyme,项目名称:nanobot,代码行数:12,代码来源:nanobot.py


示例8: init_logging

def init_logging(log_level):
    """
    Initialise the logging by adding an observer to the global log publisher.

    :param str log_level: The minimum log level to log messages for.
    """
    log_level_filter = LogLevelFilterPredicate(
        LogLevel.levelWithName(log_level))
    log_level_filter.setLogLevelForNamespace(
        'twisted.web.client._HTTP11ClientFactory', LogLevel.warn)
    log_observer = FilteringLogObserver(
        textFileLogObserver(sys.stdout), [log_level_filter])
    globalLogPublisher.addObserver(log_observer)
开发者ID:praekeltfoundation,项目名称:marathon-acme,代码行数:13,代码来源:cli.py


示例9: getLogger

def getLogger(level):

    loglevel = getattr(LogLevel, level)
    filter_ = LogLevelFilterPredicate(defaultLogLevel=loglevel)
    if loglevel > LogLevel.debug:
        filter_.setLogLevelForNamespace('stdout', LogLevel.warn)
    observer = FilteringLogObserver(stdoutFileLogObserver(), [filter_])
#     observer = FilteringLogObserver(globalLogPublisher, [filter])
#     log = Logger()

#     globalLogBeginner.beginLoggingTo([observer])
    globalLogPublisher.addObserver(observer)
    return lambda event: None
开发者ID:bverdu,项目名称:onDemand,代码行数:13,代码来源:logger.py


示例10: startService

    def startService(self):
        super(SpreadFlowService, self).startService()

        if self.options['confpath']:
            confpath = self.options['confpath']
        else:
            confpath = os.path.join(os.getcwd(), 'spreadflow.conf')

        stream = config_eval(confpath)

        pipeline = list()
        pipeline.append(AliasResolverPass())
        pipeline.append(PortsValidatorPass())

        if self.options['multiprocess']:
            pipeline.append(PartitionExpanderPass())
            pipeline.append(PartitionBoundsPass())
            if self.options['partition']:
                pipeline.append(PartitionWorkerPass())
                partition = self.options['partition']
                stream.append(AddTokenOp(PartitionSelectToken(partition)))
            else:
                pipeline.append(PartitionControllersPass())

        pipeline.append(ComponentsPurgePass())
        pipeline.append(EventHandlersPass())

        for compiler_step in pipeline:
            stream = compiler_step(stream)

        self._eventdispatcher = EventDispatcher()

        if self.options['oneshot']:
            self._eventdispatcher.add_listener(JobEvent, 0, self._oneshot_job_event_handler)

        connection_parser = ConnectionParser()
        stream = connection_parser.extract(stream)
        self._scheduler = Scheduler(connection_parser.get_portmap(), self._eventdispatcher)

        event_handler_parser = EventHandlerParser()
        stream = event_handler_parser.extract(stream)
        for event_type, priority, callback in event_handler_parser.get_handlers():
            self._eventdispatcher.add_listener(event_type, priority, callback)

        if self.options['queuestatus']:
            statuslog = SpreadFlowQueuestatusLogger(self.options['queuestatus'])
            statuslog.watch(1, self._scheduler)
            globalLogPublisher.addObserver(statuslog.logstatus)

        self._scheduler.run().addBoth(self._stop)
开发者ID:spreadflow,项目名称:spreadflow-core,代码行数:50,代码来源:service.py


示例11: test_doStartLoggingStatement

    def test_doStartLoggingStatement(self):
        """
        L{Factory.doStart} logs that it is starting a factory, followed by
        the L{repr} of the L{Factory} instance that is being started.
        """
        events = []
        globalLogPublisher.addObserver(events.append)
        self.addCleanup(
            lambda: globalLogPublisher.removeObserver(events.append))

        f = Factory()
        f.doStart()

        self.assertIs(events[0]['factory'], f)
        self.assertEqual(events[0]['log_level'], LogLevel.info)
        self.assertEqual(events[0]['log_format'],
                         'Starting factory {factory!r}')
开发者ID:Architektor,项目名称:PySnip,代码行数:17,代码来源:test_protocol.py


示例12: redirect_to_twisted

    def redirect_to_twisted(self):
        """
        Redirect Eliot logs to Twisted.

        @return: L{list} of L{dict} - the log messages written to Twisted will
             eventually be appended to this list.
        """
        written = []

        def got_event(event):
            if event.get("log_namespace") == "eliot":
                written.append((event["log_level"].name, event["eliot"]))

        globalLogPublisher.addObserver(got_event)
        self.addCleanup(globalLogPublisher.removeObserver, got_event)
        destination = TwistedDestination()
        addDestination(destination)
        self.addCleanup(removeDestination, destination)
        return written
开发者ID:ClusterHQ,项目名称:eliot,代码行数:19,代码来源:test_twisted.py


示例13: importFailureObserver

    optParameters = []

    optFlags = [
        ['help', 'h', 'Display this help and exit.']
    ]


@provider(ILogObserver)
def importFailureObserver(event):
    if 'failure' in event and event['failure'].type is ImportError:
        log.err("ERROR: %s. Please run `pip install -U -r requirements.txt` "
                "from Cowrie's install directory and virtualenv to install "
                "the new dependency" % event['failure'].value.message)


globalLogPublisher.addObserver(importFailureObserver)


@implementer(IServiceMaker, IPlugin)
class CowrieServiceMaker(object):
    tapname = "cowrie"
    description = "She sells sea shells by the sea shore."
    options = Options
    output_plugins = None

    def makeService(self, options):
        """
        Construct a TCPServer from a factory defined in Cowrie.
        """

        if options["help"] is True:
开发者ID:micheloosterhof,项目名称:cowrie,代码行数:31,代码来源:cowrie_plugin.py


示例14: startService

 def startService(self):
     self.stdlib_cleanup = _stdlib_logging_to_eliot_configuration(getLogger())
     self.twisted_observer = _TwistedLoggerToEliotObserver()
     globalLogPublisher.addObserver(self.twisted_observer)
     add_destinations(*self.destinations)
     return Service.startService(self)
开发者ID:tahoe-lafs,项目名称:tahoe-lafs,代码行数:6,代码来源:eliotutil.py


示例15: run

def run():
    """
    Entry point into (native) worker processes. This wires up stuff such that
    a worker instance is talking WAMP-over-stdio to the node controller.
    """
    import os
    import sys
    import platform
    import signal

    # Ignore SIGINT so we get consistent behavior on control-C versus
    # sending SIGINT to the controller process. When the controller is
    # shutting down, it sends TERM to all its children but ctrl-C
    # handling will send a SIGINT to all the processes in the group
    # (so then the controller sends a TERM but the child already or
    # will very shortly get a SIGINT as well). Twisted installs signal
    # handlers, but not for SIGINT if there's already a custom one
    # present.

    def ignore(sig, frame):
        log.debug("Ignoring SIGINT in worker.")
    signal.signal(signal.SIGINT, ignore)

    # create the top-level parser
    #
    import argparse
    parser = argparse.ArgumentParser()

    parser.add_argument('--reactor',
                        default=None,
                        choices=['select', 'poll', 'epoll', 'kqueue', 'iocp'],
                        help='Explicit Twisted reactor selection (optional).')

    parser.add_argument('--loglevel',
                        default="info",
                        choices=['none', 'error', 'warn', 'info', 'debug', 'trace'],
                        help='Initial log level.')

    parser.add_argument('-c',
                        '--cbdir',
                        type=str,
                        help="Crossbar.io node directory (required).")

    parser.add_argument('-n',
                        '--node',
                        type=str,
                        help='Crossbar.io node ID (required).')

    parser.add_argument('-w',
                        '--worker',
                        type=str,
                        help='Crossbar.io worker ID (required).')

    parser.add_argument('-r',
                        '--realm',
                        type=str,
                        help='Crossbar.io node (management) realm (required).')

    parser.add_argument('-t',
                        '--type',
                        choices=['router', 'container'],
                        help='Worker type (required).')

    parser.add_argument('--title',
                        type=str,
                        default=None,
                        help='Worker process title to set (optional).')

    options = parser.parse_args()

    # make sure logging to something else than stdio is setup _first_
    #
    from crossbar._logging import make_JSON_observer, cb_logging_aware, _stderr
    from crossbar._logging import make_logger, start_logging, set_global_log_level
    from twisted.logger import globalLogPublisher as log_publisher

    # Set the global log level
    set_global_log_level(options.loglevel)

    log = make_logger()

    # Print a magic phrase that tells the capturing logger that it supports
    # Crossbar's rich logging
    print(cb_logging_aware, file=_stderr)
    _stderr.flush()

    flo = make_JSON_observer(_stderr)
    log_publisher.addObserver(flo)
    start_logging()

    try:
        import setproctitle
    except ImportError:
        log.debug("Could not set worker process title (setproctitle not installed)")
    else:
        # set process title if requested to
        #
        if options.title:
            setproctitle.setproctitle(options.title)
        else:
#.........这里部分代码省略.........
开发者ID:gluegl,项目名称:crossbar,代码行数:101,代码来源:process.py


示例16: __enter__

 def __enter__(self):
     set_global_log_level(self.desired_level)
     globalLogPublisher.addObserver(self.logs.append)
     return self
开发者ID:RonaldChristopher,项目名称:crossbar,代码行数:4,代码来源:_logging.py


示例17:

import io
from twisted.logger import jsonFileLogObserver, globalLogPublisher
from ad_hoc import AdHoc

globalLogPublisher.addObserver(jsonFileLogObserver(io.open("log.json", "a")))

AdHoc(3, 4).logMessage()
开发者ID:vmarkovtsev,项目名称:twisted,代码行数:7,代码来源:ad_hoc_save.py


示例18: apply_logging


#.........这里部分代码省略.........
                _logger = loggers.get(record.name, None)
                if _logger is None:
                    _logger = loggers[record.name] = Logger(record.name)

                t = self.format(record)

                if six.PY2 and isinstance(t, str):
                    # Libraries outside of our control can throw exceptions with
                    # a "str with known encoding", which can cause issues down
                    # the logging pipeline. Her
                    t = t.decode('utf8', errors='replace')

                _logger.emit(LOGLEVEL_TWISTED_MAP[record.levelno], log_text=t)

        if self.logger_dest is not None:
            from twisted.python.logfile import DailyLogFile

            class DailyLogWithLeadingZero(DailyLogFile):
                def suffix(self, tupledate):
                    # this closely imitates the same function from parent class
                    try:
                        return '-'.join(("%02d" % i for i in tupledate))
                    except:
                        # try taking a float unixtime
                        return '-'.join(("%02d" % i for i in
                                                        self.toDate(tupledate)))

            self.logger_dest = abspath(self.logger_dest)
            if access(dirname(self.logger_dest), os.R_OK | os.W_OK):
                log_dest = DailyLogWithLeadingZero \
                                                 .fromFullPath(self.logger_dest)

            else:
                Logger().warn("%r is not accessible. We need rwx on it to "
                               "rotate logs." % dirname(self.logger_dest))
                log_dest = open(self.logger_dest, 'w+')

            formatter = logging.Formatter(self.LOGGING_PROD_FORMAT)

        else:
            formatter = logging.Formatter(self.LOGGING_DEVEL_FORMAT)
            log_dest = sys.stdout

            try:
                import colorama
                colorama.init()
                logger.debug("colorama loaded.")

            except Exception as e:
                logger.debug("coloarama not loaded: %r" % e)

        def record_as_string(record):
            if 'log_failure' in record:
                failure = record['log_failure']
                try:
                    s = pformat(vars(failure.value))
                except TypeError:
                    # vars() argument must have __dict__ attribute
                    s = repr(failure.value)
                return "%s: %s" % (failure.type, s)

            if 'log_text' in record:
                return record['log_text'] + "\n"

            if 'log_format' in record:
                level = record.get('log_level', LogLevel.debug)
                level = LOGLEVEL_MAP_ABB[TWISTED_LOGLEVEL_MAP[level]]

                text = record['log_format'].format(**record) + "\n"
                ns = record.get('log_namespace', "???")
                lineno = 0
                record = logging.LogRecord('?', level, ns, lineno, text,
                                                                     None, None)
                record.l = level
                record.module = ns.split('.')[-2]

                return formatter.format(record)

            if 'log_io' in record:
                return record['log_io'] + "\n"

            if 'message' in record:
                return record['message'] + "\n"

            return pformat(record)

        observer = FileLogObserver(log_dest, record_as_string)
        globalLogPublisher.addObserver(observer)
        self._clear_other_observers(globalLogPublisher, observer)

        handler = TwistedHandler()
        handler.setFormatter(formatter)
        logging.getLogger().addHandler(handler)

        self.pre_logging_apply()

        for l in self._loggers or []:
            l.apply()

        return self
开发者ID:cemrecan,项目名称:neurons,代码行数:101,代码来源:config.py


示例19: Logger

from __future__ import print_function, absolute_import

from twisted.internet import reactor
from twisted.logger import Logger, globalLogPublisher

from cheeselib.logger import PrintingObserver
from cheeselib.server.rpc import CheeseRPCServerFactory, CheeseRPCServer
from cheeselib.server.storage.mongo import MongoDAO

SERVER_PORT = 18080

log = Logger()

globalLogPublisher.addObserver(PrintingObserver())

dao = MongoDAO()

rpc_server = CheeseRPCServer(dao).getStreamFactory(CheeseRPCServerFactory)

reactor.listenTCP(SERVER_PORT, rpc_server)
log.info("Starting server on port %d..." % SERVER_PORT)
reactor.run()
开发者ID:haf,项目名称:cheesepi,代码行数:22,代码来源:test_server.py


示例20: import

from .defaults import DEFAULT_LOG_FILE
from twisted.logger import (
    ILogObserver, jsonFileLogObserver, FilteringLogObserver,
    LogLevelFilterPredicate, LogLevel, globalLogPublisher
)
from zope.interface import provider

import io


@provider(ILogObserver)
def hendrixObserver(path=DEFAULT_LOG_FILE, log_level=LogLevel.warn):
    json_observer = jsonFileLogObserver(
        io.open(path, 'a')
    )
    return FilteringLogObserver(
        json_observer,
        [LogLevelFilterPredicate(log_level), ]
    )


globalLogPublisher.addObserver(hendrixObserver(log_level=LogLevel.debug))
开发者ID:SlashRoot,项目名称:hendrix,代码行数:22,代码来源:logger.py



注:本文中的twisted.logger.globalLogPublisher.addObserver函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python smtp.rfc822date函数代码示例发布时间:2022-05-27
下一篇:
Python globalLogBeginner.beginLoggingTo函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap