本文整理汇总了Python中six.b函数的典型用法代码示例。如果您正苦于以下问题:Python b函数的具体用法?Python b怎么用?Python b使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了b函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_bytes_tokens
def test_bytes_tokens(self):
bytes_token = BytesToken(unhexlify(six.b('01')))
self.assertEqual(bytes_token.value, six.b('\x01'))
self.assertEqual(str(bytes_token), "<BytesToken: %s>" % bytes_token.value)
self.assertEqual(bytes_token.hash_fn('123'), '123')
self.assertEqual(bytes_token.hash_fn(123), 123)
self.assertEqual(bytes_token.hash_fn(str(cassandra.metadata.MAX_LONG)), str(cassandra.metadata.MAX_LONG))
开发者ID:mambocab,项目名称:python-driver,代码行数:7,代码来源:test_metadata.py
示例2: decode_from
def decode_from(stream):
'''
object = decode_from(stream)
Decodes the object from the stream ``stream``
Parameters
----------
stream : file-like object
Returns
-------
object : decoded object
'''
stream = decompress_stream(stream)
prefix = stream.read(1)
if not prefix:
return None
elif prefix == six.b('P'):
return pickle.load(stream)
elif prefix == six.b('N'):
import numpy as np
return np.load(stream)
else:
raise IOError("jug.backend.decode_from: unknown prefix '%s'" % prefix)
开发者ID:luispedro,项目名称:jug,代码行数:25,代码来源:encode.py
示例3: readline
def readline(self):
"""Read one entire line from the file.
A trailing newline character is kept in the string (but may be absent
when a file ends with an incomplete line).
"""
bits = [self._buffer if self._buffer_pos == self.tell() else b("")]
indx = bits[-1].find(self._newline)
if indx == -1:
# Read chunks until first newline is found or entire file is read.
while indx == -1:
bit = self.read(self.buffer_size)
bits.append(bit)
if not bit:
break
indx = bit.find(self._newline)
if indx == -1:
return b("").join(bits)
indx += len(self._newline)
extra = bits[-1][indx:]
bits[-1] = bits[-1][:indx]
self._buffer = extra
self._buffer_pos = self.tell()
return b("").join(bits)
开发者ID:jirikuncar,项目名称:xrootdpyfs,代码行数:29,代码来源:xrdfile.py
示例4: PwDecrypt
def PwDecrypt(self, password):
"""Unobfuscate a RADIUS password. RADIUS hides passwords in packets by
using an algorithm based on the MD5 hash of the packet authenticator
and RADIUS secret. This function reverses the obfuscation process.
:param password: obfuscated form of password
:type password: binary string
:return: plaintext password
:rtype: unicode string
"""
buf = password
pw = six.b('')
last = self.authenticator
while buf:
hash = md5_constructor(self.secret + last).digest()
if six.PY3:
for i in range(16):
pw += bytes((hash[i] ^ buf[i],))
else:
for i in range(16):
pw += chr(ord(hash[i]) ^ ord(buf[i]))
(last, buf) = (buf[:16], buf[16:])
while pw.endswith(six.b('\x00')):
pw = pw[:-1]
return pw.decode('utf-8')
开发者ID:jeffl,项目名称:pyrad,代码行数:29,代码来源:packet.py
示例5: testUtf8Validator
def testUtf8Validator(self):
state = validate_utf8(six.b('\xf0\x90\x80\x80'))
self.assertEqual(state, True)
state = validate_utf8(six.b('\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80edited'))
self.assertEqual(state, False)
state = validate_utf8(six.b(''))
self.assertEqual(state, True)
开发者ID:ensime,项目名称:ensime-sublime,代码行数:7,代码来源:test_websocket.py
示例6: test_return_nesteddict
def test_return_nesteddict(self):
r = self.call('returntypes/getnesteddict',
_rt={wsme.types.bytes: NestedOuter})
self.assertEquals(r, {
b('a'): {'inner': {'aint': 0}},
b('b'): {'inner': {'aint': 0}}
})
开发者ID:Kjir,项目名称:wsme,代码行数:7,代码来源:protocol.py
示例7: test_boto3_list_object_versions
def test_boto3_list_object_versions():
s3 = boto3.client('s3', region_name='us-east-1')
bucket_name = 'mybucket'
key = 'key-with-versions'
s3.create_bucket(Bucket=bucket_name)
s3.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration={
'Status': 'Enabled'
}
)
items = (six.b('v1'), six.b('v2'))
for body in items:
s3.put_object(
Bucket=bucket_name,
Key=key,
Body=body
)
response = s3.list_object_versions(
Bucket=bucket_name
)
# Two object versions should be returned
len(response['Versions']).should.equal(2)
keys = set([item['Key'] for item in response['Versions']])
keys.should.equal({key})
# Test latest object version is returned
response = s3.get_object(Bucket=bucket_name, Key=key)
response['Body'].read().should.equal(items[-1])
开发者ID:netors,项目名称:moto,代码行数:28,代码来源:test_s3.py
示例8: test_watch_readfile
def test_watch_readfile(self):
self.setupWatchers()
self.fs.setcontents("hello", b("hello world"))
self.assertEventOccurred(CREATED,"/hello")
self.clearCapturedEvents()
old_atime = self.fs.getinfo("hello").get("accessed_time")
self.assertEquals(self.fs.getcontents("hello"), b("hello world"))
if not isinstance(self.watchfs,PollingWatchableFS):
# Help it along by updting the atime.
# TODO: why is this necessary?
if self.fs.hassyspath("hello"):
syspath = self.fs.getsyspath("hello")
mtime = os.stat(syspath).st_mtime
atime = int(time.time())
os.utime(self.fs.getsyspath("hello"),(atime,mtime))
self.assertEventOccurred(ACCESSED,"/hello")
elif old_atime is not None:
# Some filesystems don't update atime synchronously, or only
# update it if it's too old, or don't update it at all!
# Try to force the issue, wait for it to change, but eventually
# give up and bail out.
for i in xrange(10):
if self.fs.getinfo("hello").get("accessed_time") != old_atime:
if not self.checkEventOccurred(MODIFIED,"/hello"):
self.assertEventOccurred(ACCESSED,"/hello")
break
time.sleep(0.2)
if self.fs.hassyspath("hello"):
syspath = self.fs.getsyspath("hello")
mtime = os.stat(syspath).st_mtime
atime = int(time.time())
os.utime(self.fs.getsyspath("hello"),(atime,mtime))
开发者ID:anthonybishopric,项目名称:pyboxfs,代码行数:32,代码来源:test_watch.py
示例9: test_egain_on_buffer_size
def test_egain_on_buffer_size(self, *args):
# get a connection that's already fully started
c = self.test_successful_connection()
header = six.b('\x00\x00\x00\x00') + int32_pack(20000)
responses = [
header + (six.b('a') * (4096 - len(header))),
six.b('a') * 4096,
socket_error(errno.EAGAIN),
six.b('a') * 100,
socket_error(errno.EAGAIN)]
def side_effect(*args):
response = responses.pop(0)
if isinstance(response, socket_error):
raise response
else:
return response
c._socket.recv.side_effect = side_effect
c.handle_read(None, 0)
self.assertEqual(c._total_reqd_bytes, 20000 + len(header))
# the EAGAIN prevents it from reading the last 100 bytes
c._iobuf.seek(0, os.SEEK_END)
pos = c._iobuf.tell()
self.assertEqual(pos, 4096 + 4096)
# now tell it to read the last 100 bytes
c.handle_read(None, 0)
c._iobuf.seek(0, os.SEEK_END)
pos = c._iobuf.tell()
self.assertEqual(pos, 4096 + 4096 + 100)
开发者ID:mike-tr-adamson,项目名称:python-driver,代码行数:32,代码来源:test_libevreactor.py
示例10: format
def format(self):
"""
format this object to string(byte array) to send data to server.
"""
if any(x not in (0, 1) for x in [self.fin, self.rsv1, self.rsv2, self.rsv3]):
raise ValueError("not 0 or 1")
if self.opcode not in ABNF.OPCODES:
raise ValueError("Invalid OPCODE")
length = len(self.data)
if length >= ABNF.LENGTH_63:
raise ValueError("data is too long")
frame_header = chr(self.fin << 7 | self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4 | self.opcode)
if length < ABNF.LENGTH_7:
frame_header += chr(self.mask << 7 | length)
frame_header = six.b(frame_header)
elif length < ABNF.LENGTH_16:
frame_header += chr(self.mask << 7 | 0x7E)
frame_header = six.b(frame_header)
frame_header += struct.pack("!H", length)
else:
frame_header += chr(self.mask << 7 | 0x7F)
frame_header = six.b(frame_header)
frame_header += struct.pack("!Q", length)
if not self.mask:
return frame_header + self.data
else:
mask_key = self.get_mask_key(4)
return frame_header + self._get_masked(mask_key)
开发者ID:elbowz,项目名称:xbmc-pushbullet,代码行数:30,代码来源:__init__.py
示例11: test_watch_writefile
def test_watch_writefile(self):
self.setupWatchers()
self.fs.setcontents("hello", b("hello world"))
self.assertEventOccurred(CREATED,"/hello")
self.clearCapturedEvents()
self.fs.setcontents("hello", b("hello again world"))
self.assertEventOccurred(MODIFIED,"/hello")
开发者ID:anthonybishopric,项目名称:pyboxfs,代码行数:7,代码来源:test_watch.py
示例12: _read_mathp
def _read_mathp(self, data, n):
"""MATHP(4506,45,374) - Record 11"""
nmaterials = 0
s1 = Struct(b(self._endian + 'i7f3i23fi'))
s2 = Struct(b(self._endian + '8i'))
n2 = n
while n2 < n:
edata = data[n:n+140]
n += 140
out1 = s1.unpack(edata)
(mid, a10, a01, d1, rho, alpha, tref, ge, sf, na, nd, kp,
a20, a11, a02, d2,
a30, a21, a12, a03, d3,
a40, a31, a22, a13, a04, d4,
a50, a41, a32, a23, a14, a05, d5,
continue_flag) = out1
data_in = [out1]
if continue_flag:
edata = data[n:n+32] # 7*4
n += 32
out2 = s2.unpack(edata)
(tab1, tab2, tab3, tab4, x1, x2, x3, tab5) = out2
data_in.append(out2)
mat = MATHP.add_op2_data(data_in)
self.add_op2_material(mat)
nmaterials += 1
self.card_count['MATHP'] = nmaterials
return n
开发者ID:EmanueleCannizzaro,项目名称:pyNastran,代码行数:29,代码来源:op2_geom_mpt.py
示例13: encode_multiparts
def encode_multiparts(fields):
"""
Breaks up the multipart_encoded content into first and last part, to be able to "insert" the file content
itself in-between
:param fields: dict() fields to encode
:return:(header_body, close_body, content_type)
"""
(data, content_type) = requests.packages.urllib3.filepost.encode_multipart_formdata(fields)
#logging.debug(data)
header_body = BytesIO()
# Remove closing boundary
lines = data.split("\r\n")
boundary = lines[0]
lines = lines[0:len(lines)-2]
header_body.write(b("\r\n".join(lines) + "\r\n"))
#Add file data part except data
header_body.write(b('%s\r\n' % boundary))
header_body.write(b('Content-Disposition: form-data; name="userfile_0"; filename="fake-name"\r\n'))
header_body.write(b('Content-Type: application/octet-stream\r\n\r\n'))
closing_boundary = b('\r\n%s--\r\n' % (boundary))
return (header_body.getvalue(), closing_boundary, content_type)
开发者ID:nguyentamvinhlong,项目名称:pydio-sync,代码行数:26,代码来源:utils.py
示例14: test_make_f77_block
def test_make_f77_block():
"""Plain F77-unformatted block"""
f77_block = make_f77_block(six.b('asdf'))
assert_equal(
f77_block,
six.b('\x04\x00\x00\x00asdf\x04\x00\x00\x00')
)
开发者ID:aepsil0n,项目名称:jelly,代码行数:7,代码来源:test_ics.py
示例15: get_body_content
def get_body_content(self):
"""
Returns content of BODY element for this HTML document. Content will be of type 'str' (Python 2) or 'bytes' (Python 3).
:Returns:
Returns content of this document.
"""
content = self.get_content()
try:
html_tree = parse_html_string(self.content)
except:
return ''
html_root = html_tree.getroottree()
if len(html_root.find('body')) != 0:
body = html_tree.find('body')
tree_str = etree.tostring(body, pretty_print=True, encoding='utf-8', xml_declaration=False)
# this is so stupid
if tree_str.startswith(six.b('<body>')):
n = tree_str.rindex(six.b('</body>'))
return tree_str[7:n]
return tree_str
return ''
开发者ID:homaralex,项目名称:ebooklib,代码行数:30,代码来源:epub.py
示例16: test_make_f77_block_padded
def test_make_f77_block_padded():
"""F77-unformatted block with padding"""
f77_block_padded = make_f77_block(six.b('asdf'), 10)
assert_equal(
f77_block_padded,
six.b('\x0a\x00\x00\x00asdf\x00\x00\x00\x00\x00\x00\x0a\x00\x00\x00')
)
开发者ID:aepsil0n,项目名称:jelly,代码行数:7,代码来源:test_ics.py
示例17: test_walk
def test_walk(self):
self.fs.setcontents('a.txt', b('hello'))
self.fs.setcontents('b.txt', b('world'))
self.fs.makeopendir('foo').setcontents('c', b('123'))
sorted_walk = sorted([(d, sorted(fs)) for (d, fs) in self.fs.walk()])
self.assertEquals(sorted_walk,
[("/", ["a.txt", "b.txt"]),
("/foo", ["c"])])
# When searching breadth-first, shallow entries come first
found_a = False
for _, files in self.fs.walk(search="breadth"):
if "a.txt" in files:
found_a = True
if "c" in files:
break
assert found_a, "breadth search order was wrong"
# When searching depth-first, deep entries come first
found_c = False
for _, files in self.fs.walk(search="depth"):
if "c" in files:
found_c = True
if "a.txt" in files:
break
assert found_c, "depth search order was wrong: " + \
str(list(self.fs.walk(search="depth")))
开发者ID:DANCEcollaborative,项目名称:forum-xblock,代码行数:25,代码来源:__init__.py
示例18: _get_login_info
def _get_login_info(self):
"""Get login IP, username and password from config file."""
logininfo = {}
filename = self.configuration.manila_huawei_conf_file
tree = ET.parse(filename)
root = tree.getroot()
RestURL = root.findtext('Storage/RestURL')
logininfo['RestURL'] = RestURL.strip()
# Prefix !$$$ means encoded already.
prefix_name = '!$$$'
need_encode = False
for key in ['UserName', 'UserPassword']:
node = root.find('Storage/%s' % key)
if node.text.find(prefix_name) > -1:
logininfo[key] = base64.b64decode(six.b(node.text[4:]))
else:
logininfo[key] = node.text
node.text = prefix_name + six.text_type(
base64.b64encode(six.b(node.text)))
need_encode = True
if need_encode:
self._change_file_mode(filename)
try:
tree.write(filename, 'UTF-8')
except Exception as err:
err_msg = (_('File write error %s.') % err)
LOG.error(err_msg)
raise exception.InvalidShare(reason=err_msg)
return logininfo
开发者ID:huaweistorage,项目名称:OpenStack_Driver,代码行数:31,代码来源:helper.py
示例19: test_rename
def test_rename(self):
check = self.check
# test renaming a file in the same directory
self.fs.setcontents("foo.txt", b("Hello, World!"))
self.assert_(check("foo.txt"))
self.fs.rename("foo.txt", "bar.txt")
self.assert_(check("bar.txt"))
self.assert_(not check("foo.txt"))
# test renaming a directory in the same directory
self.fs.makedir("dir_a")
self.fs.setcontents("dir_a/test.txt", b("testerific"))
self.assert_(check("dir_a"))
self.fs.rename("dir_a", "dir_b")
self.assert_(check("dir_b"))
self.assert_(check("dir_b/test.txt"))
self.assert_(not check("dir_a/test.txt"))
self.assert_(not check("dir_a"))
# test renaming a file into a different directory
self.fs.makedir("dir_a")
self.fs.rename("dir_b/test.txt", "dir_a/test.txt")
self.assert_(not check("dir_b/test.txt"))
self.assert_(check("dir_a/test.txt"))
# test renaming a file into a non-existent directory
self.assertRaises(ParentDirectoryMissingError,
self.fs.rename, "dir_a/test.txt", "nonexistent/test.txt")
开发者ID:DANCEcollaborative,项目名称:forum-xblock,代码行数:25,代码来源:__init__.py
示例20: readline
def readline(self,size=-1):
"""Read a line from the file, or at most <size> bytes."""
bits = []
indx = -1
sizeSoFar = 0
while indx == -1:
nextBit = self.read(self._bufsize)
bits.append(nextBit)
sizeSoFar += len(nextBit)
if not nextBit:
break
if size > 0 and sizeSoFar >= size:
break
indx = nextBit.find(b("\n"))
# If not found, return whole string up to <size> length
# Any leftovers are pushed onto front of buffer
if indx == -1:
data = b("").join(bits)
if size > 0 and sizeSoFar > size:
extra = data[size:]
data = data[:size]
self._rbuffer = extra + self._rbuffer
return data
# If found, push leftovers onto front of buffer
# Add one to preserve the newline in the return value
indx += 1
extra = bits[-1][indx:]
bits[-1] = bits[-1][:indx]
self._rbuffer = extra + self._rbuffer
return b("").join(bits)
开发者ID:DANCEcollaborative,项目名称:forum-xblock,代码行数:30,代码来源:filelike.py
注:本文中的six.b函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论