本文整理汇总了Python中r2.lib.filters._force_utf8函数的典型用法代码示例。如果您正苦于以下问题:Python _force_utf8函数的具体用法?Python _force_utf8怎么用?Python _force_utf8使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了_force_utf8函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: change_to_str
def change_to_str(change_tuple):
    """Render one change record as ``'<attr>: <old> -> <new>'``.

    ``change_tuple`` is unpacked as ``(attr, newval, oldval)``.  When the
    attribute is ``StartDate`` or ``EndDate`` both values are first passed
    through ``date_from_adzerk``; every value is passed through
    ``_force_utf8`` before being formatted.
    """
    field, new_value, old_value = change_tuple
    is_date_field = field in ('StartDate', 'EndDate')
    if is_date_field:
        new_value = date_from_adzerk(new_value)
        old_value = date_from_adzerk(old_value)
    old_text = _force_utf8(old_value)
    new_text = _force_utf8(new_value)
    return '%s: %s -> %s' % (field, old_text, new_text)
开发者ID:bsdo64,项目名称:reddit-plugin-adzerk,代码行数:8,代码来源:adzerkpromote.py
示例2: _key_from_url
def _key_from_url(cls, url):
    """Return the key string derived from ``url``.

    For domains listed in ``g.case_sensitive_domains`` only the hostname is
    lowercased; for every other domain the whole URL is lowercased.  The
    result is ``UrlParser.base_url`` of that normalized URL, passed through
    ``_force_utf8``.
    """
    if utils.domain(url) in g.case_sensitive_domains:
        # Path and query keep their case; only the hostname is lowercased.
        parsed = UrlParser(url)
        parsed.hostname = parsed.hostname.lower()
        normalized = parsed.unparse()
    else:
        normalized = url.lower()
    return _force_utf8(UrlParser.base_url(normalized))
开发者ID:phektus,项目名称:dabuzz,代码行数:9,代码来源:link.py
示例3: send_html_email
def send_html_email(to_addr, from_addr, subject, html, subtype="html", from_full='', session=None):
    """Send a single-part email whose body is ``html``.

    Parameters:
        to_addr: recipient address, used for the envelope and ``To`` header.
        from_addr: envelope sender address.
        subject: value of the ``Subject`` header.
        html: message body; passed through ``_force_utf8`` before being
            wrapped in ``MIMEText``.
        subtype: MIME text subtype handed to ``MIMEText``.
        from_full: value of the ``From`` header; falls back to ``from_addr``
            when left as the empty string.
        session: an already-open SMTP session to reuse.  When falsy, a
            session is opened with ``open_smtp_session()`` and closed again
            before returning.

    A session opened here is closed even when composing or sending the
    message raises; a caller-supplied session is never closed.
    """
    from r2.lib.filters import _force_utf8

    # Open a session if we don't already have one.
    if not session:
        session = open_smtp_session()
        close_session = True
    else:
        close_session = False

    try:
        if from_full == '':
            from_full = from_addr

        # Compose the message headers.
        msg = MIMEText(_force_utf8(html), subtype)
        msg["Subject"] = subject
        msg["From"] = from_full
        msg["To"] = to_addr

        # Send the mail.
        session.sendmail(from_addr, to_addr, msg.as_string())
    finally:
        # Close down the session if we opened it, even on failure, so the
        # connection is not leaked.
        if close_session:
            session.quit()
