本文整理汇总了 Python 中 structlog.get_logger 函数的典型用法代码示例。如果您正苦于以下问题：Python get_logger 函数的具体用法？Python get_logger 怎么用？Python get_logger 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 get_logger 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: main
def main(args):
    """Fetch GitHub issues and reconcile them with RTM.

    Credentials are read from the environment; when any of them is missing
    a message is printed to stderr and the process exits with status 1.

    :param args: Parsed arguments; only ``args.log_level`` is read here.
    """
    init_logger(level=args.log_level.upper())

    github_token = os.environ.get('GITHUB_ACCESS_TOKEN')
    if github_token is None:
        print("Missing GITHUB_ACCESS_TOKEN!", file=sys.stderr)
        sys.exit(1)

    # Order matters: the tuple is unpacked positionally into createRTM below.
    rtm_credentials = tuple(
        os.environ.get(variable)
        for variable in ('RTM_API_KEY', 'RTM_SHARED_SECRET', 'RTM_TOKEN')
    )
    if None in rtm_credentials:
        print("Missing RTM_API_KEY, RTM_SHARED_SECRET or RTM_TOKEN!", file=sys.stderr)
        sys.exit(1)

    log = get_logger().bind(component="main")

    log.info("gh.init")
    gh = github.GitHub(access_token=github_token)

    log.info("gh.get_issues")
    issues = get_github_issues(gh, GITHUB_ISSUE_QUERIES)

    log.info("rtm.init")
    rtm = createRTM(*rtm_credentials)

    # Each sync step is bracketed by start/finish events.
    log.info("complete_missing_issues.start")
    complete_missing_issues(rtm, issues)
    log.info("complete_missing_issues.finish")

    log.info("add_new_issues.start")
    add_new_issues(rtm, issues)
    log.info("add_new_issues.finish")
开发者ID:zoni,项目名称:rtm,代码行数:33,代码来源:gh2rtm.py
示例2: update_logging
def update_logging(instance_id, vcore_id):
    """
    Reconfigure structlog so every event carries the instance and vcore ids.

    :param instance_id: Value written under ``instance_id`` in each event
    :param vcore_id: The assigned vcore id, written under ``vcore_id``
    :return: structured logger obtained after the reconfiguration
    """
    def flag_exception_calls(_, method_name, event_dict):
        # Events logged through ``exception`` get exc_info switched on so the
        # later format_exc_info processor has something to render.
        if method_name == 'exception':
            event_dict['exc_info'] = True
        return event_dict

    def stamp(key, value):
        # Build a processor that writes a fixed key/value into each event.
        def processor(_, __, event_dict):
            event_dict[key] = value
            return event_dict
        return processor

    structlog.configure(processors=[
        flag_exception_calls,
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        stamp('instance_id', instance_id),
        stamp('vcore_id', vcore_id),
        FluentRenderer(),
    ])

    # Mark first line of log
    log = structlog.get_logger()
    log.info("updated-logger")
    return log
开发者ID:gcgirish-radisys,项目名称:voltha,代码行数:33,代码来源:structlog_setup.py
示例3: __init__
def __init__(self, **options):
    """Initialise bookkeeping attributes; nothing is connected here.

    :param options: Optional ``retry_limit`` (default 30) and
        ``retry_interval`` (default 3); other keys are ignored.
    """
    retry_limit = options.get('retry_limit', 30)
    retry_interval = options.get('retry_interval', 3)

    #: logging facility
    self.logger = get_logger()

    #: number of retries attempted so far
    self.retry_attempt = 0
    #: maximum number of retries
    self.retry_limit = retry_limit
    #: interval between retries
    self.retry_interval = retry_interval
    #: whether we are currently retrying
    self.retry_mode = False

    #: rabbit mq connection (not yet established)
    self.connection = None
    #: rabbit mq channels
    self.channels = {}

    #: services callback
    self.services_callback = None
