本文整理汇总了Python中supervisor.compat.as_string函数的典型用法代码示例。如果您正苦于以下问题：Python as_string函数的具体用法？Python as_string怎么用？Python as_string使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了as_string函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: collect_incoming_data
def collect_incoming_data(self, data):
    """Append an incoming chunk to ``self.buffer`` and forward body data.

    When ``PY3`` is true, both the existing buffer and the new chunk are
    passed through ``as_string`` before being joined; otherwise they are
    concatenated unchanged. If ``self.part`` equals ``self.body``, the
    accumulated buffer is handed to ``self.feed`` and then reset to an
    empty string.
    """
    if not PY3:
        combined = self.buffer + data
    else:
        combined = as_string(self.buffer) + as_string(data)
    self.buffer = combined

    if self.part == self.body:
        pending = self.buffer
        self.feed(pending)
        self.buffer = ''
开发者ID:ngocson2vn,项目名称:supervisor,代码行数:9,代码来源:http_client.py
示例2: test_emit_unicode_witherror
def test_emit_unicode_witherror(self):
    # The stub records only its first call and raises UnicodeError on that
    # call; any later call is a no-op. The test then checks which message
    # that first call received.
    recorded = []

    def raise_on_first_call(msg):
        if recorded:
            return
        recorded.append(msg)
        raise UnicodeError

    handler = self._makeOne()
    handler._syslog = raise_on_first_call
    handler.emit(self._makeLogRecord(as_string('fií')))
    self.assertEqual(recorded, [as_string('fi\xc3\xad')])
开发者ID:Supervisor,项目名称:supervisor,代码行数:11,代码来源:test_loggers.py
示例3: test_emit_unicode_witherror
def test_emit_unicode_witherror(self):
    # The stub records only its first (priority, message) call and raises
    # UnicodeError on that call; any later call is a no-op. The test then
    # checks the priority and message that first call received.
    from supervisor.loggers import LevelsByName

    recorded = []

    def raise_on_first_call(pri, msg):
        if recorded:
            return
        recorded.append((pri, msg))
        raise UnicodeError

    handler = self._makeOne()
    handler._syslog = raise_on_first_call
    handler.emit(self._makeLogRecord(as_string('fií')))

    expected = [(LevelsByName.TRAC, as_string('fi\xc3\xad'))]
    self.assertEqual(recorded, expected)
开发者ID:AmericanCouncils,项目名称:supervisor,代码行数:14,代码来源:test_loggers.py
示例4: _dispatchEvent
def _dispatchEvent(self, event):
    """Send ``event`` to the first listener process that can take it.

    Walks ``self.processes`` and skips any process that is not RUNNING or
    whose listener state is not READY. For the first eligible process the
    event envelope is built and written to it; that process is then marked
    BUSY, remembers the event, and the method returns True.

    An ``OSError`` whose first argument is ``errno.EPIPE`` raised while
    writing is logged and the next process is tried; any other ``OSError``
    propagates. Returns False when no process accepted the event.
    """
    pool_serial = event.pool_serials[self.config.name]

    for listener in self.processes.values():
        if listener.state != ProcessStates.RUNNING:
            continue
        ready = listener.listener_state == EventListenerStates.READY
        if not ready:
            continue

        listener_name = as_string(listener.config.name)
        payload = event.payload()
        try:
            envelope = self._eventEnvelope(
                event.__class__, event.serial, pool_serial, payload)
            listener.write(as_bytes(envelope))
        except OSError as why:
            if why.args[0] != errno.EPIPE:
                raise
            # Broken pipe: leave this listener's state alone and move on.
            self.config.options.logger.debug(
                'epipe occurred while sending event %s '
                'to listener %s, listener state unchanged' % (
                    event.serial, listener_name))
            continue

        listener.listener_state = EventListenerStates.BUSY
        listener.event = event
        self.config.options.logger.debug(
            'event %s sent to listener %s' % (event.serial, listener_name))
        return True

    return False