开发者ID:new-day-international,项目名称:reddit,代码行数:25,代码来源:emailer.py
示例4: _conv
def _conv(s):
if isinstance(s, str):
return s
elif isinstance(s, unicode):
return _force_utf8(s)
else:
return str(s)
开发者ID:kevinrose,项目名称:diggit,代码行数:7,代码来源:memoize.py
示例5: add_request_info
def add_request_info(select):
    """Prefix ``select`` with a SQL comment describing the current request.

    The comment contains a trimmed stack trace (file names of frames whose
    path contains ``/r2/``), the sanitized ``request.fullpath`` and the
    sanitized ``request.ip``.  If the request lacks any of the ``path``,
    ``ip`` or ``user_agent`` attributes, or a ``UnicodeDecodeError`` is
    raised while building the comment, ``select`` is returned unchanged.
    """
    from pylons import request
    from r2.lib import filters

    def sanitize(txt):
        # Collapse whitespace runs, then apply the replacements in order.
        # The final "/" replacement cannot match anything because every
        # "/" has already become "|"; it is kept to mirror the original
        # chain exactly.
        cleaned = _spaces.sub(" ", txt)
        for old, new in (("/", "|"), ("-", "_"), (";", ""), ("*", ""), (r"/", "")):
            cleaned = cleaned.replace(old, new)
        return cleaned

    stack_buffer = StringIO.StringIO()
    traceback.print_stack(file=stack_buffer)
    tb = stack_buffer.getvalue()
    if tb:
        # Every other line of the printed stack is a 'File "..."' line.
        location_lines = tb.split("\n")[0::2]
        r2_frames = [line.split("/")[-1] for line in location_lines if "/r2/" in line]
        tb = "\n".join(r2_frames[-15:-2])

    try:
        required = ("path", "ip", "user_agent")
        if all(hasattr(request, attr) for attr in required):
            comment = "/*\n%s\n%s\n%s\n*/" % (
                tb or "",
                filters._force_utf8(sanitize(request.fullpath)),
                sanitize(request.ip),
            )
            return select.prefix_with(comment)
    except UnicodeDecodeError:
        pass

    return select
开发者ID:constantAmateur,项目名称:sciteit,代码行数:33,代码来源:tdb_sql.py
示例6: log_text
def log_text(classification, text=None, level="info"):
    """Queue a text log entry on ``QUEUE_NAME``.

    This is deprecated; logging should go through the standard library's
    ``logging`` module instead.

    ``text`` defaults to ``classification`` when it is ``None``.  A
    ``level`` outside debug/info/warning/error is reported on stdout and
    replaced by ``"error"``.
    """
    from r2.lib import amqp
    from r2.lib.filters import _force_utf8

    if text is None:
        text = classification

    known_levels = ("debug", "info", "warning", "error")
    if level not in known_levels:
        print("What kind of loglevel is %s supposed to be?" % level)
        level = "error"

    entry = _default_dict()
    entry["type"] = "text"
    entry["level"] = level
    entry["text"] = _force_utf8(text)
    entry["classification"] = classification

    payload = cPickle.dumps(entry)
    amqp.add_item(QUEUE_NAME, payload)
开发者ID:joealcorn,项目名称:reddit,代码行数:25,代码来源:log.py
示例7: valid_password
def valid_password(a, password):
    """Check ``password`` against the hash stored on account ``a``.

    Returns ``a`` when the password matches and ``False`` otherwise
    (including when ``a`` lacks a ``name`` or ``password`` attribute or
    ``password`` is falsy).  When a non-bcrypt hash matches, the stored
    hash is replaced with ``bcrypt_password(password)`` and committed.
    """
    # Nothing to check without a proper account and a non-empty password.
    if not (hasattr(a, 'name') and hasattr(a, 'password') and password):
        return False

    # Work on the utf-8 form of the password throughout.
    password = filters._force_utf8(password)
    stored_hash = a.password

    # Hashes with the '$2a$' prefix are verified with bcrypt.
    if stored_hash.startswith('$2a$'):
        candidate = bcrypt.hashpw(password, stored_hash)
        return a if constant_time_compare(stored_hash, candidate) else False

    # Otherwise this is a sha-1 hash: a 43-character stored hash carries a
    # salt in its first three characters, any other length is unsalted.
    salt = stored_hash[:3] if len(stored_hash) == 43 else ''
    candidate = passhash(a.name, password, salt)
    if not constant_time_compare(stored_hash, candidate):
        return False

    # The password is valid but stored in the old format; upgrade it.
    a.password = bcrypt_password(password)
    a._commit()
    return a
