本文整理汇总了Python中wechatpy.utils.to_binary函数的典型用法代码示例。如果您正苦于以下问题:Python to_binary函数的具体用法?Python to_binary怎么用?Python to_binary使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了to_binary函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: rsa_decrypt
def rsa_decrypt(encrypted_data, pem, password=None):
"""
rsa 解密
:param encrypted_data: 待解密 bytes
:param pem: RSA private key 内容/binary
:param password: RSA private key pass phrase
:return: 解密后的 binary
"""
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
encrypted_data = to_binary(encrypted_data)
pem = to_binary(pem)
private_key = serialization.load_pem_private_key(pem, password, backend=default_backend())
data = private_key.decrypt(
encrypted_data,
padding=padding.OAEP(
mgf=padding.MGF1(hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None,
)
)
return data
开发者ID:Brightcells,项目名称:wechatpy,代码行数:25,代码来源:utils.py
示例2: rsa_encrypt
def rsa_encrypt(data, pem, b64_encode=True):
"""
rsa 加密
:param data: 待加密字符串/binary
:param pem: RSA public key 内容/binary
:param b64_encode: 是否对输出进行 base64 encode
:return: 如果 b64_encode=True 的话,返回加密并 base64 处理后的 string;否则返回加密后的 binary
"""
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
encoded_data = to_binary(data)
pem = to_binary(pem)
public_key = serialization.load_pem_public_key(pem, backend=default_backend())
encrypted_data = public_key.encrypt(
encoded_data,
padding=padding.OAEP(
mgf=padding.MGF1(hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None,
)
)
if b64_encode:
encrypted_data = base64.b64encode(encrypted_data).decode('utf-8')
return encrypted_data
开发者ID:Brightcells,项目名称:wechatpy,代码行数:26,代码来源:utils.py
示例3: _encrypt
def _encrypt(self, text, _id):
text = to_binary(text)
tmp_list = []
tmp_list.append(to_binary(self.get_random_string()))
length = struct.pack(b'I', socket.htonl(len(text)))
tmp_list.append(length)
tmp_list.append(text)
tmp_list.append(to_binary(_id))
text = b''.join(tmp_list)
text = PKCS7Encoder.encode(text)
ciphertext = to_binary(self.cipher.encrypt(text))
return base64.b64encode(ciphertext)
开发者ID:JoneXiong,项目名称:wechatpy,代码行数:14,代码来源:base.py
示例4: encode
def encode(cls, text):
length = len(text)
padding_count = cls.block_size - length % cls.block_size
if padding_count == 0:
padding_count = cls.block_size
padding = to_binary(chr(padding_count))
return text + padding * padding_count
开发者ID:Brightcells,项目名称:wechatpy,代码行数:7,代码来源:pkcs7.py
示例5: _encrypt_message
def _encrypt_message(self,
msg,
nonce,
timestamp=None,
crypto_class=None):
from wechatpy.replies import BaseReply
xml = """<xml>
<Encrypt><![CDATA[{encrypt}]]></Encrypt>
<MsgSignature><![CDATA[{signature}]]></MsgSignature>
<TimeStamp>{timestamp}</TimeStamp>
<Nonce><![CDATA[{nonce}]]></Nonce>
</xml>"""
if isinstance(msg, BaseReply):
msg = msg.render()
timestamp = timestamp or to_binary(int(time.time()))
pc = crypto_class(self.key)
encrypt = to_text(pc.encrypt(msg, self._id))
signature = _get_signature(self.token, timestamp, nonce, encrypt)
return to_text(xml.format(
encrypt=encrypt,
signature=signature,
timestamp=timestamp,
nonce=nonce
))
开发者ID:Dingqs,项目名称:oejia_wx,代码行数:25,代码来源:__init__.py
示例6: __repr__
def __repr__(self):
_repr = "{klass}({msg})".format(
klass=self.__class__.__name__,
msg=repr(self._data)
)
if six.PY2:
return to_binary(_repr)
else:
return to_text(_repr)
开发者ID:Dingqs,项目名称:oejia_wx,代码行数:9,代码来源:component.py
示例7: __repr__
def __repr__(self):
_repr = '{klass}({name})'.format(
klass=self.__class__.__name__,
name=repr(self.name)
)
if six.PY2:
return to_binary(_repr)
else:
return to_text(_repr)
开发者ID:Brightcells,项目名称:wechatpy,代码行数:9,代码来源:fields.py
示例8: __str__
def __str__(self):
_repr = 'Error code: {code}, message: {msg}'.format(
code=self.errcode,
msg=self.errmsg
)
if six.PY2:
return to_binary(_repr)
else:
return to_text(_repr)
开发者ID:JoneXiong,项目名称:wechatpy,代码行数:9,代码来源:exceptions.py
示例9: __repr__
def __repr__(self):
_repr = '{klass}({code}, {msg})'.format(
klass=self.__class__.__name__,
code=self.errcode,
msg=self.errmsg
)
if six.PY2:
return to_binary(_repr)
else:
return to_text(_repr)
开发者ID:JoneXiong,项目名称:wechatpy,代码行数:10,代码来源:exceptions.py
示例10: __str__
def __str__(self):
if six.PY2:
return to_binary('Error code: {code}, message: {msg}'.format(
code=self.return_code,
msg=self.return_msg
))
else:
return to_text('Error code: {code}, message: {msg}'.format(
code=self.return_code,
msg=self.return_msg
))
开发者ID:MOODOO-SH,项目名称:wechatpy,代码行数:11,代码来源:exceptions.py
示例11: _decrypt
def _decrypt(self, text, _id, exception=None):
text = to_binary(text)
plain_text = self.cipher.decrypt(base64.b64decode(text))
padding = byte2int(plain_text[-1])
content = plain_text[16:-padding]
xml_length = socket.ntohl(struct.unpack(b'I', content[:4])[0])
xml_content = to_text(content[4:xml_length + 4])
from_id = to_text(content[xml_length + 4:])
if from_id != _id:
exception = exception or Exception
raise exception()
return xml_content
开发者ID:JoneXiong,项目名称:wechatpy,代码行数:12,代码来源:base.py
示例12: _request
def _request(self, method, url_or_endpoint, **kwargs):
http_client = AsyncHTTPClient()
if not url_or_endpoint.startswith(('http://', 'https://')):
api_base_url = kwargs.pop('api_base_url', self.API_BASE_URL)
url = '{base}{endpoint}'.format(
base=api_base_url,
endpoint=url_or_endpoint
)
else:
url = url_or_endpoint
headers = {}
params = kwargs.pop('params', {})
params = urlencode(dict((k, to_binary(v)) for k, v in params.items()))
url = '{0}?{1}'.format(url, params)
data = kwargs.get('data')
if isinstance(data, dict):
data = optionaldict(data)
if 'mchid' not in data:
# Fuck Tencent
data.setdefault('mch_id', self.mch_id)
data.setdefault('sub_mch_id', self.sub_mch_id)
data.setdefault('nonce_str', random_string(32))
sign = calculate_signature(data, self.api_key)
body = dict_to_xml(data, sign)
body = body.encode('utf-8')
else:
body = data
req = HTTPRequest(
url=url,
method=method.upper(),
headers=headers,
body=body
)
res = yield http_client.fetch(req)
if res.error is not None:
raise WeChatClientException(
errcode=None,
errmsg=None,
client=self,
request=req,
response=res
)
result = self._handle_result(res)
raise Return(result)
开发者ID:bluehawksky,项目名称:TornadoWeb,代码行数:49,代码来源:tornado.py
示例13: get_qrcode_url
def get_qrcode_url(self, ticket, data=None):
"""
通过 ticket 换取二维码地址
详情请参考
http://iot.weixin.qq.com/wiki/new/index.html?page=3-4-4
:param ticket: 二维码 ticket
:param data: 额外数据
:return: 二维码地址
"""
url = 'http://we.qq.com/d/{ticket}'.format(ticket=ticket)
if data:
if isinstance(data, (dict, tuple, list)):
data = urllib.urlencode(data)
data = to_text(base64.b64encode(to_binary(data)))
url = '{base}#{data}'.format(base=url, data=data)
return url
开发者ID:raptorz,项目名称:wechatpy,代码行数:17,代码来源:device.py
示例14: delete_account
def delete_account(self, account):
"""
删除客服账号
详情请参考
http://mp.weixin.qq.com/wiki/1/70a29afed17f56d537c833f89be979c9.html
:param account: 完整客服账号,格式为:账号前缀@公众号微信号
:return: 返回的 JSON 数据包
"""
params_data = [
'access_token={0}'.format(quote(self.access_token)),
'kf_account={0}'.format(quote(to_binary(account), safe=b'/@')),
]
params = '&'.join(params_data)
return self._get(
'https://api.weixin.qq.com/customservice/kfaccount/del',
params=params
)
开发者ID:navcat,项目名称:wechatpy,代码行数:18,代码来源:customservice.py
示例15: _fetch_access_token
def _fetch_access_token(self, url, params):
"""
替代 requests 版本 _fetch_access_token
"""
http_client = AsyncHTTPClient()
params = urlencode(dict((k, to_binary(v)) for k, v in params.items()))
_url = '{0}?{1}'.format(url, params)
req = HTTPRequest(
url=_url,
method="GET",
request_timeout=self.timeout
)
res = yield http_client.fetch(req)
if res.error is not None:
raise WeChatClientException(
errcode=None,
errmsg=None,
client=self,
request=req,
response=res
)
result = self._decode_result(res)
if 'errcode' in result and result['errcode'] != 0:
raise WeChatClientException(
result['errcode'],
result['errmsg'],
client=self,
request=res.request,
response=res
)
expires_in = 7200
if 'expires_in' in result:
expires_in = result['expires_in']
self.session.set(
self.access_token_key,
result['access_token'],
expires_in
)
self.expires_at = int(time.time()) + expires_in
raise Return(result)
开发者ID:cloverstd,项目名称:wechatpy.async.tornado,代码行数:44,代码来源:tornado.py
示例16: update_account
def update_account(self, account, nickname, password):
"""
更新客服账号
详情请参考
http://mp.weixin.qq.com/wiki/1/70a29afed17f56d537c833f89be979c9.html
:param account: 完整客服账号,格式为:账号前缀@公众号微信号
:param nickname: 客服昵称,最长6个汉字或12个英文字符
:param password: 客服账号登录密码
:return: 返回的 JSON 数据包
"""
password = to_binary(password)
password = hashlib.md5(password).hexdigest()
return self._post(
'https://api.weixin.qq.com/customservice/kfaccount/update',
data={
'kf_account': account,
'nickname': nickname,
'password': password
}
)
开发者ID:navcat,项目名称:wechatpy,代码行数:21,代码来源:customservice.py
示例17: send_message
def send_message(self, device_type, device_id, user_id, content):
"""
主动发送消息给设备
详情请参考
http://iot.weixin.qq.com/wiki/new/index.html?page=3-4-3
:param device_type: 设备类型,目前为“公众账号原始ID”
:param device_id: 设备ID
:param user_id: 微信用户账号的openid
:param content: 消息内容,BASE64编码
:return: 返回的 JSON 数据包
"""
content = to_text(base64.b64encode(to_binary(content)))
return self._post(
'transmsg',
data={
'device_type': device_type,
'device_id': device_id,
'openid': user_id,
'content': content
}
)
开发者ID:raptorz,项目名称:wechatpy,代码行数:22,代码来源:device.py
示例18: format_url
def format_url(params, api_key=None):
data = [to_binary('{0}={1}'.format(k, params[k])) for k in sorted(params) if params[k]]
if api_key:
data.append(to_binary('key={0}'.format(api_key)))
return b"&".join(data)
开发者ID:messense,项目名称:wechatpy,代码行数:5,代码来源:utils.py
示例19: __init__
def __init__(self, token, encoding_aes_key, _id):
encoding_aes_key = to_binary(encoding_aes_key + '=')
self.key = base64.b64decode(encoding_aes_key)
assert len(self.key) == 32
self.token = token
self._id = _id
开发者ID:Dingqs,项目名称:oejia_wx,代码行数:6,代码来源:__init__.py
示例20: __str__
def __str__(self):
if six.PY2:
return to_binary(self.render())
else:
return to_text(self.render())
开发者ID:cysnake4713,项目名称:wechatpy,代码行数:5,代码来源:replies.py
注:本文中的wechatpy.utils.to_binary函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论