开发者ID:meantheory,项目名称:wrabbit,代码行数:25,代码来源:rmq.py
示例4: restore_db
def restore_db(file_name):
    """Restore the configured PostgreSQL database from a dump file.

    Runs ``pg_restore`` with the connection parameters taken from
    ``settings`` against ``/usr/src/app/core/<file_name>``.

    :param file_name: Name of the dump file inside ``/usr/src/app/core``.
    :return: None. A failed or non-zero ``pg_restore`` run is logged as an
        error rather than raised.
    """
    import subprocess  # function-scope import keeps this block self-contained

    logger = get_logger(__name__).bind(
        action='restore_db'
    )
    logger.info('starting')

    file_path = "/usr/src/app/core/{}".format(file_name)
    # An argument list (no shell) means file_name cannot be word-split or
    # interpreted as shell syntax.
    argv = [
        'pg_restore', '--verbose', '--clean', '--no-acl', '--no-owner',
        '-h', str(settings.POSTGRES_HOST),
        '-p', str(settings.POSTGRES_PORT),
        '-U', str(settings.POSTGRES_USER),
        '-d', str(settings.POSTGRES_DB),
        file_path,
    ]

    env = dict(os.environ)
    if settings.POSTGRES_PASSWORD:
        # The password travels through the environment only, so it never
        # appears in the command line that is logged below.
        env['PGPASSWORD'] = str(settings.POSTGRES_PASSWORD)

    logger.debug('full_command', command=' '.join(argv))
    try:
        exit_status = subprocess.call(argv, env=env)
    except OSError as exc:
        # e.g. pg_restore is not installed; os.system would not have raised.
        logger.error('failed', error=str(exc))
        return
    if exit_status != 0:
        logger.error('failed', exit_status=exit_status)
        return
    logger.debug('finished successfully')
#'PGPASSWORD=mysecretpassword pg_dump -U postgres -h db -p 5432 -d slash_air -w'
# pg_dump -h db -p 5432 -F c -O -U postgres postgres > backup.dump
# pg_restore --verbose --clean --no-acl --no-owner -h db -p 5432 -U postgres -d postgres backup.dump
# /usr/src/app/core
开发者ID:eliaskioni,项目名称:DbBackup,代码行数:32,代码来源:backup_restore_db.py
示例5: backup
def backup():
    """Dump the configured PostgreSQL database to a uniquely named tar file.

    Runs ``pg_dump`` with the connection parameters taken from ``settings``.

    :return: Path of the dump file, ``/usr/src/app/core/dump_file_<uuid>.tar``.
        The path is returned even when ``pg_dump`` fails (the failure is
        logged as an error), matching the previous contract.
    """
    import subprocess  # function-scope import keeps this block self-contained

    logger = get_logger(__name__).bind(
        action='backup_db'
    )
    logger.info('starting')

    file_path = '/usr/src/app/core/dump_file_{unique_id}.tar'.format(unique_id=uuid.uuid4())
    # An argument list (no shell) avoids quoting problems in any setting value.
    argv = [
        'pg_dump',
        '-U', str(settings.POSTGRES_USER),
        '-h', str(settings.POSTGRES_HOST),
        '-p', str(settings.POSTGRES_PORT),
        '-F', 'tar',
        '-O',
        '-d', str(settings.POSTGRES_DB),
        '-f', file_path,
        '-w',
    ]

    env = dict(os.environ)
    if settings.POSTGRES_PASSWORD:
        # The password travels through the environment only, so it never
        # appears in the command line that is logged below.
        env['PGPASSWORD'] = str(settings.POSTGRES_PASSWORD)

    logger.debug('full_command', command=' '.join(argv))
    try:
        exit_status = subprocess.call(argv, env=env)
    except OSError as exc:
        # e.g. pg_dump is not installed; os.system would not have raised.
        logger.error('failed', error=str(exc))
        return file_path
    if exit_status != 0:
        logger.error('failed', exit_status=exit_status)
        return file_path
    logger.debug('finished_successfully')
    return file_path
开发者ID:eliaskioni,项目名称:DbBackup,代码行数:26,代码来源:backup_restore_db.py
示例6: delete_process
def delete_process(request):
    """Stop the process named by the URL slug and drop its bookkeeping.

    Raises ``web.HTTPNotFound`` when the slug is not a known process;
    otherwise returns a JSON ``web.Response``.
    """
    app = request.app
    event_log = app.get('smartmob.event_log') or structlog.get_logger()

    # Look up the target process.
    processes = app.setdefault('smartmob.processes', {})
    slug = request.match_info['slug']
    if slug not in processes:
        raise web.HTTPNotFound
    process = processes[slug]

    # Record the request.
    event_log.info('process.delete', slug=slug)

    # Signal the process to stop, then wait for its task to end.
    process['stop'].set_result(None)
    try:
        yield from process['task']
    except Exception:  # TODO: be more accurate!
        pass

    # Forget the process.
    del processes[slug]

    # Build the response.
    payload = json.dumps({
        # ...
    }).encode('utf-8')
    return web.Response(
        content_type='application/json',
        body=payload,
    )