开发者ID:Chris911,项目名称:reddit,代码行数:31,代码来源:account.py
示例8: process_response
def process_response(self):
    """Build the payment result tuple from ``request.POST``.

    Returns ``(status, passthrough, transaction_id, pennies, months)``
    where ``status`` is always ``'succeeded'`` and ``passthrough`` is the
    value returned by ``generate_blob`` for the assembled blob.  Gift
    purchases additionally record the recipient, gift message and
    signed flag in the blob.
    """
    post = request.POST
    transaction_id = 'RG%s' % post['transaction_id']
    pennies = int(post['pennies'])
    months = int(post['months'])
    buyer_name = post['buyer']
    goldtype = post['goldtype']
    buyer = Account._by_name(buyer_name)

    blob = {
        'goldtype': goldtype,
        'account_id': buyer._id,
        'account_name': buyer.name,
        'status': 'initialized',
    }

    is_gift = goldtype == 'gift'
    if is_gift:
        blob['recipient'] = post['recipient']
        blob['giftmessage'] = _force_utf8(post.get('giftmessage', None))
        # Only the literal string 'True' counts as signed.
        blob['signed'] = post.get('signed') == 'True'

    passthrough = generate_blob(blob)
    return 'succeeded', passthrough, transaction_id, pennies, months
开发者ID:rolmos,项目名称:reddit,代码行数:30,代码来源:ipn.py
示例9: add_request_info
def add_request_info(select):
    """Return ``select`` prefixed with a comment about the current request.

    The comment is built from a shortened stack trace (entries whose path
    contains ``/r2/``), ``request.fullpath`` and ``request.ip``, the latter
    two cleaned by the local ``sanitize`` helper.  ``select`` comes back
    untouched when the request is missing ``path``, ``ip`` or
    ``user_agent``, or when a ``UnicodeDecodeError`` occurs.
    """
    from pylons import request
    from r2.lib import filters

    def sanitize(txt):
        single_spaced = _spaces.sub(' ', txt)
        no_slashes = single_spaced.replace("/", "|")
        no_dashes = no_slashes.replace("-", "_")
        no_semicolons = no_dashes.replace(';', "")
        no_stars = no_semicolons.replace("*", "")
        # No "/" can remain at this point; kept to mirror the original.
        return no_stars.replace(r"/", "")

    out = StringIO.StringIO()
    traceback.print_stack(file=out)
    tb = out.getvalue()
    if tb:
        frames = []
        for entry in tb.split('\n')[0::2]:
            if "/r2/" in entry:
                frames.append(entry.split('/')[-1])
        tb = '\n'.join(frames[-15:-2])

    try:
        request_is_usable = (hasattr(request, 'path') and
                             hasattr(request, 'ip') and
                             hasattr(request, 'user_agent'))
        if request_is_usable:
            parts = (
                tb or "",
                filters._force_utf8(sanitize(request.fullpath)),
                sanitize(request.ip),
            )
            return select.prefix_with('/*\n%s\n%s\n%s\n*/' % parts)
    except UnicodeDecodeError:
        pass

    return select
