本文整理汇总了Python中wechat_sdk.WechatBasic类的典型用法代码示例。如果您正苦于以下问题:Python WechatBasic类的具体用法?Python WechatBasic怎么用?Python WechatBasic使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了WechatBasic类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: wrapper
def wrapper(*args, **kwargs):
global cur_token
if not cur_token or cur_token["access_token_expires_at"] - 60 < int(time.time()):
wechat = WechatBasic(appid=wechat_appid, appsecret=wechat_appsecret)
wechat.grant_token(True)
cur_token = wechat.get_access_token()
return wechat_function(*args, **kwargs)
开发者ID:talway,项目名称:wechat_base,代码行数:7,代码来源:token_tool.py
示例2: init_wechat_sdk
def init_wechat_sdk():
"""初始化微信 SDK"""
access_token = redis.get("wechat:access_token")
jsapi_ticket = redis.get("wechat:jsapi_ticket")
token_expires_at = redis.get("wechat:access_token_expires_at")
ticket_expires_at = redis.get("wechat:jsapi_ticket_expires_at")
if access_token and jsapi_ticket and token_expires_at and ticket_expires_at:
wechat = WechatBasic(appid=app.config['APP_ID'],
appsecret=app.config['APP_SECRET'],
token=app.config['TOKEN'],
access_token=access_token,
access_token_expires_at=int(token_expires_at),
jsapi_ticket=jsapi_ticket,
jsapi_ticket_expires_at=int(ticket_expires_at))
else:
wechat = WechatBasic(appid=app.config['APP_ID'],
appsecret=app.config['APP_SECRET'],
token=app.config['TOKEN'])
access_token = wechat.get_access_token()
redis.set("wechat:access_token", access_token['access_token'], 7000)
redis.set("wechat:access_token_expires_at",
access_token['access_token_expires_at'], 7000)
jsapi_ticket = wechat.get_jsapi_ticket()
redis.set("wechat:jsapi_ticket", jsapi_ticket['jsapi_ticket'], 7000)
redis.set("wechat:jsapi_ticket_expires_at",
jsapi_ticket['jsapi_ticket_expires_at'], 7000)
return wechat
开发者ID:DogLi,项目名称:gxgk-wechat-server,代码行数:28,代码来源:utils.py
示例3: test_parse_data_location_message
def test_parse_data_location_message(self):
message = """<xml>
<ToUserName><![CDATA[toUser]]></ToUserName>
<FromUserName><![CDATA[fromUser]]></FromUserName>
<CreateTime>1351776360</CreateTime>
<MsgType><![CDATA[location]]></MsgType>
<Location_X>23.134521</Location_X>
<Location_Y>113.358803</Location_Y>
<Scale>20</Scale>
<Label><![CDATA[位置信息]]></Label>
<MsgId>1234567890123456</MsgId>
</xml>"""
wechat = WechatBasic()
wechat.parse_data(data=message)
message = wechat.message
self.assertIsInstance(message, LocationMessage)
self.assertEqual(message.id, 1234567890123456)
self.assertEqual(message.target, 'toUser')
self.assertEqual(message.source, 'fromUser')
self.assertEqual(message.time, 1351776360)
self.assertEqual(message.type, 'location')
self.assertEqual(message.location, (23.134521, 113.358803))
self.assertEqual(message.scale, 20)
self.assertEqual(message.label, '位置信息')
开发者ID:14F,项目名称:wechat-python-sdk,代码行数:26,代码来源:test_basic.py
示例4: create_qrcode
def create_qrcode(cls, name):
from app import Qrcode
token, expired_at = _wechat.get_access_token()
wechat = WechatBasic(
appid=settings.app_id,
appsecret=settings.secret,
access_token=token,
access_token_expires_at=expired_at)
payload = {
"action_name": "QR_LIMIT_STR_SCENE",
"action_info": {"scene": {"scene_str": name}}}
resp = wechat.create_qrcode(payload)
ticket = resp.get('ticket', '')
url = resp.get('url', '')
data = cls.show_qrcode(ticket)
if not data:
raise
output = cStringIO.StringIO()
output.write(data)
# upload
p = qiniu.PutPolicy(settings.bucket)
path = '/qrcode/%s' % name
path, hash_key = p.upload(output, path)
output.close()
Qrcode.create_code(name, ticket, url, path, hash_key)
p = qiniu.PublicGetPolicy(settings.bucket, path)
return p.get_url()
开发者ID:southwolf,项目名称:wechat-admin,代码行数:29,代码来源:qrcode.py
示例5: check_saler_info
def check_saler_info():
"""确认收银员信息"""
brand_id = int(request.args.get("bid", 0))
openid = session.get("openid")
do = request.args.get("do")
form = SalerInfoForm()
if not openid:
code = request.args.get("code")
if not code:
print "not code"
# print "/check_saler_info"
return redirect(WeixinHelper.oauth3(request.url))
else:
wechat_login_fun(code)
if do == 'check':
# 绑定店员
mobile = request.args.get("mobile")
exist_saler = Saler.query.filter(Saler.user_id == g.user.id, Saler.brand_id == brand_id).first()
if not exist_saler:
g.user.mobile = mobile
saler = Saler(user_id=g.user.id, brand_id=brand_id)
db.session.add(saler)
db.session.commit()
wechat = WechatBasic(appid=appid, appsecret=appsecret)
wechat.send_text_message(openid, "您已成功绑定门店")
return json.dumps({"message": "提交成功", "type": "success"})
else:
return json.dumps({"message": "您已绑定门店,不用再次绑定", "type": "error"})
return render_template('mobile/check_saler_info.html', brand_id=brand_id, form=form)
开发者ID:yyt030,项目名称:quanduoduo,代码行数:29,代码来源:site.py
示例6: home
def home(request):
token = TOKEN
if request.method == 'POST':
signature = request.POST.get('signature', '')
timestamp = request.POST.get('timestamp', '')
nonce = request.POST.get('nonce')
elif request.method == 'GET':
signature = request.GET.get('signature', '')
timestamp = request.GET.get('timestamp', '')
nonce = request.GET.get('nonce')
echostr = request.GET.get('echostr', '')
if DEBUG:
debug_log('==========')
debug_log('token: '+str(token))
debug_log('signature: '+str(signature))
debug_log('timestamp: '+str(timestamp))
debug_log('nonce: '+str(nonce))
if token and timestamp and nonce and echostr:
s = ''.join(sorted([token,timestamp,nonce]))
debug_log('calculated sign: '+hashlib.sha1(s.encode('utf-8')).hexdigest())
debug_log('echostr: '+str(echostr))
wechat = WechatBasic(token=token)
# 对签名进行校验
if wechat.check_signature(signature, timestamp, nonce):
return HttpResponse(echostr)
else:
return HttpResponse('')
if signature:
return weixin_response(token, signature, timestamp, nonce, request.body)
return HttpResponse('<h1>微信开发中 ...</h1>')
"""
开发者ID:bearicc,项目名称:geohomeusa,代码行数:35,代码来源:views.py
示例7: test_parse_data_voice_recognition
def test_parse_data_voice_recognition(self):
message = """<xml>
<ToUserName><![CDATA[toUser]]></ToUserName>
<FromUserName><![CDATA[fromUser]]></FromUserName>
<CreateTime>1357290913</CreateTime>
<MsgType><![CDATA[voice]]></MsgType>
<MediaId><![CDATA[media_id]]></MediaId>
<Format><![CDATA[Format]]></Format>
<Recognition><![CDATA[腾讯微信团队]]></Recognition>
<MsgId>1234567890123456</MsgId>
</xml>"""
wechat = WechatBasic()
wechat.parse_data(data=message)
message = wechat.message
self.assertIsInstance(message, VoiceMessage)
self.assertEqual(message.id, 1234567890123456)
self.assertEqual(message.target, 'toUser')
self.assertEqual(message.source, 'fromUser')
self.assertEqual(message.time, 1357290913)
self.assertEqual(message.type, 'voice')
self.assertEqual(message.media_id, 'media_id')
self.assertEqual(message.format, 'Format')
self.assertEqual(message.recognition, '腾讯微信团队')
开发者ID:14F,项目名称:wechat-python-sdk,代码行数:25,代码来源:test_basic.py
示例8: test_send_article_message
def test_send_article_message(self):
article_info = [
{
'title': '第一条新闻标题',
'description': '第一条新闻描述,这条新闻没有预览图',
'url': 'http://www.google.com.hk/',
}, {
'title': '第二条新闻标题, 这条新闻无描述',
'picurl': 'http://doraemonext.oss-cn-hangzhou.aliyuncs.com/test/wechat-test.jpg',
'url': 'http://www.github.com/',
}, {
'title': '第三条新闻标题',
'description': '第三条新闻描述',
'picurl': 'http://doraemonext.oss-cn-hangzhou.aliyuncs.com/test/wechat-test.jpg',
'url': 'http://www.v2ex.com/',
}
]
# 测试无 appid 和 appsecret 初始化
wechat = WechatBasic()
with self.assertRaises(NeedParamError):
wechat.send_article_message('12341234234', article_info)
# 测试有 appid 和 appsecret 初始化
wechat = WechatBasic(appid=self.appid, appsecret=self.appsecret)
with HTTMock(wechat_api_mock):
resp = wechat.send_article_message('safasf', article_info)
self.assertEqual(resp['errcode'], 0)
self.assertEqual(resp['errmsg'], 'ok')
开发者ID:14F,项目名称:wechat-python-sdk,代码行数:29,代码来源:test_basic.py
示例9: api_wechat
def api_wechat(request):
# 从 request 中提取基本信息 (signature, timestamp, nonce, xml)
signature = request.GET.get('signature')
timestamp = request.GET.get('timestamp')
nonce = request.GET.get('nonce')
xml = request.body
# 实例化 WechatBasic 并检验合法性
wechat_instance = WechatBasic(token='CCPL')
# if not wechat_instance.check_signature(signature=signature, timestamp=timestamp, nonce=nonce):
# return HttpResponseBadRequest('Verify Failed')
# 解析本次请求的 XML 数据
try:
wechat_instance.parse_data(data=xml)
message = wechat_instance.get_message() # 获取解析好的微信请求信息
print message
except ParseError:
print xml
return HttpResponseBadRequest('Invalid XML Data')
ret = u'感谢关注“心理地图”!欢迎您登陆我们的网站:http://ccpl.psych.ac.cn/PsyMap/ (网站功能还在开发中,敬请期待。)' #message.type
response = wechat_instance.response_text(ret)
return HttpResponse(response)
开发者ID:CCPLab,项目名称:PsyMap,代码行数:25,代码来源:wechat.py
示例10: sendMessage
def sendMessage():
global conversationStatusList
body = request.get_json()
platform = body['platform']
conversationID = body['conversation_id']
contentType = body['content_type']
content = body['content']
appID = body['app_id']
secret = body['secret']
userID = body['user_id']
temp = conversationID.split(',')
if temp[0] == userID:
accountID = temp[1]
else:
accountID = temp[0]
if conversationID not in conversationStatusList[platform]:
return '', 503
elif conversationStatusList[platform][conversationID] == 'agent':
if contentType.lower() == 'text':
if platform == 'WeChat':
tempConf = WechatConf(appid=appID, appsecret=secret, encrypt_mode='normal')
tempWechat = WechatBasic(conf=tempConf)
tempWechat.send_text_message(user_id=userID, content=content)
print '[' + platform + ']Message sent to user: ' + userID
t = Thread(target=utilities.forwardUserMessage,
args=(platform, 'agent', conversationID, '', accountID, userID, content, str(datetime.now().isoformat()[:-7]) + 'Z'))
t.start()
return '', 200
else:
return '', 503
else:
return '', 503
开发者ID:renhaocui,项目名称:Social_Conversation_Connector,代码行数:34,代码来源:wechatProcess.py
示例11: bind_saler
def bind_saler():
"""绑定收银台"""
bid = int(request.args.get("bid", 0))
do = request.args.get("do", 0)
brand = Brand.query.get(bid)
salers = brand.brand_salers.count()
users = brand.brandaccounts
shop_id = 0
# 获取二维码
wechat = WechatBasic(
appid=current_app.config.get("WECHAT_APPID"), appsecret=current_app.config.get("WECHAT_APPSECRET")
)
temp_data = {
"expire_seconds": 604800,
"action_name": "QR_SCENE",
"action_info": {"scene": {"scene_id": int(str("11") + str(brand.id))}},
}
# data = {"action_name": "QR_LIMIT_SCENE", "action_info": {"scene": {"scene_id": int(str("11") + str(brand.id))}}}
get_ticket_data = wechat.create_qrcode(temp_data)
ticket = get_ticket_data.get("ticket")
if do == "download_qrcode":
logging.info("download_qrcode")
response = wechat.show_qrcode(ticket)
response = make_response(response.content)
response.headers["Content-Type"] = "image/jpg"
attachment_name = "attachment; filename=汝州百事优惠圈绑定收银台专用二维码.jpg"
response.headers["Content-Disposition"] = attachment_name
return response
return render_template("shop/bind_saler.html", brand=brand, ticket=ticket, users=users)
开发者ID:yyt030,项目名称:quanduoduo,代码行数:30,代码来源:shop.py
示例12: test_get_group
def test_get_group(self):
# 测试无 appid 和 appsecret 初始化
wechat = WechatBasic()
with self.assertRaises(NeedParamError):
wechat.get_groups()
# 测试有 appid 和 appsecret 初始化
wechat = WechatBasic(appid=self.appid, appsecret=self.appsecret)
with HTTMock(wechat_api_mock):
resp = wechat.get_groups()
self.assertEqual(resp['groups'][0]['id'], 0)
self.assertEqual(resp['groups'][0]['name'], '未分组')
self.assertEqual(resp['groups'][0]['count'], 72596)
self.assertEqual(resp['groups'][1]['id'], 1)
self.assertEqual(resp['groups'][1]['name'], '黑名单')
self.assertEqual(resp['groups'][1]['count'], 36)
self.assertEqual(resp['groups'][2]['id'], 2)
self.assertEqual(resp['groups'][2]['name'], '星标组')
self.assertEqual(resp['groups'][2]['count'], 8)
self.assertEqual(resp['groups'][3]['id'], 104)
self.assertEqual(resp['groups'][3]['name'], '华东媒')
self.assertEqual(resp['groups'][3]['count'], 4)
self.assertEqual(resp['groups'][4]['id'], 106)
self.assertEqual(resp['groups'][4]['name'], '★不测试组★')
self.assertEqual(resp['groups'][4]['count'], 1)
开发者ID:14F,项目名称:wechat-python-sdk,代码行数:25,代码来源:test_basic.py
示例13: weixin
def weixin(request):
wechat = WechatBasic(token=token)
if wechat.check_signature(signature=request.GET['signature'],
timestamp=request.GET['timestamp'],
nonce=request.GET['nonce']):
if request.method == 'GET':
rsp = request.GET.get('echostr', 'error')
else:
wechat.parse_data(request.body)
message = wechat.get_message()
# write_log(message.type)
if message.type == u'text':
response_content = 'this is text'
# response_content = handle_text(message.content)
if message.content == u'wechat':
# response_content = 'hello wechat'
rsp = wechat.response_text(u'^_^')
elif re.findall(u'锐捷', message.content):
rsp = wechat.response_news(ruijie_response)
else:
rsp = wechat.response_news(default_response)
else:
# rsp = wechat.response_text(u'消息类型: {}'.format(message.type))
rsp = wechat.response_news(default_response)
else:
rsp = wechat.response_text('check error')
return HttpResponse(rsp)
开发者ID:pierre94,项目名称:python-django-cd-sample,代码行数:28,代码来源:views.py
示例14: init_wechat_sdk
def init_wechat_sdk():
"""
初始化微信sdk
:return: WechatBasic
"""
access_token = db.get("wechat:access_token")
jsapi_ticket = db.get("wechat:jsapi_ticket")
token_expires_at = db.get("wechat:access_token_expires_at")
ticket_expires_at = db.get("wechat:jsapi_ticket_expires_at")
if all((access_token, jsapi_ticket, token_expires_at, ticket_expires_at)):
wechat = WechatBasic(appid=app.config['APP_ID'],
appsecret=app.config['APP_SECRET'],
token=app.config['TOKEN'],
access_token=access_token,
access_token_expires_at=int(token_expires_at),
jsapi_ticket=jsapi_ticket,
jsapi_ticket_expires_at=int(ticket_expires_at))
else:
wechat = WechatBasic(appid=app.config['APP_ID'],
appsecret=app.config['APP_SECRET'],
token=app.config['TOKEN'])
access_token = wechat.get_access_token()
db.set("wechat:access_token", access_token['access_token'], 7000)
db.set("wechat:access_token_expires_at",
access_token['access_token_expires_at'], 7000)
jsapi_ticket = wechat.get_jsapi_ticket()
db.set("wechat:jsapi_ticket", jsapi_ticket['jsapi_ticket'], 7000)
db.set("wechat:jsapi_ticket_expires_at",
jsapi_ticket['jsapi_ticket_expires_at'], 7000)
return wechat
开发者ID:defhook,项目名称:flask-blog,代码行数:31,代码来源:wx_tool.py
示例15: test_parse_data_link_message
def test_parse_data_link_message(self):
message = """<xml>
<ToUserName><![CDATA[toUser]]></ToUserName>
<FromUserName><![CDATA[fromUser]]></FromUserName>
<CreateTime>1351776360</CreateTime>
<MsgType><![CDATA[link]]></MsgType>
<Title><![CDATA[公众平台官网链接]]></Title>
<Description><![CDATA[公众平台官网链接]]></Description>
<Url><![CDATA[url]]></Url>
<MsgId>1234567890123456</MsgId>
</xml>"""
wechat = WechatBasic()
wechat.parse_data(data=message)
message = wechat.message
self.assertIsInstance(message, LinkMessage)
self.assertEqual(message.id, 1234567890123456)
self.assertEqual(message.target, 'toUser')
self.assertEqual(message.source, 'fromUser')
self.assertEqual(message.time, 1351776360)
self.assertEqual(message.type, 'link')
self.assertEqual(message.title, '公众平台官网链接')
self.assertEqual(message.description, '公众平台官网链接')
self.assertEqual(message.url, 'url')
开发者ID:14F,项目名称:wechat-python-sdk,代码行数:25,代码来源:test_basic.py
示例16: test_group_transfer_message
def test_group_transfer_message(self):
wechat = WechatBasic()
wechat.parse_data(data=self.test_message)
resp_xml = wechat.group_transfer_message()
resp = xmltodict.parse(resp_xml)
self.assertEqual(resp['xml']['ToUserName'], 'fromUser')
self.assertEqual(resp['xml']['FromUserName'], 'toUser')
self.assertEqual(resp['xml']['MsgType'], 'transfer_customer_service')
开发者ID:14F,项目名称:wechat-python-sdk,代码行数:9,代码来源:test_basic.py
示例17: post
def post(self):
# 实例化 wechat
wechat = WechatBasic(token=token)
signature = self.get_argument("signature", None)
timestamp = self.get_argument("timestamp", None)
nonce = self.get_argument("nonce", None)
# 对签名进行校验
if wechat.check_signature(signature=signature, timestamp=timestamp, nonce=nonce):
# 对 XML 数据进行解析 (必要, 否则不可执行 response_text, response_image 等操作)
# look request method
# print dir(self.request)
content = self.request.body
wechat.parse_data(content)
# 获得解析结果, message 为 WechatMessage 对象 (wechat_sdk.messages中定义)
message = wechat.get_message()
response = None
if message.type == 'text':
if message.content == 'wechat':
response = wechat.response_text(u'''
<a href="https://open.weixin.qq.com/connect/oauth2/authorize?
appid=wxafac6b4bc457eb26&redirect_uri=http://tuteng.info/login_access&
response_type=code&scope=snsapi_userinfo&state=14#wechat_redirect">testtt</a>
''')
else:
response = wechat.response_text(u'world')
elif message.type == 'image':
response = wechat.response_text(u'image')
else:
response = wechat.response_text(u'no image')
开发者ID:tuteng,项目名称:authorization,代码行数:30,代码来源:test_success.py
示例18: weixin_response
def weixin_response(token, signature, timestamp, nonce, body_text=''):
debug_log('check sign ...\n')
# 用户的请求内容 (Request 中的 Body)
# 请更改 body_text 的内容来测试下面代码的执行情况
# 实例化 wechat
wechat = WechatBasic(token=token)
# 对签名进行校验
if wechat.check_signature(signature, timestamp, nonce):
# 对 XML 数据进行解析 (必要, 否则不可执行 response_text, response_image 等操作)
wechat.parse_data(body_text)
# 获得解析结果, message 为 WechatMessage 对象 (wechat_sdk.messages中定义)
message = wechat.get_message()
response = None
if message.type == 'text':
if message.content == 'wechat':
response = wechat.response_text(u'^_^')
else:
response = wechat.response_text(u'文字')
elif message.type == 'image':
response = wechat.response_text(u'图片')
else:
response = wechat.response_text(u'未知')
# 现在直接将 response 变量内容直接作为 HTTP Response 响应微信服务器即可,此处为了演示返回内容,直接将响应进行输出
return response
return HttpResponse('')
开发者ID:bearicc,项目名称:geohomeusa,代码行数:28,代码来源:views.py
示例19: test_get_group_by_id
def test_get_group_by_id(self):
# 测试无 appid 和 appsecret 初始化
wechat = WechatBasic()
with self.assertRaises(NeedParamError):
wechat.get_group_by_id('13441123412341')
# 测试有 appid 和 appsecret 初始化
wechat = WechatBasic(appid=self.appid, appsecret=self.appsecret)
with HTTMock(wechat_api_mock):
resp = wechat.get_group_by_id('12554647777')
self.assertEqual(resp['groupid'], 102)
开发者ID:14F,项目名称:wechat-python-sdk,代码行数:11,代码来源:test_basic.py
示例20: checkout
def checkout():
verify = False
is_expire = False
usedit = False
discount_id = request.args.get("discount_id", 0, type=int)
record_id = request.args.get("record_id", 0, type=int) # 领取id
do = request.args.get("do")
discount = Discount.query.get_or_404(discount_id)
shops = discount.shops
record = GetTicketRecord.query.get_or_404(record_id)
if record.status == "usedit":
usedit = True
if discount:
expire_date = discount.create_at + timedelta(days=discount.usable)
print discount.create_at, discount.usable
expire_date = expire_date.date()
if datetime.datetime.now().date() > expire_date:
is_expire = True
# 获取二维码ticket
if do == "get_qrcode":
if record.status == "verify":
verify = True
if not record.ticket:
# 获取永久二维码 scene_id前缀12表示是优惠券类型的二维码
wechat = WechatBasic(
appid=current_app.config.get("WECHAT_APPID"), appsecret=current_app.config.get("WECHAT_APPSECRET")
)
data = {
"action_name": "QR_LIMIT_SCENE",
"action_info": {"scene": {"scene_id": int(str("12") + str(record.id))}},
}
get_ticket_data = wechat.create_qrcode(data)
ticket = get_ticket_data.get("ticket")
session["ticket"] = ticket
# 写入数据库
record.ticket = ticket
db.session.add(record)
db.session.commit()
else:
ticket = record.ticket
return json.dumps({"message": {"verify": verify, "ticket": ticket, "expire": 0}})
elif do == "download_qrcode":
return ""
return render_template(
"shop/checkout.html",
shops=shops,
expire_date=expire_date,
is_expire=is_expire,
record_id=record_id,
record=record,
usedit=usedit,
discount=discount,
)
开发者ID:yyt030,项目名称:quanduoduo,代码行数:54,代码来源:shop.py
注:本文中的wechat_sdk.WechatBasic类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论