开发者ID:the5fire,项目名称:supervisor,代码行数:33,代码来源:process.py
示例5: _acceptEvent
def _acceptEvent(self, event, head=False):
    """Stamp ``event`` with serials and place it in ``self.event_buffer``.

    ``event.serial`` and ``event.pool_serials`` are created when missing.
    If this pool has no entry in ``event.pool_serials`` one is assigned;
    otherwise a "rebuffering" debug message is logged. When the buffer is
    non-empty and has reached ``self.config.buffer_size``, the oldest
    entry is discarded and an error is logged. The event is inserted at
    the front when ``head`` is true, otherwise appended.
    """
    # events are required to be instances
    # this has a side effect to fail with an attribute error on 'old style'
    # classes
    pool_name = as_string(self.config.name)

    if not hasattr(event, 'serial'):
        event.serial = new_serial(GlobalSerial)
    if not hasattr(event, 'pool_serials'):
        event.pool_serials = {}

    if self.config.name in event.pool_serials:
        self.config.options.logger.debug(
            'rebuffering event %s for pool %s (buf size=%d, max=%d)' % (
                event.serial, pool_name, len(self.event_buffer),
                self.config.buffer_size))
    else:
        event.pool_serials[self.config.name] = new_serial(self)

    at_capacity = len(self.event_buffer) >= self.config.buffer_size
    if at_capacity and self.event_buffer:
        # discard the oldest event
        dropped = self.event_buffer.pop(0)
        self.config.options.logger.error(
            'pool %s event buffer overflowed, discarding event %s' % (
                pool_name, dropped.serial))

    if not head:
        self.event_buffer.append(event)
    else:
        self.event_buffer.insert(0, event)
开发者ID:the5fire,项目名称:supervisor,代码行数:28,代码来源:process.py
示例6: test_emit_unicode_noerror
def test_emit_unicode_noerror(self):
    # Emit a record built from as_string('fií') and check the arguments of
    # the resulting syslog.syslog call.
    from supervisor.loggers import LevelsByName

    handler = self._makeOne()
    message = as_string('fií')
    handler.emit(self._makeLogRecord(message))
    syslog.syslog.assert_called_with(LevelsByName.TRAC, 'fi\xc3\xad')
开发者ID:AmericanCouncils,项目名称:supervisor,代码行数:7,代码来源:test_loggers.py
示例7: test_emit_unicode_noerror
def test_emit_unicode_noerror(self):
    # Emit a record through a handler created for self.filename, close the
    # handler, then compare the raw bytes found in that file.
    handler = self._makeOne(self.filename)
    handler.emit(self._makeLogRecord(as_string(b"fi\xc3\xad")))
    handler.close()

    with open(self.filename, "rb") as logfile:
        written = logfile.read()
    self.assertEqual(written, b"fi\xc3\xad")
开发者ID:Supervisor,项目名称:supervisor,代码行数:7,代码来源:test_loggers.py
示例8: _assertInState
def _assertInState(self, *states):
    """Raise ``AssertionError`` unless ``self.state`` is one of ``states``.

    The error message names the process and shows the current state and
    the allowed states as text from ``getProcessStateDescription``.
    """
    if self.state in states:
        return

    current = getProcessStateDescription(self.state)
    allowed = ' '.join(getProcessStateDescription(s) for s in states)
    name = as_string(self.config.name)
    message = 'Assertion failed for %s: %s not in %s' % (
        name, current, allowed)
    raise AssertionError(message)
开发者ID:the5fire,项目名称:supervisor,代码行数:7,代码来源:process.py
示例9: test_token
def test_token(self):
    # listener.ready() should write exactly the READY_FOR_EVENTS token.
    from supervisor.childutils import listener
    from supervisor.dispatchers import PEventListenerDispatcher

    expected = as_string(PEventListenerDispatcher.READY_FOR_EVENTS_TOKEN)
    captured = StringIO()
    listener.ready(captured)
    self.assertEqual(captured.getvalue(), expected)
开发者ID:Supervisor,项目名称:supervisor,代码行数:7,代码来源:test_childutils.py
示例10: request
def request(self, host, handler, request_body, verbose=0):
    """POST ``request_body`` to ``handler`` and return the parsed response.

    When there is no connection yet, one is obtained from
    ``self._get_connection()`` and the base headers are built; an
    ``Authorization: Basic`` header is added when both ``self.username``
    and ``self.password`` are set. ``Content-Length`` is refreshed on
    every call.

    ``host`` is only used to build the error URL; ``verbose`` is accepted
    but not used by this method.

    Raises ``xmlrpclib.ProtocolError`` when the response status is not
    200; the connection is closed and dropped first.
    """
    if not self.connection:
        self.connection = self._get_connection()
        self.headers = {
            "User-Agent": self.user_agent,
            "Content-Type": "text/xml",
            "Accept": "text/xml",
        }

        # basic auth
        if self.username is not None and self.password is not None:
            unencoded = "%s:%s" % (self.username, self.password)
            encoded = as_string(encodestring(as_bytes(unencoded)))
            # Strip newlines so the value fits on one header line. A second
            # replace of '\012' is unnecessary: it is the same character
            # as '\n'.
            encoded = encoded.replace('\n', '')
            self.headers["Authorization"] = "Basic %s" % encoded

    self.headers["Content-Length"] = str(len(request_body))

    self.connection.request('POST', handler, request_body, self.headers)

    r = self.connection.getresponse()

    if r.status != 200:
        self.connection.close()
        self.connection = None
        raise xmlrpclib.ProtocolError(host + handler,
                                      r.status,
                                      r.reason,
                                      '')
    data = r.read()
    p, u = self.getparser()
    p.feed(data)
    p.close()
    return u.close()