开发者ID:Chris911,项目名称:reddit,代码行数:25,代码来源:tdb_sql.py
示例10: send_html_email
def send_html_email(to_addr, from_addr, subject, html,
                    subtype="html", attachments=None):
    """Send a multipart email with an ``html`` body and optional attachments.

    Parameters:
        to_addr: recipient address (envelope and ``To`` header).
        from_addr: sender address (envelope and ``From`` header).
        subject: value of the ``Subject`` header.
        html: message body; passed through ``_force_utf8``.
        subtype: MIME text subtype for the body part.
        attachments: optional iterable of dicts with ``'name'`` and
            ``'contents'`` keys; each becomes a base64-encoded
            ``application/octet-stream`` part.

    Connects to ``g.smtp_server`` and logs in when both ``g.smtp_username``
    and ``g.smtp_password`` are set.  The connection is closed even when
    logging in or sending raises.
    """
    from r2.lib.filters import _force_utf8

    if not attachments:
        attachments = []

    msg = MIMEMultipart()
    msg.attach(MIMEText(_force_utf8(html), subtype))
    msg["Subject"] = subject
    msg["From"] = from_addr
    msg["To"] = to_addr

    for attachment in attachments:
        part = MIMEBase('application', "octet-stream")
        part.set_payload(attachment['contents'])
        encoders.encode_base64(part)
        part.add_header('Content-Disposition', 'attachment',
                        filename=attachment['name'])
        msg.attach(part)

    session = smtplib.SMTP(g.smtp_server)
    try:
        if g.smtp_username and g.smtp_password:
            try:
                session.login(g.smtp_username, g.smtp_password)
            except (smtplib.SMTPHeloError, smtplib.SMTPAuthenticationError,
                    smtplib.SMTPException):
                print("Failed to login smtp server")
                traceback.print_exc(file=sys.stdout)
                # NOTE(review): no local named `email` exists in this
                # function, so this resolves to whatever `email` is at
                # module level -- confirm it has `set_sent`.
                email.set_sent(rejected=True)

        session.sendmail(from_addr, to_addr, msg.as_string())
    finally:
        # Always release the SMTP connection, including on failure.
        session.quit()
开发者ID:judys-io,项目名称:reddit,代码行数:31,代码来源:emailer.py
示例11: process_response
def process_response(self):
    """Turn ``request.POST`` into a ``(status, webhook)`` pair.

    ``status`` is always ``'succeeded'``.  The ``Webhook`` carries the
    transaction id (prefixed with ``RG``), pennies, months, gold type and
    buyer account; for gift purchases it also receives the recipient
    account, gift message and signed flag.
    """
    data = request.POST
    transaction_id = 'RG%s' % data['transaction_id']
    pennies = int(data['pennies'])
    months = int(data['months'])
    goldtype = data['goldtype']
    buyer = Account._by_name(data['buyer'])

    gift_kw = {}
    if goldtype == 'gift':
        gift_kw['recipient'] = Account._by_name(data['recipient'])
        gift_kw['giftmessage'] = _force_utf8(data.get('giftmessage', None))
        gift_kw['signed'] = data.get('signed') == 'True'

    webhook = Webhook(
        transaction_id=transaction_id,
        pennies=pennies,
        months=months,
        goldtype=goldtype,
        buyer=buyer,
        **gift_kw
    )
    return 'succeeded', webhook
开发者ID:jakesyl,项目名称:reddit,代码行数:28,代码来源:ipn.py
示例12: process_response
def process_response(self):
    """Assemble the payment result from the posted form data.

    Reads the transaction id, pennies, months, buyer and gold type from
    ``request.POST``, looks the buyer up with ``Account._by_name`` and
    returns ``(status, passthrough, transaction_id, pennies, months)``.
    ``passthrough`` is whatever ``generate_blob`` returns for the blob.
    """
    form = request.POST
    transaction_id = "RG%s" % form["transaction_id"]
    pennies = int(form["pennies"])
    months = int(form["months"])
    status = "succeeded"
    buyer = Account._by_name(form["buyer"])
    goldtype = form["goldtype"]

    blob = {
        "goldtype": goldtype,
        "account_id": buyer._id,
        "account_name": buyer.name,
        "status": "initialized",
    }

    if goldtype == "gift":
        blob["recipient"] = form["recipient"]
        message = form.get("giftmessage", None)
        blob["giftmessage"] = _force_utf8(message)
        # The flag is true only for the exact string "True".
        blob["signed"] = form.get("signed") == "True"

    return status, generate_blob(blob), transaction_id, pennies, months
