本文整理汇总了Python中xpra.os_util.strtobytes函数的典型用法代码示例。如果您正苦于以下问题:Python strtobytes函数的具体用法?Python strtobytes怎么用?Python strtobytes使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了strtobytes函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: authenticate_hmac
def authenticate_hmac(self, challenge_response, client_salt):
if not self.salt:
log.error("Error: illegal challenge response received - salt cleared or unset")
return None
#ensure this salt does not get re-used:
if client_salt is None:
salt = self.salt
else:
salt = xor(self.salt, client_salt)
self.salt = None
password = self.get_password()
if not password:
log.error("Error: %s authentication failed", self)
log.error(" no password defined for '%s'", self.username)
return False
verify = hmac.HMAC(strtobytes(password), strtobytes(salt), digestmod=hashlib.md5).hexdigest()
log("authenticate(%s) password=%s, hex(salt)=%s, hash=%s", challenge_response, password, binascii.hexlify(strtobytes(salt)), verify)
if hasattr(hmac, "compare_digest"):
eq = hmac.compare_digest(verify, challenge_response)
else:
eq = verify==challenge_response
if not eq:
log("expected '%s' but got '%s'", verify, challenge_response)
log.error("Error: hmac password challenge for '%s' does not match", self.username)
return False
return True
开发者ID:ljmljz,项目名称:xpra,代码行数:26,代码来源:sys_auth_base.py
示例2: get_key
def get_key(password, key_salt, block_size, iterations):
global backend
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=block_size, salt=strtobytes(key_salt), iterations=iterations, backend=backend)
key = kdf.derive(strtobytes(password))
return key
开发者ID:ljmljz,项目名称:xpra,代码行数:7,代码来源:pycryptography_backend.py
示例3: _test_file_auth
def _test_file_auth(self, name, module, genauthdata):
#no file, no go:
a = self._init_auth(module)
assert a.requires_challenge()
p = a.get_password()
assert not p, "got a password from %s: %s" % (a, p)
#challenge twice is a fail
assert a.get_challenge()
assert not a.get_challenge()
assert not a.get_challenge()
for muck in (0, 1):
f = tempfile.NamedTemporaryFile(prefix=name)
filename = f.name
with f:
a = self._init_auth(module, {"password_file" : filename})
password, filedata = genauthdata(a)
print("saving password file data='%s' to '%s'", filedata, filename)
f.write(strtobytes(filedata))
f.flush()
assert a.requires_challenge()
salt, mac = a.get_challenge()
assert salt
assert mac=="hmac"
password = strtobytes(password)
client_salt = strtobytes(uuid.uuid4().hex+uuid.uuid4().hex)
auth_salt = strtobytes(xor(salt, client_salt))
if muck==0:
verify = hmac.HMAC(password, auth_salt, digestmod=hashlib.md5).hexdigest()
assert a.authenticate(verify, client_salt)
assert not a.authenticate(verify, client_salt)
assert a.get_password()==password
elif muck==1:
for verify in ("whatever", None, "bad"):
assert not a.authenticate(verify, client_salt)
开发者ID:ljmljz,项目名称:xpra,代码行数:34,代码来源:auth_test.py
示例4: authenticate_hmac
def authenticate_hmac(self, challenge_response, client_salt):
self.sessions = None
if not self.salt:
log.error("Error: illegal challenge response received - salt cleared or unset")
return None
#ensure this salt does not get re-used:
if client_salt is None:
salt = self.salt
else:
salt = xor(self.salt, client_salt)
self.salt = None
entry = self.get_auth_info()
log("authenticate(%s) auth-info=%s", self.username, entry)
if entry is None:
log.error("Error: authentication failed")
log.error(" no password for '%s' in '%s'", self.username, self.password_filename)
return None
fpassword, uid, gid, displays, env_options, session_options = entry
verify = hmac.HMAC(strtobytes(fpassword), strtobytes(salt), digestmod=hashlib.md5).hexdigest()
log("authenticate(%s) password='%s', hex(salt)=%s, hash=%s", challenge_response, fpassword, binascii.hexlify(strtobytes(salt)), verify)
if not hmac.compare_digest(verify, challenge_response):
log("expected '%s' but got '%s'", verify, challenge_response)
log.error("Error: hmac password challenge for '%s' does not match", self.username)
return False
self.sessions = uid, gid, displays, env_options, session_options
return True
开发者ID:svn2github,项目名称:Xpra,代码行数:26,代码来源:multifile_auth.py
示例5: test_large_xor_speed
def test_large_xor_speed(self):
start = time.time()
size = 1*1024*1024 #1MB
zeroes = strtobytes(chr(0)*size)
ones = strtobytes(chr(1)*size)
count = 10
for _ in range(count):
self.check_xor(zeroes, ones, ones)
end = time.time()
speed = size/(end-start)/1024/1024
#print("%iMB/s: took %ims on average (%s iterations)" % (speed, 1000*(end-start)/count, count))
assert speed>0, "running the xor speed test took too long"
开发者ID:ljmljz,项目名称:xpra,代码行数:12,代码来源:cyxor_test.py
示例6: _process_challenge
def _process_challenge(self, packet):
authlog("processing challenge: %s", packet[1:])
def warn_server_and_exit(code, message, server_message="authentication failed"):
authlog.error("Error: authentication failed:")
authlog.error(" %s", message)
self.disconnect_and_quit(code, server_message)
if not self.has_password():
warn_server_and_exit(EXIT_PASSWORD_REQUIRED, "this server requires authentication, please provide a password", "no password available")
return
password = self.load_password()
if not password:
warn_server_and_exit(EXIT_PASSWORD_FILE_ERROR, "failed to load password from file %s" % self.password_file, "no password available")
return
salt = packet[1]
if self.encryption:
assert len(packet)>=3, "challenge does not contain encryption details to use for the response"
server_cipher = typedict(packet[2])
key = self.get_encryption_key()
if key is None:
warn_server_and_exit(EXIT_ENCRYPTION, "the server does not use any encryption", "client requires encryption")
return
if not self.set_server_encryption(server_cipher, key):
return
#all server versions support a client salt,
#they also tell us which digest to use:
digest = packet[3]
client_salt = get_hex_uuid()+get_hex_uuid()
#TODO: use some key stretching algorigthm? (meh)
try:
from xpra.codecs.xor.cyxor import xor_str #@UnresolvedImport
salt = xor_str(salt, client_salt)
except:
salt = xor(salt, client_salt)
if digest==b"hmac":
import hmac, hashlib
password = strtobytes(password)
salt = strtobytes(salt)
challenge_response = hmac.HMAC(password, salt, digestmod=hashlib.md5).hexdigest()
elif digest==b"xor":
#don't send XORed password unencrypted:
if not self._protocol.cipher_out and not ALLOW_UNENCRYPTED_PASSWORDS:
warn_server_and_exit(EXIT_ENCRYPTION, "server requested digest %s, cowardly refusing to use it without encryption" % digest, "invalid digest")
return
challenge_response = xor(password, salt)
else:
warn_server_and_exit(EXIT_PASSWORD_REQUIRED, "server requested an unsupported digest: %s" % digest, "invalid digest")
return
if digest:
authlog("%s(%s, %s)=%s", digest, binascii.hexlify(password), binascii.hexlify(salt), challenge_response)
self.password_sent = True
self.remove_packet_handlers("challenge")
self.send_hello(challenge_response, client_salt)
开发者ID:ljmljz,项目名称:xpra,代码行数:52,代码来源:client_base.py
示例7: _test_hmac_auth
def _test_hmac_auth(self, auth_class, password, **kwargs):
for x in (password, "somethingelse"):
a = self._init_auth(auth_class, **kwargs)
assert a.requires_challenge()
assert a.get_password()
salt, mac = a.get_challenge()
assert salt
assert mac=="hmac", "invalid mac: %s" % mac
client_salt = strtobytes(uuid.uuid4().hex+uuid.uuid4().hex)
auth_salt = strtobytes(xor(salt, client_salt))
verify = hmac.HMAC(x, auth_salt, digestmod=hashlib.md5).hexdigest()
passed = a.authenticate(verify, client_salt)
assert passed == (x==password), "expected authentication to %s with %s vs %s" % (["fail", "succeed"][x==password], x, password)
assert not a.authenticate(verify, client_salt)
开发者ID:ljmljz,项目名称:xpra,代码行数:14,代码来源:auth_test.py
示例8: send_hello
def send_hello(self, challenge_response=None, client_salt=None):
try:
hello = self.make_hello_base()
if (self.password_file or os.environ.get('XPRA_PASSWORD')) and not challenge_response:
#avoid sending the full hello: tell the server we want
#a packet challenge first
hello["challenge"] = True
else:
hello.update(self.make_hello())
except InitExit as e:
log.error("error preparing connection:")
log.error(" %s", e)
self.quit(EXIT_INTERNAL_ERROR)
return
except Exception as e:
log.error("error preparing connection: %s", e, exc_info=True)
self.quit(EXIT_INTERNAL_ERROR)
return
if challenge_response:
assert self.password_file or os.environ.get('XPRA_PASSWORD')
hello["challenge_response"] = challenge_response
if client_salt:
hello["challenge_client_salt"] = client_salt
log("send_hello(%s) packet=%s", binascii.hexlify(strtobytes(challenge_response or "")), hello)
self.send("hello", hello)
开发者ID:svn2github,项目名称:Xpra,代码行数:25,代码来源:client_base.py
示例9: nasty_rgb_via_png_paint
def nasty_rgb_via_png_paint(self, cairo_format, has_alpha, img_data, x, y, width, height, rowstride, rgb_format):
log("nasty_rgb_via_png_paint%s", (cairo_format, has_alpha, len(img_data), x, y, width, height, rowstride, rgb_format))
#PIL fallback
PIL = get_codec("PIL")
if has_alpha:
oformat = "RGBA"
else:
oformat = "RGB"
#use frombytes rather than frombuffer to be compatible with python3 new-style buffers
#this is slower, but since this codepath is already dreadfully slow, we don't care
bdata = strtobytes(memoryview_to_bytes(img_data))
try:
img = PIL.Image.frombytes(oformat, (width,height), bdata, "raw", rgb_format.replace("X", "A"), rowstride, 1)
except ValueError as e:
raise Exception("failed to parse raw %s data to %s: %s" % (rgb_format, oformat, e))
#This is insane, the code below should work, but it doesn't:
# img_data = bytearray(img.tostring('raw', oformat, 0, 1))
# pixbuf = pixbuf_new_from_data(img_data, COLORSPACE_RGB, True, 8, width, height, rowstride)
# success = self.cairo_paint_pixbuf(pixbuf, x, y)
#So we still rountrip via PNG:
png = BytesIOClass()
img.save(png, format="PNG")
reader = BytesIOClass(png.getvalue())
png.close()
img = cairo.ImageSurface.create_from_png(reader)
self.cairo_paint_surface(img, x, y)
return True
开发者ID:ljmljz,项目名称:xpra,代码行数:27,代码来源:cairo_backing_base.py
示例10: test_xor_str
def test_xor_str(self):
zeroes = strtobytes(chr(0)*16)
ones = strtobytes(chr(1)*16)
ff = strtobytes(chr(255)*16)
fe = strtobytes(chr(254)*16)
empty = b""
lstr = b"\0x80"*64
self.check_xor(zeroes, zeroes, zeroes)
self.check_xor(ones, ones, zeroes)
self.check_xor(ff, ones, fe)
self.check_xor(fe, ones, ff)
#feed some invalid data:
self.fail_xor(ones, empty)
self.fail_xor(empty, zeroes)
self.fail_xor(lstr, ff)
self.fail_xor(bool, int)
开发者ID:ljmljz,项目名称:xpra,代码行数:16,代码来源:cyxor_test.py
示例11: send_hello
def send_hello(self, challenge_response=None, client_salt=None):
try:
hello = self.make_hello_base()
if self.has_password() and not challenge_response:
#avoid sending the full hello: tell the server we want
#a packet challenge first
hello["challenge"] = True
else:
hello.update(self.make_hello())
except InitExit as e:
log.error("error preparing connection:")
log.error(" %s", e)
self.quit(EXIT_INTERNAL_ERROR)
return
except Exception as e:
log.error("error preparing connection: %s", e, exc_info=True)
self.quit(EXIT_INTERNAL_ERROR)
return
if challenge_response:
assert self.has_password(), "got a password challenge response but we don't have a password! (malicious or broken server?)"
hello["challenge_response"] = challenge_response
if client_salt:
hello["challenge_client_salt"] = client_salt
log("send_hello(%s) packet=%s", binascii.hexlify(strtobytes(challenge_response or "")), hello)
self.send("hello", hello)
开发者ID:ljmljz,项目名称:xpra,代码行数:25,代码来源:client_base.py
示例12: validate_backend
def validate_backend(try_backend):
import binascii
from xpra.os_util import strtobytes
try_backend.init()
message = b"some message1234"
password = "this is our secret"
key_salt = DEFAULT_SALT
iterations = DEFAULT_ITERATIONS
block_size = DEFAULT_BLOCKSIZE
key = try_backend.get_key(password, key_salt, block_size, iterations)
log("validate_backend(%s) key=%s", try_backend, binascii.hexlify(key))
assert key is not None, "backend %s failed to generate a key" % try_backend
enc = try_backend.get_encryptor(key, DEFAULT_IV)
log("validate_backend(%s) encryptor=%s", try_backend, enc)
assert enc is not None, "backend %s failed to generate an encryptor" % enc
dec = try_backend.get_decryptor(key, DEFAULT_IV)
log("validate_backend(%s) decryptor=%s", try_backend, dec)
assert dec is not None, "backend %s failed to generate a decryptor" % enc
ev = enc.encrypt(message)
evs = binascii.hexlify(strtobytes(ev))
log("validate_backend(%s) encrypted(%s)=%s", try_backend, message, evs)
dv = dec.decrypt(ev)
log("validate_backend(%s) decrypted(%s)=%s", try_backend, evs, dv)
assert dv==message
log("validate_backend(%s) passed", try_backend)
开发者ID:svn2github,项目名称:Xpra,代码行数:25,代码来源:crypto.py
示例13: get_pixbuf_from_data
def get_pixbuf_from_data(rgb_data, has_alpha, w, h, rowstride):
if is_gtk3():
data = array.array('B', strtobytes(rgb_data))
return GdkPixbuf.Pixbuf.new_from_data(data, GdkPixbuf.Colorspace.RGB,
True, 8, w, h, rowstride,
None, None)
return gdk.pixbuf_new_from_data(rgb_data, gdk.COLORSPACE_RGB, has_alpha, 8, w, h, rowstride)
开发者ID:svn2github,项目名称:Xpra,代码行数:7,代码来源:gtk_util.py
示例14: capsget
def capsget(self, key, default=None):
v = self.get(key)
#py3k and bytes as keys...
if v is None and type(key)==str:
v = self.get(strtobytes(key), default)
if sys.version >= '3' and type(v)==bytes:
v = bytestostr(v)
return v
开发者ID:svn2github,项目名称:Xpra,代码行数:8,代码来源:util.py
示例15: get_default_systemd_run
def get_default_systemd_run():
#don't use systemd-run on CentOS / RedHat
#(it causes failures with "Failed to create bus connection: No such file or directory")
from xpra.os_util import load_binary_file, strtobytes
data = strtobytes(load_binary_file("/etc/redhat-release") or "")
if data and (data.find(b"RedHat")>=0 or data.find(b"CentOS")>=0):
return "no"
return "auto"
开发者ID:svn2github,项目名称:Xpra,代码行数:8,代码来源:config.py
示例16: test_env
def test_env(self):
for var_name in ("XPRA_PASSWORD", "SOME_OTHER_VAR_NAME"):
password = strtobytes(uuid.uuid4().hex)
os.environ[var_name] = password
try:
kwargs = {}
if var_name!="XPRA_PASSWORD":
kwargs["name"] = var_name
self._test_hmac_auth(env_auth, password, name=var_name)
finally:
del os.environ[var_name]
开发者ID:ljmljz,项目名称:xpra,代码行数:11,代码来源:auth_test.py
示例17: check
def check(self, str_value):
b = strtobytes(str_value)
assert b
s = bytestostr(b)
assert s
assert s==str_value
if not _memoryview:
return
mv = _memoryview(b)
mvb = memoryview_to_bytes(mv)
mvs = bytestostr(mvb)
assert mvs==str_value
开发者ID:svn2github,项目名称:Xpra,代码行数:12,代码来源:os_util_test.py
示例18: _add_chunks_to_queue
def _add_chunks_to_queue(self,
chunks,
proto_flags,
start_send_cb=None,
end_send_cb=None):
""" the write_lock must be held when calling this function """
counter = 0
items = []
for index, level, data in chunks:
scb, ecb = None, None
#fire the start_send_callback just before the first packet is processed:
if counter == 0:
scb = start_send_cb
#fire the end_send callback when the last packet (index==0) makes it out:
if index == 0:
ecb = end_send_cb
payload_size = len(data)
actual_size = payload_size
if self.cipher_out:
proto_flags |= Protocol.FLAGS_CIPHER
#note: since we are padding: l!=len(data)
padding = (self.cipher_out_block_size -
len(data) % self.cipher_out_block_size) * " "
if len(padding) == 0:
padded = data
else:
padded = data + padding
actual_size = payload_size + len(padding)
assert len(padded) == actual_size
data = self.cipher_out.encrypt(padded)
assert len(data) == actual_size
log("sending %s bytes encrypted with %s padding", payload_size,
len(padding))
if proto_flags & Protocol.FLAGS_NOHEADER:
#for plain/text packets (ie: gibberish response)
items.append((data, scb, ecb))
elif pack_header_and_data is not None and actual_size < PACKET_JOIN_SIZE:
if type(data) == unicode:
data = str(data)
header_and_data = pack_header_and_data(
actual_size, proto_flags, level, index, payload_size, data)
items.append((header_and_data, scb, ecb))
else:
header = pack_header(proto_flags, level, index, payload_size)
items.append((header, scb, None))
items.append((strtobytes(data), None, ecb))
counter += 1
self._write_queue.put(items)
self.output_packetcount += 1
开发者ID:svn2github,项目名称:Xpra,代码行数:49,代码来源:protocol.py
示例19: init_client_mmap
def init_client_mmap(token, mmap_group=None, socket_filename=None, size=128*1024*1024):
"""
Initializes an mmap area, writes the token in it and returns:
(success flag, mmap_area, mmap_size, temp_file, mmap_filename)
The caller must keep hold of temp_file to ensure it does not get deleted!
This is used by the client.
"""
if not can_use_mmap():
log.error("cannot use mmap: python version is too old?")
return False, None, 0, None, None
log("init_mmap(%s, %s, %s)", token, mmap_group, socket_filename)
try:
import mmap
import tempfile
from stat import S_IRUSR,S_IWUSR,S_IRGRP,S_IWGRP
mmap_dir = os.getenv("TMPDIR", "/tmp")
if not os.path.exists(mmap_dir):
raise Exception("TMPDIR %s does not exist!" % mmap_dir)
#create the mmap file, the mkstemp that is called via NamedTemporaryFile ensures
#that the file is readable and writable only by the creating user ID
temp = tempfile.NamedTemporaryFile(prefix="xpra.", suffix=".mmap", dir=mmap_dir)
#keep a reference to it so it does not disappear!
mmap_temp_file = temp
mmap_filename = temp.name
fd = temp.file.fileno()
#set the group permissions and gid if the mmap-group option is specified
if mmap_group and type(socket_filename)==str and os.path.exists(socket_filename):
s = os.stat(socket_filename)
os.fchown(fd, -1, s.st_gid)
os.fchmod(fd, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP)
assert size>=1024*1024, "mmap size is too small: %s (minimum is 1MB)" % to_std_unit(size)
assert size<=1024*1024*1024, "mmap is too big: %s (maximum is 1GB)" % to_std_unit(size)
unit = max(4096, mmap.PAGESIZE)
mmap_size = roundup(size + 8, unit)
log("using mmap file %s, fd=%s, size=%s", mmap_filename, fd, mmap_size)
SEEK_SET = 0 #os.SEEK_SET==0 but this is not available in python2.4
os.lseek(fd, mmap_size-1, SEEK_SET)
assert os.write(fd, strtobytes('\x00'))
os.lseek(fd, 0, SEEK_SET)
mmap_area = mmap.mmap(fd, length=mmap_size)
write_mmap_token(mmap_area, token)
return True, mmap_area, mmap_size, mmap_temp_file, mmap_filename
except Exception, e:
log.error("failed to setup mmap: %s", e, exc_info=True)
clean_mmap(mmap_filename)
return False, None, 0, None, None
开发者ID:svn2github,项目名称:Xpra,代码行数:46,代码来源:mmap_pipe.py
示例20: _add_chunks_to_queue
def _add_chunks_to_queue(self, chunks, start_send_cb=None, end_send_cb=None):
""" the write_lock must be held when calling this function """
counter = 0
items = []
for proto_flags,index,level,data in chunks:
scb, ecb = None, None
#fire the start_send_callback just before the first packet is processed:
if counter==0:
scb = start_send_cb
#fire the end_send callback when the last packet (index==0) makes it out:
if index==0:
ecb = end_send_cb
payload_size = len(data)
actual_size = payload_size
if self.cipher_out:
proto_flags |= FLAGS_CIPHER
#note: since we are padding: l!=len(data)
padding_size = self.cipher_out_block_size - (payload_size % self.cipher_out_block_size)
if padding_size==0:
padded = data
else:
# pad byte value is number of padding bytes added
padded = data + pad(self.cipher_out_padding, padding_size)
actual_size += padding_size
assert len(padded)==actual_size, "expected padded size to be %i, but got %i" % (len(padded), actual_size)
data = self.cipher_out.encrypt(padded)
assert len(data)==actual_size, "expected encrypted size to be %i, but got %i" % (len(data), actual_size)
cryptolog("sending %s bytes %s encrypted with %s padding", payload_size, self.cipher_out_name, padding_size)
if proto_flags & FLAGS_NOHEADER:
assert not self.cipher_out
#for plain/text packets (ie: gibberish response)
log("sending %s bytes without header", payload_size)
items.append((data, scb, ecb))
elif actual_size<PACKET_JOIN_SIZE:
if type(data) not in JOIN_TYPES:
data = bytes(data)
header_and_data = pack_header(proto_flags, level, index, payload_size) + data
items.append((header_and_data, scb, ecb))
else:
header = pack_header(proto_flags, level, index, payload_size)
items.append((header, scb, None))
items.append((strtobytes(data), None, ecb))
counter += 1
self._write_queue.put(items)
self.output_packetcount += 1
开发者ID:svn2github,项目名称:Xpra,代码行数:45,代码来源:protocol.py
注:本文中的xpra.os_util.strtobytes函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论