开发者ID:denghp,项目名称:supervisor,代码行数:35,代码来源:xmlrpc.py
示例11: test_fail
def test_fail(self):
    # listener.fail() should write the result-start token followed by
    # '4\nFAIL'.
    from supervisor.childutils import listener
    from supervisor.dispatchers import PEventListenerDispatcher

    prefix = as_string(PEventListenerDispatcher.RESULT_TOKEN_START)
    captured = StringIO()
    listener.fail(captured)
    self.assertEqual(captured.getvalue(), prefix + '4\nFAIL')
开发者ID:Supervisor,项目名称:supervisor,代码行数:7,代码来源:test_childutils.py
示例12: payload
def payload(self):
    """Return the text payload for this event.

    The result is a header line with the process name, group name, pid
    and channel, followed by a newline and the event data. The group name
    is empty when the process has no group. Data that ``as_string`` cannot
    decode is replaced by an ``'Undecodable: ...'`` repr of the raw value.
    """
    group = self.process.group
    groupname = '' if group is None else group.config.name

    try:
        data = as_string(self.data)
    except UnicodeDecodeError:
        data = 'Undecodable: %r' % self.data

    # On Python 2, stuff needs to be in Unicode before invoking the
    # % operator, otherwise implicit encodings to ASCII can cause
    # failures
    template = as_string('processname:%s groupname:%s pid:%s channel:%s\n%s')
    fields = (
        as_string(self.process.config.name),
        as_string(groupname),
        self.pid,
        as_string(self.channel),
        data,
    )
    return template % fields
开发者ID:Supervisor,项目名称:supervisor,代码行数:16,代码来源:events.py
示例13: feed
def feed(self, url, data):
    """Write ``data`` to ``sys.stdout`` and flush it.

    ``data`` is converted with ``as_string``; if that raises
    ``UnicodeDecodeError`` an ``'Undecodable: ...'`` repr is written
    instead. ``url`` is not used by this method.
    """
    try:
        text = as_string(data)
    except UnicodeDecodeError:
        text = 'Undecodable: %r' % data
    sys.stdout.write(text)
    sys.stdout.flush()
开发者ID:Supervisor,项目名称:supervisor,代码行数:7,代码来源:http_client.py
示例14: send
def send(self, data, stdout=sys.stdout):
    """Write a result message for ``data`` to ``stdout`` and flush.

    The message is the result-start token, the length of ``data``, a
    newline, and then ``data`` itself.
    """
    length_field = str(len(data))
    token = as_string(PEventListenerDispatcher.RESULT_TOKEN_START)
    message = '%s%s\n%s' % (token, length_field, data)
    stdout.write(message)
    stdout.flush()
开发者ID:Supervisor,项目名称:supervisor,代码行数:7,代码来源:childutils.py
示例15: handle_request
def handle_request(self, request):
    """Authorize ``request`` and pass it to the wrapped handler if allowed.

    The scheme is read from the Authorization header. For ``basic``, the
    credentials are decoded and split on the first ``:``; if
    ``self.authorizer.authorize`` accepts them, ``self.pass_count`` is
    incremented, ``request.auth_info`` is set and the request is forwarded
    to ``self.handler``. Credentials that cannot be decoded produce a 400
    response. A missing header, an unknown scheme, or rejected credentials
    all go to ``self.handle_unauthorized``.
    """
    # authorize a request before handling it...
    scheme = get_header(AUTHORIZATION, request.header)
    if scheme:
        scheme = scheme.lower()
        if scheme == "basic":
            cookie = get_header(AUTHORIZATION, request.header, 2)
            try:
                decoded = as_string(decodestring(as_bytes(cookie)))
            except Exception:
                # Any decoding failure means malformed credentials. A bare
                # ``except:`` would also swallow KeyboardInterrupt and
                # SystemExit, so only Exception is caught here.
                print_function("malformed authorization info <%s>" % cookie)
                request.error(400)
                return
            auth_info = decoded.split(":", 1)
            if self.authorizer.authorize(auth_info):
                self.pass_count.increment()
                request.auth_info = auth_info
                self.handler.handle_request(request)
            else:
                self.handle_unauthorized(request)
        # elif scheme == 'digest':
        # print 'digest: ',AUTHORIZATION.group(2)
        else:
            print("unknown/unsupported auth method: %s" % scheme)
            self.handle_unauthorized(request)
    else:
        # list both? prefer one or the other?
        # you could also use a 'nonce' here. [see below]
        # auth = 'Basic realm="%s" Digest realm="%s"' % (self.realm, self.realm)
        # nonce = self.make_nonce (request)
        # auth = 'Digest realm="%s" nonce="%s"' % (self.realm, nonce)
        # request['WWW-Authenticate'] = auth
        # print 'sending header: %s' % request['WWW-Authenticate']
        self.handle_unauthorized(request)