开发者ID:new-day-international,项目名称:reddit,代码行数:25,代码来源:ipn.py
示例13: by_url_key
def by_url_key(cls, url):
    """Build the ``byurl(<md5>,<url>)`` key for ``url``.

    The URL is lowercased, reduced with ``base_url`` and passed through
    ``_force_utf8``.  Its md5 hex digest is placed first, followed by as
    much of the URL as fits in the length budget (250 minus the template
    length and the digest length).
    """
    key_template = "byurl(%s,%s)"
    max_key_len = 250

    normalized = _force_utf8(base_url(url.lower()))
    digest = md5(normalized).hexdigest()

    room_for_url = max_key_len - len(key_template) - len(digest)
    return key_template % (digest, normalized[:room_for_url])
开发者ID:szimpatikus,项目名称:szimpatikus.hu,代码行数:7,代码来源:link.py
示例14: get_domain_links
def get_domain_links(domain, sort, time):
    """Return ``make_results`` for a query of links whose URL is on ``domain``.

    The query is sorted with ``db_sort(sort)``.  Unless ``time`` is
    ``"all"``, the filter ``db_times[time]`` is applied as well.
    """
    from r2.lib.db import operators

    on_domain = operators.domain(Link.c.url) == filters._force_utf8(domain)
    query = Link._query(on_domain, sort=db_sort(sort), data=True)

    restrict_by_time = time != "all"
    if restrict_by_time:
        query._filter(db_times[time])

    return make_results(query)
开发者ID:ap0rnnstar,项目名称:reddit,代码行数:8,代码来源:queries.py
示例15: run
def run(self, username):
    """Look up the account named ``username``.

    Returns the result of ``Account._by_name`` on success.  When the
    lookup raises ``TypeError``, ``UnicodeEncodeError`` or ``NotFound``,
    returns ``self.error(errors.USER_DOESNT_EXIST)``.  For a falsy
    ``username`` it calls ``self.error()`` and returns ``None``.
    """
    if not username:
        self.error()
        return None

    try:
        return Account._by_name(_force_utf8(username))
    except (TypeError, UnicodeEncodeError, NotFound):
        return self.error(errors.USER_DOESNT_EXIST)
开发者ID:camspiers,项目名称:lesswrong,代码行数:8,代码来源:validator.py
示例16: add
def add(cls, link, text):
    """Store a note about ``link`` and return the stored text.

    The note is ``text`` prefixed with ``[<name>: <timestamp>]``, where the
    name is the logged-in user's name or ``<AUTOMATED>`` and the timestamp
    is the current time in ``g.tz``.  It is written under the row key
    ``cls._rowkey(link)`` in a column keyed by a fresh ``uuid1()``.
    """
    if c.user_is_loggedin:
        author = c.user.name
    else:
        author = "<AUTOMATED>"

    timestamp = datetime.now(g.tz).strftime("%Y-%m-%d %H:%M:%S")
    text = "[%s: %s] %s" % (author, timestamp, text)

    rowkey = cls._rowkey(link)
    cls._set_values(rowkey, {uuid1(): filters._force_utf8(text)})
    return text
开发者ID:dinxx,项目名称:reddit,代码行数:8,代码来源:promo.py
示例17: _get_encrypted_user_slug
def _get_encrypted_user_slug():
    """Return the encrypted, pipe-joined context fields.

    The fields are, in order: the logged-in user's ``_id36`` (empty when
    nobody is logged in), ``get_srpath()``, ``c.lang`` (empty when falsy)
    and ``c.cname``.  Each is passed through ``_force_utf8`` before being
    joined with ``|`` and handed to ``encrypt``.
    """
    user_id36 = c.user._id36 if c.user_is_loggedin else ""
    fields = [user_id36, get_srpath(), c.lang or "", c.cname]
    plaintext = "|".join([_force_utf8(field) for field in fields])
    return encrypt(plaintext)