开发者ID:smartmob-project,项目名称:smartmob-agent,代码行数:32,代码来源:__init__.py
示例7: attach_console
def attach_console(request):
    """Perform a WebSocket handshake for a known process, then close it.

    Raises ``web.HTTPNotFound`` when the slug is not a known process.
    Returns the (already closed) ``web.WebSocketResponse``.
    """
    app = request.app
    event_log = app.get('smartmob.event_log') or structlog.get_logger()

    # Must connect here with a WebSocket.
    upgrade = request.headers.get('Upgrade', '').lower()
    if upgrade != 'websocket':
        # NOTE(review): nothing is enforced here; the check is a no-op.
        pass

    # Look up the target process.
    processes = app.setdefault('smartmob.processes', {})
    slug = request.match_info['slug']
    if slug not in processes:
        raise web.HTTPNotFound

    # WebSocket handshake.
    stream = web.WebSocketResponse()
    yield from stream.prepare(request)

    # TODO: retrieve data from the process and pipe it to the WebSocket.
    #       Strawboss implementation doesn't provide anything for this at the
    #       moment, so we'll have to do this later.

    # Record the request.
    event_log.info('process.attach', slug=slug)

    # Close the WebSocket.
    yield from stream.close()

    # Required by the framework, but I don't know why.
    return stream
开发者ID:smartmob-project,项目名称:smartmob-agent,代码行数:30,代码来源:__init__.py
示例8: test_mozdef
def test_mozdef(app):
    """Only log calls made with ``mozdef=True`` produce MozDef events."""
    sent = []
    real_event_cls = mozdef_client.MozDefEvent

    def make_event(target):
        # Build a real event, but capture it instead of sending it.
        event = real_event_cls(target)
        event.send = lambda: sent.append(event)
        return event

    with mock.patch('mozdef_client.MozDefEvent') as MozDefEvent:
        MozDefEvent.side_effect = make_event

        logger = structlog.get_logger(__name__)
        logger.warn("unseen")
        logger.warn("test message", mozdef=True)
        logger.warn("with attr", attr="foo", mozdef=True)

        # 'unseen' must be absent, since it was logged without mozdef=True
        summaries = {event.summary for event in sent}
        eq_(summaries, {"test message", "with attr"})

        # spot-check a few other fields on the first captured message
        first = sent[0]
        eq_(first.source, __name__)
        eq_(first._severity, real_event_cls.SEVERITY_WARNING)
        eq_(first.tags, ['relengapi'])

        # the extra keyword must have landed in some message's details
        assert any(event.details.get('attr') == 'foo' for event in sent)