开发者ID:GulsahKose,项目名称:supervisor,代码行数:35,代码来源:auth_handler.py
示例16: test_handle_more_follow_file_recreated
def test_handle_more_follow_file_recreated(self):
    # Read from a temp file, let it be removed, recreate a file at the
    # same path with new contents, and check the producer returns the new
    # contents. Both files are managed with ``with`` so they are closed
    # even if a write or an assertion fails.
    request = DummyRequest('/logtail/foo', None, None, None)
    with tempfile.NamedTemporaryFile() as original:
        path = original.name
        original.write(as_bytes('a' * 80))
        original.flush()
        producer = self._makeOne(request, path, 80)
        result = producer.more()
        self.assertEqual(result, as_string('a' * 80))
    # Leaving the block closed the temp file, which removes it by default.
    recreated = open(path, 'wb')
    try:
        with recreated:
            recreated.write(as_bytes('b' * 80))
        result = producer.more()
    finally:
        os.unlink(path)
    self.assertEqual(result, as_string('b' * 80))
开发者ID:ngocson2vn,项目名称:supervisor,代码行数:17,代码来源:test_http.py
示例17: stop_report
def stop_report(self):
    """ Log a 'waiting for x to stop' message with throttling. """
    stopping = self.state == ProcessStates.STOPPING
    if not stopping:
        return

    now = time.time()
    report_due = now > (self.laststopreport + 2)  # every 2 seconds
    if report_due:
        process_name = as_string(self.config.name)
        self.config.options.logger.info(
            'waiting for %s to stop' % process_name)
        self.laststopreport = now
开发者ID:the5fire,项目名称:supervisor,代码行数:8,代码来源:process.py
示例18: test_handle_request_does_not_authorize_bad_credentials
def test_handle_request_does_not_authorize_bad_credentials(self):
    # Send Basic credentials that do not match the configured user and
    # check the wrapped handler never handled the request.
    request = DummyRequest('/logtail/process1', None, None, None)
    credentials = as_string(base64.b64encode(as_bytes("wrong:wrong")))
    request.header = ["Authorization: Basic %s" % credentials]

    wrapped = DummyHandler()
    auth_handler = self._makeOne({'user': 'password'}, wrapped)
    auth_handler.handle_request(request)

    self.assertFalse(wrapped.handled_request)
开发者ID:ngocson2vn,项目名称:supervisor,代码行数:8,代码来源:test_http.py
示例19: __repr__
def __repr__(self):
    # repr can't return anything other than a native string,
    # but the name might be unicode - a problem on Python 2.
    if PY2:
        shown_name = as_string(self.config.name).encode('unicode-escape')
    else:
        shown_name = self.config.name
    parts = (self.__class__, id(self), shown_name)
    return '<%s instance at %s named %s>' % parts
开发者ID:the5fire,项目名称:supervisor,代码行数:8,代码来源:process.py
示例20: test_handle_more_follow_file_gone
def test_handle_more_follow_file_gone(self):
    # Create a file, build the producer on it, remove the file, and check
    # the producer still returns the original contents. Then create a new
    # file at the same path and check the producer returns its contents.
    request = DummyRequest('/logtail/foo', None, None, None)
    # mkstemp creates the file itself, avoiding the race inherent in the
    # deprecated tempfile.mktemp().
    fd, filename = tempfile.mkstemp()
    with os.fdopen(fd, 'wb') as f:
        f.write(as_bytes('a' * 80))
    try:
        producer = self._makeOne(request, filename, 80)
    finally:
        os.unlink(filename)
    result = producer.more()
    self.assertEqual(result, as_string('a' * 80))
    with open(filename, 'wb') as f:
        f.write(as_bytes('b' * 80))
    try:
        result = producer.more()  # should open in new file
        self.assertEqual(result, as_string('b' * 80))
    finally:
        os.unlink(filename)
开发者ID:ngocson2vn,项目名称:supervisor,代码行数:18,代码来源:test_http.py
注：本文中的supervisor.compat.as_string函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论