开发者ID:AlanWasTaken,项目名称:reddit-to-mobile,代码行数:9,代码来源:tracking.py
示例18: add_trackers
def add_trackers(items, sr, adserver_click_urls=None):
    """Attach impression and click tracking URLs to promoted items.

    Items whose ``promoted`` attribute is falsy are skipped.  For every
    other item this sets ``imp_pixel``, ``href_url`` and ``permalink`` (and
    the third-party tracking URL attributes when the item has them), and
    replaces a ``None`` campaign with ``NO_CAMPAIGN``.

    ``adserver_click_urls`` optionally maps a campaign to the click URL to
    use in place of ``item.url``.
    """
    adserver_click_urls = adserver_click_urls or {}
    include_sr_name = not isinstance(sr, FakeSubreddit)

    for item in items:
        if not item.promoted:
            continue

        if item.campaign is None:
            item.campaign = NO_CAMPAIGN

        name_parts = [item.fullname, item.campaign]
        if include_sr_name:
            name_parts.append(sr.name)
        tracking_name = '-'.join(name_parts)

        # Impression pixel: signed tracking name plus a random cachebuster.
        pixel_mac = hmac.new(
            g.tracking_secret, tracking_name, hashlib.sha1).hexdigest()
        item.imp_pixel = update_query(g.adtracker_url, {
            "id": tracking_name,
            "hash": pixel_mac,
            "r": random.randint(0, 2147483647),
        })

        if item.third_party_tracking:
            item.third_party_tracking_url = item.third_party_tracking
        if item.third_party_tracking_2:
            item.third_party_tracking_url_2 = item.third_party_tracking_2

        # Click redirect: prefer the adserver URL for this campaign.
        url = _force_utf8(adserver_click_urls.get(item.campaign) or item.url)
        hashable = ''.join((url, tracking_name.encode("utf-8")))
        click_mac = hmac.new(
            g.tracking_secret, hashable, hashlib.sha1).hexdigest()
        click_url = update_query(g.clicktracker_url, {
            "id": tracking_name,
            "hash": click_mac,
            "url": url,
        })

        # The link itself always goes through the click redirect.
        item.href_url = click_url

        if item.is_self:
            # Self posts: the permalink is the redirect as well.
            item.permalink = click_url
        else:
            # Other posts: carry the click url on the permalink so the
            # comments page can link through to it.
            item.permalink = update_query(item.permalink, {
                "click_url": url,
                "click_hash": get_click_url_hmac(item, url),
            })
开发者ID:pra85,项目名称:reddit,代码行数:56,代码来源:promote.py
示例19: extract_urls_from_markdown
def extract_urls_from_markdown(md):
    """Yield the ``href`` of each ``<a>`` tag in the rendered Markdown.

    ``md`` is passed through ``_force_utf8``, rendered with
    ``snudown.markdown`` and parsed with ``BeautifulSoup`` restricted to
    ``a`` tags.  Tags with a missing or empty ``href`` are skipped.
    """
    rendered = snudown.markdown(_force_utf8(md))
    anchors_only = SoupStrainer("a")
    soup = BeautifulSoup(rendered, parseOnlyThese=anchors_only)
    for anchor in soup:
        href = anchor.get('href')
        if not href:
            continue
        yield href
开发者ID:HerculesCE,项目名称:reddit,代码行数:10,代码来源:utils.py
示例20: send_html_email
def send_html_email(to_addr, from_addr, subject, html, subtype="html"):
    """Send a single-part email with body ``html`` via ``g.smtp_server``.

    Parameters:
        to_addr: recipient address (envelope and ``To`` header).
        from_addr: sender address (envelope and ``From`` header).
        subject: value of the ``Subject`` header.
        html: message body; passed through ``_force_utf8``.
        subtype: MIME text subtype handed to ``MIMEText``.

    The SMTP connection is closed even when sending raises.
    """
    from r2.lib.filters import _force_utf8

    msg = MIMEText(_force_utf8(html), subtype)
    msg["Subject"] = subject
    msg["From"] = from_addr
    msg["To"] = to_addr

    session = smtplib.SMTP(g.smtp_server)
    try:
        session.sendmail(from_addr, to_addr, msg.as_string())
    finally:
        # Always release the connection, including on a failed send.
        session.quit()
开发者ID:0xcd03,项目名称:reddit,代码行数:10,代码来源:emailer.py
注:本文中的r2.lib.filters._force_utf8函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论