开发者ID:azizadnan,项目名称:build-relengapi,代码行数:26,代码来源:test_lib_logging.py
示例9: run
def run(host, port, shell_host, shell_port, project_config, log,
        application_factory=web.Application,
        config_factory=Config,
        config_source_factory=YamlFileConfigSource,
        push_handler_factory=PushHandler,
        repo_manager_factory=RepoManager,
        cmd_factory=MergeitShell,
        telnet_shell_factory=CmdTelsh,
        telnet_server_factory=TelnetServer):
    """Start the HTTP push endpoint and the telnet shell, then loop forever.

    A ``POST /push`` route is served on ``host:port`` and a telnet server on
    ``shell_host:shell_port``. The event loop runs until a
    ``KeyboardInterrupt``, which is logged. The ``*_factory`` parameters let
    callers substitute the collaborating classes.
    """
    logger = get_logger()
    loop = asyncio.get_event_loop()
    app = application_factory()
    config = config_factory(config_source_factory(project_config))
    init_logging(log, config.get('name'))

    def handle_push(request):
        return gitlab_push(request, config)

    app.router.add_route('POST', '/push', handle_push)
    logger.info('application_start')
    http_server = loop.create_server(app.make_handler(), host, port)
    loop.run_until_complete(http_server)

    shell = cmd_factory(config=config,
                        push_handler_factory=push_handler_factory,
                        repo_manager_factory=repo_manager_factory,
                        forward=True)

    # The ``log`` default below is the stdlib logging module, not the
    # ``log`` argument of run().
    def shell_factory(server, stream=TelnetShellStream, log=logging):
        return telnet_shell_factory(server, stream, log, cmd=shell)

    def telnet_protocol():
        return telnet_server_factory(log=logging.getLogger(telnetlib3.__name__),
                                     shell=shell_factory)

    telnet_server = loop.create_server(telnet_protocol, shell_host, shell_port)
    loop.run_until_complete(telnet_server)

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        logger.info('application_interrupt')
开发者ID:insolite,项目名称:mergeit,代码行数:31,代码来源:server.py
示例10: send_invite_email
def send_invite_email(self):
    """Send this member an invitation email asynchronously.

    Any exception raised while sending is logged to the ``sentry.mail``
    logger and not propagated.
    """
    from sentry.utils.email import MessageBuilder

    accept_path = reverse('sentry-accept-invite', kwargs={
        'member_id': self.id,
        'token': self.token,
    })
    msg = MessageBuilder(
        subject='Join %s in using Sentry' % self.organization.name,
        template='sentry/emails/member-invite.txt',
        html_template='sentry/emails/member-invite.html',
        type='organization.invite',
        context={
            'email': self.email,
            'organization': self.organization,
            'url': absolute_uri(accept_path),
        },
    )

    recipients = [self.get_email()]
    try:
        msg.send_async(recipients)
    except Exception as e:
        get_logger(name='sentry.mail').exception(e)
开发者ID:WhoTrades,项目名称:sentry,代码行数:25,代码来源:organizationmember.py
示例11: get_github_issues
def get_github_issues(gh, queries):
    """
    Collect the issues matching the given search queries, keyed by title.

    :param gh: github.GitHub client
    :param queries: A list of keyword-argument dicts, one per search to run
    :return: A dictionary whose elements are in the form of
             {'repo#000: Summary': {..data from github}}; when two issues
             produce the same key, the first one found is kept
    """
    log = get_logger().bind(component='get_github_issues')

    found = []
    for query in queries:
        log.debug("gh.issues.get", q=query)
        items = gh.issues.get(**query)
        log.debug("gh.issues.result", q=query, result_count=len(items))
        found.extend(items)

    issues = {}
    for issue in found:
        title = "{repository[name]}#{number}: {title}".format(**issue).strip()
        log.debug("gh.parse_issue", title=title)
        # setdefault keeps the first issue seen for a given title.
        issues.setdefault(title, issue)
    return issues
开发者ID:zoni,项目名称:rtm,代码行数:27,代码来源:gh2rtm.py
示例12: get_logger
def get_logger(level=logging.DEBUG, name=None, stream=DEFAULT_STREAM):
    """Configure structlog on top of stdlib logging and return a logger.

    structlog is (re)configured on every call. A stream handler writing to
    ``stream`` is attached only when ``_has_streamhandler`` reports that the
    stdlib logger named ``name`` does not already have one.
    """
    context_cls = structlog.threadlocal.wrap_dict(dict)
    pipeline = [
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt='iso'),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(sort_keys=True),
    ]
    structlog.configure(
        processors=pipeline,
        context_class=context_cls,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    log = structlog.get_logger(name)

    stdlib_logger = logging.getLogger(name)
    already_attached = _has_streamhandler(stdlib_logger,
                                          level=level, stream=stream)
    if not already_attached:
        handler = logging.StreamHandler(stream)
        handler.setLevel(level)
        handler.setFormatter(logging.Formatter(fmt=LOG_FORMAT))
        log.addHandler(handler)

    log.setLevel(level)
    return log
开发者ID:Gifflen,项目名称:fleece,代码行数:27,代码来源:log.py
示例13: emit
def emit(self, record, logger=get_logger()):
    """Forward a stdlib ``LogRecord`` to a structlog logger.

    :param record: The record to forward.
    :param logger: Target logger; the default is created once, when the
        function is defined.
    """
    # Callers may pass context through the stdlib 'extra' kwarg, which the
    # RootLogger turns into plain record attributes. Drop the default
    # attributes (``throwaways``) and unset values so only that context
    # is forwarded.
    kwargs = {}
    for key, value in six.iteritems(vars(record)):
        if key in throwaways or value is None:
            continue
        kwargs[key] = value

    kwargs['level'] = record.levelno
    kwargs['event'] = record.msg

    args = record.args
    if args:
        # LogRecord.__init__ unrolls args shaped like `({},)` (a single
        # dict) into the bare dict. structlog expects the original tuple
        # shape further down, so re-wrap anything that is not already a
        # tuple or list.
        if isinstance(args, (tuple, list)):
            kwargs['positional_args'] = args
        else:
            kwargs['positional_args'] = (args, )

    logger.log(**kwargs)
开发者ID:alexandrul,项目名称:sentry,代码行数:25,代码来源:handlers.py
示例14: __init__
def __init__(self, core, logical_device):
    """Wire this agent to the proxies and event bus of one logical device.

    :param core: Object providing ``get_local_handler()`` and
        ``get_proxy(path)``.
    :param logical_device: Logical device; only its ``id`` is read here.

    Exceptions raised during the wiring are logged as ``init-error`` and
    not propagated.
    """
    # Bind the logger before anything can fail: the handler below relies on
    # self.log, and assigning it late would turn any earlier failure into an
    # AttributeError that hides the real error.
    self.log = structlog.get_logger(logical_device_id=logical_device.id)
    try:
        self.core = core
        self.local_handler = core.get_local_handler()
        self.logical_device_id = logical_device.id

        self.root_proxy = core.get_proxy('/')
        self.flows_proxy = core.get_proxy(
            '/logical_devices/{}/flows'.format(logical_device.id))
        self.groups_proxy = core.get_proxy(
            '/logical_devices/{}/flow_groups'.format(logical_device.id))
        self.self_proxy = core.get_proxy(
            '/logical_devices/{}'.format(logical_device.id))

        # React to flow / group table updates and to port add / remove.
        self.flows_proxy.register_callback(
            CallbackType.POST_UPDATE, self._flow_table_updated)
        self.groups_proxy.register_callback(
            CallbackType.POST_UPDATE, self._group_table_updated)
        self.self_proxy.register_callback(
            CallbackType.POST_ADD, self._port_added)
        self.self_proxy.register_callback(
            CallbackType.POST_REMOVE, self._port_removed)

        self.port_proxy = {}

        self.event_bus = EventBusClient()
        self.packet_in_subscription = self.event_bus.subscribe(
            topic='packet-in:{}'.format(logical_device.id),
            callback=self.handle_packet_in_event)

        self._routes = None

    except Exception as e:  # "as" form is valid on Python 2.6+ and Python 3
        self.log.exception('init-error', e=e)
开发者ID:gcgirish-radisys,项目名称:voltha,代码行数:35,代码来源:logical_device_agent.py
示例15: main
def main():
    """Initializes the application and runs the configured behaviour forever.
    """
    # Load settings through the config object.
    config = Configuration()
    settings = config.settings

    # Logging must be configured before the logger is fetched.
    logs.configure_logging(settings['log_level'], settings['log_mode'])
    logger = structlog.get_logger()

    # Assemble the behaviour from its collaborators.
    exchange_interface = ExchangeInterface(config.exchanges)
    notifier = Notifier(config.notifiers)
    behaviour = Behaviour(config, exchange_interface, notifier)

    while True:
        behaviour.run(settings['market_pairs'], settings['output_mode'])
        pause = settings['update_interval']
        logger.info("Sleeping for %s seconds", pause)
        time.sleep(pause)
开发者ID:adrielliu,项目名称:crypto-signal,代码行数:25,代码来源:app.py
示例16: __init__
def __init__(self, notifier_config):
    """Create a client for each notifier whose required config validates.

    For twilio, slack, gmail and integram in turn, the validation result is
    stored in ``self.<name>_configured``; ``self.<name>_client`` is created
    only when that result is truthy.

    :param notifier_config: Mapping of notifier name to a mapping holding a
        ``'required'`` sub-mapping of settings.
    """
    self.logger = structlog.get_logger()

    self.twilio_configured = self.__validate_required_config('twilio', notifier_config)
    if self.twilio_configured:
        twilio = notifier_config['twilio']['required']
        self.twilio_client = TwilioNotifier(
            twilio_key=twilio['key'],
            twilio_secret=twilio['secret'],
            twilio_sender_number=twilio['sender_number'],
            twilio_receiver_number=twilio['receiver_number'],
        )

    self.slack_configured = self.__validate_required_config('slack', notifier_config)
    if self.slack_configured:
        slack = notifier_config['slack']['required']
        self.slack_client = SlackNotifier(
            slack_key=slack['key'],
            slack_channel=slack['channel'],
        )

    self.gmail_configured = self.__validate_required_config('gmail', notifier_config)
    if self.gmail_configured:
        gmail = notifier_config['gmail']['required']
        self.gmail_client = GmailNotifier(
            username=gmail['username'],
            password=gmail['password'],
            destination_addresses=gmail['destination_emails'],
        )

    self.integram_configured = self.__validate_required_config('integram', notifier_config)
    if self.integram_configured:
        integram = notifier_config['integram']['required']
        self.integram_client = IntegramNotifier(url=integram['url'])
开发者ID:cristoferdomingues,项目名称:crypto-signal,代码行数:31,代码来源:notification.py
示例17: __init__
def __init__(self, username, password, destination_addresses):
    """Store the credentials and create the SMTP handler for Gmail.

    :param username: Stored as ``self.username``.
    :param password: Stored as ``self.password``.
    :param destination_addresses: Iterable of address strings; stored as a
        single comma-joined string.
    """
    self.logger = structlog.get_logger()
    # smtplib.SMTP is given the host here, so construction may already
    # open the connection.
    self.smtp_handler = smtplib.SMTP('smtp.gmail.com:587')
    self.username, self.password = username, password
    self.destination_addresses = ','.join(destination_addresses)
开发者ID:cristoferdomingues,项目名称:crypto-signal,代码行数:7,代码来源:gmail_client.py
示例18: __init__
def __init__(self, database_config):
    """Create the engine, the tables and a session for the database.

    :param database_config: Passed to ``__create_connection_string`` to
        build the connection string.
    """
    engine = create_engine(self.__create_connection_string(database_config))
    # Create any tables declared on Base before handing out a session.
    Base.metadata.create_all(engine)
    session_factory = sessionmaker(bind=engine)
    self.session = session_factory()
    self.logger = structlog.get_logger()
开发者ID:cristoferdomingues,项目名称:crypto-signal,代码行数:7,代码来源:database.py
示例19: __init__
def __init__(self, namespace=None, source=None):
    """Create a structlog logger bound to a namespace.

    :param namespace: Namespace to bind. When falsy, the ``__name__`` of the
        module that called this constructor is used, falling back to
        ``'unknown'`` if it cannot be determined.
    :param source: Accepted but not used here.
    """
    if not namespace:
        try:
            # currentframe() takes no argument on Python 3, so step to the
            # caller through f_back; this also works on Python 2.
            caller = inspect.currentframe().f_back
            namespace = caller.f_globals['__name__']
        except (AttributeError, KeyError):
            # No frame support / no caller frame / no __name__ global.
            namespace = 'unknown'

    self._log = structlog.get_logger(namespace=namespace)
开发者ID:wrapp,项目名称:pywrapplog,代码行数:8,代码来源:wrapplog.py
示例20: __init__
def __init__(self, handler, alloc_id, traffic_descriptor, entity_id,
             name=None, vont_ani=None, is_mock=False):
    """Initialise the base class, then keep the handler, mock flag and entity id.

    ``alloc_id``, ``traffic_descriptor``, ``name`` and ``vont_ani`` are
    forwarded to the parent constructor. The logger is bound to the
    handler's ``device_id`` and to ``alloc_id``.
    """
    super(OnuTCont, self).__init__(alloc_id, traffic_descriptor,
                                   name=name, vont_ani=vont_ani)
    self._handler = handler
    self._is_mock = is_mock
    self._entity_id = entity_id

    log_context = dict(device_id=handler.device_id, alloc_id=alloc_id)
    self.log = structlog.get_logger(**log_context)
开发者ID:gcgirish-radisys,项目名称:voltha,代码行数:8,代码来源:onu_tcont.py
注：本文中的 structlog.get_logger 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论