本文整理汇总了Python中r2.lib.amqp.handle_items函数的典型用法代码示例。如果您正苦于以下问题：Python handle_items函数的具体用法？Python handle_items怎么用？Python handle_items使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了handle_items函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: consume_subreddit_query_queue
def consume_subreddit_query_queue(qname="subreddit_query_q", limit=1000):
    """Consume `qname` and refresh the per-subreddit link listings.

    `qname` and `limit` are forwarded to amqp.handle_items unchanged.
    """

    @g.stats.amqp_processor(qname)
    def process_message(msgs, chan):
        """Update get_links(), the Links by Subreddit precomputed query.

        get_links() is a CachedResult kept in permacache, so updating it
        is a read-modify-write that has to take a lock. Because updates
        are sharded by subreddit, several consumers can run at once
        (ideally one per shard) without fighting over that lock.
        """
        from r2.lib.db.queries import add_queries, get_links

        # the same link may be queued more than once; load each only once
        fullnames = set(msg.body for msg in msgs)
        batch = Link._by_fullname(fullnames, return_dict=False)
        print('Processing %r' % (batch,))

        # group the links by the subreddit they belong to
        grouped = defaultdict(list)
        for link in batch:
            grouped[link.sr_id].append(link)

        subreddits = Subreddit._byID(grouped.keys(), stale=True)

        for sr_id, sr_links in grouped.iteritems():
            with g.stats.get_timer("link_vote_processor.subreddit_queries"):
                sr = subreddits[sr_id]
                queries = [get_links(sr, sort, "all") for sort in SORTS]
                add_queries(queries=queries, insert_items=sr_links)

    amqp.handle_items(qname, process_message, limit=limit)
开发者ID:13steinj,项目名称:reddit,代码行数:34,代码来源:voting.py
示例2: run_changed
def run_changed(drain=False):
    """
    Cron entry point (through `paster run`) that updates every Thing
    created or changed since the previous run. Note: unlike many
    queue-using functions, this one is meant to totally drain the queue
    before terminating; `drain` is forwarded to amqp.handle_items.
    """
    @g.stats.amqp_processor('solrsearch_changes')
    def _run_changed(msgs, chan):
        print("changed: Processing %d items" % len(msgs))

        changes = [strordict_fullname(msg.body) for msg in msgs]

        # boost-only notifications are skipped here
        fullnames = set()
        for change in changes:
            if not change.get('boost_only'):
                fullnames.add(change['fullname'])

        fetched = Thing._by_fullname(fullnames, data=True, return_dict=False)

        # split the indexable things into "index" and "remove" buckets
        to_update = []
        to_delete = []
        for thing in fetched:
            if not isinstance(thing, indexed_types):
                continue
            if thing._spam or thing._deleted:
                to_delete.append(thing)
            else:
                to_update.append(thing)

        with SolrConnection() as solr:
            if to_update:
                solr.add(tokenize_things(to_update))
            for thing in to_delete:
                solr.delete(id=thing._fullname)

    amqp.handle_items('solrsearch_changes', _run_changed, limit=1000,
                      drain=drain)
开发者ID:ProfNandaa,项目名称:reddit,代码行数:30,代码来源:solrsearch.py
示例3: run_changed
def run_changed(drain=False, limit=100, sleep_time=10, verbose=False):
    """reddit-consumer-update_promos: amqp consumer of update_promos_q

    Handles asynch accepting/rejecting of ads that are scheduled to be live
    right now

    `limit`, `drain`, `sleep_time` and `verbose` are forwarded to
    amqp.handle_items unchanged.
    """
    @g.stats.amqp_processor(UPDATE_QUEUE)
    def _run(msgs, chan):
        items = [json.loads(msg.body) for msg in msgs]

        # QUEUE_ALL is just an indicator to run make_daily_promotions.
        # There's no promotion log to update in this case. Drop *every*
        # occurrence: list.remove() only removes the first one, and any
        # marker left behind would break the item["link"] lookups below.
        items = [item for item in items if item != QUEUE_ALL]

        make_daily_promotions()

        # assumes _by_fullname returns a mapping keyed by fullname (it is
        # indexed by key below) -- TODO confirm
        links = Link._by_fullname([item["link"] for item in items])
        for item in items:
            PromotionLog.add(
                # log against the link this item refers to, not against an
                # unrelated name from the enclosing scope
                links[item["link"]],
                "Finished remaking current promotions (this link " " was: %(message)s" % item,
                commit=True,
            )

    amqp.handle_items(UPDATE_QUEUE, _run, limit=limit, drain=drain,
                      sleep_time=sleep_time, verbose=verbose)
开发者ID:chrisrote,项目名称:reddit,代码行数:25,代码来源:promote.py
示例4: process_comment_sorts
def process_comment_sorts(limit=500):
    """Consume 'commentsort_q' and pass the named comments to
    update_comment_votes. `limit` is forwarded to amqp.handle_items."""
    def _handle_sort(msgs, chan):
        # several messages may name the same comment id; keep each once
        unique_ids = set()
        for msg in msgs:
            unique_ids.add(int(msg.body))

        comments = Comment._byID(list(unique_ids), data=True,
                                 return_dict=False)
        print(comments)
        update_comment_votes(comments)

    amqp.handle_items('commentsort_q', _handle_sort, limit=limit)
开发者ID:sjuxax,项目名称:reddit,代码行数:8,代码来源:queries.py
示例5: run_changed
def run_changed(drain=False, limit=1000):
    """
    Run by `cron` (through `paster run`) on a schedule to send Things to
    IndexTank

    Consumes the 'indextank_changes' queue; `drain` and `limit` are
    forwarded to amqp.handle_items, with verbose output turned off.
    """
    def _run_changed(msgs, chan):
        # timestamp used both as the report prefix and to compute totaltime
        start = datetime.now(g.tz)
        changed = map(lambda x: strordict_fullname(x.body), msgs)
        # fullnames that only need their boost fields refreshed
        boost = set()
        # fullnames that need a full (re)injection
        add = set()
        # an item can request that only its boost fields be updated,
        # so we need to separate those out
        for item in changed:
            fname = item['fullname']
            # read but currently unused: see the disabled branch below
            boost_only = item.get('boost_only', False)
            if fname in add:
                # we're already going to do all of the work
                continue
            # NOTE: boost-only handling is switched off. The condition used
            # to be `if boost_only:`; with `if False:` every item takes the
            # else branch, so `boost` stays empty and everything is fully
            # re-added.
            if False:
                boost.add(fname)
            else:
                if fname in boost:
                    # we've previously seen an instance of this fname
                    # that requested that only its boosts be updated,
                    # but now we have to update the whole thing
                    boost.remove(fname)
                add.add(fname)
        things = Thing._by_fullname(boost | add, data=True, return_dict=True)
        # the times stay at 0.0 when the corresponding set is empty
        boost_time = add_time = 0.0
        if boost:
            boost_time = inject([things[fname] for fname in boost], boost_only=True)
        if add:
            add_time = inject([things[fname] for fname in add])
        totaltime = epoch_seconds(datetime.now(g.tz)) - epoch_seconds(start)
        # "duplicates" is reported as messages received minus things loaded;
        # "remaining" comes from the last message's delivery_info
        print ("%s: %d messages: %d docs (%.2fs), %d boosts (%.2fs) in %.2fs (%d duplicates, %s remaining)"
               % (start,
                  len(changed),
                  len(add), add_time,
                  len(boost), boost_time,
                  totaltime,
                  len(changed) - len(things),
                  msgs[-1].delivery_info.get('message_count', 'unknown'),
                  ))
    amqp.handle_items('indextank_changes', _run_changed, limit=limit,
                      drain=drain, verbose=False)
开发者ID:xolar,项目名称:proddit,代码行数:58,代码来源:indextank.py
示例6: process_events
def process_events(g, timeout=5.0, **kw):
    """Consume "event_collector" and publish the queued events.

    Two publishers are built from `g`: one for the regular collector URL
    and one for the test URL. They share the same key, secret, user agent,
    stats object and `timeout`. Extra keyword arguments are forwarded to
    amqp.handle_items.
    """
    def _build_publisher(url):
        # both publishers differ only in the URL they post to
        return EventPublisher(
            url,
            g.secrets["events_collector_key"],
            g.secrets["events_collector_secret"],
            g.useragent,
            g.stats,
            timeout=timeout,
        )

    publisher = _build_publisher(g.events_collector_url)
    test_publisher = _build_publisher(g.events_collector_test_url)

    @g.stats.amqp_processor("event_collector")
    def processor(msgs, chan):
        live_batch = []
        test_batch = []

        # route each message to the live or the test batch by routing key
        for msg in msgs:
            headers = msg.properties.get("application_headers", {})
            event = PublishableEvent(msg.body, headers.get("truncatable_field"))
            is_test = msg.delivery_info["routing_key"] == "event_collector_test"
            if is_test:
                test_batch.append(event)
            else:
                live_batch.append(event)

        results = itertools.chain(
            publisher.publish(live_batch),
            test_publisher.publish(test_batch),
        )

        for response, sent in results:
            if response.ok:
                g.log.info("Published %s events", len(sent))
                continue

            g.log.warning(
                "Event send failed %s - %s",
                response.status_code,
                _get_reason(response),
            )
            g.log.warning("Response headers: %r", response.headers)

            if response.status_code == 413:
                # the events were too large and will always fail; move
                # them to a separate queue to get them out of here
                for event in sent:
                    amqp.add_item("event_collector_failed", event)
            else:
                response.raise_for_status()

    amqp.handle_items("event_collector", processor, **kw)
开发者ID:HeliumSquid,项目名称:reddit,代码行数:57,代码来源:eventcollector.py
示例7: run_changed
def run_changed(self, drain=False, min_size=int(getattr(g, 'SOLR_MIN_BATCH', 500)), limit=1000, sleep_time=10,
                use_safe_get=False, verbose=False):
    '''Run by `cron` (through `paster run`) on a schedule to send Things to Cloud

    Passing use_safe_get=True sets CloudSearchUploader.use_safe_get; the
    remaining options are forwarded to amqp.handle_items.
    '''
    if use_safe_get:
        CloudSearchUploader.use_safe_get = True

    consumer_options = dict(
        min_size=min_size,
        limit=limit,
        drain=drain,
        sleep_time=sleep_time,
        verbose=verbose,
    )
    amqp.handle_items('cloudsearch_changes', _run_changed, **consumer_options)
开发者ID:judys-io,项目名称:reddit,代码行数:9,代码来源:cloudsearch.py
示例8: run_commentstree
def run_commentstree(limit=100):
    """Add new incoming comments to their respective comments trees"""
    @g.stats.amqp_processor("commentstree_q")
    def _run_commentstree(msgs, chan):
        # each message body is used as a comment fullname
        fullnames = [msg.body for msg in msgs]
        comments = Comment._by_fullname(fullnames, data=True,
                                        return_dict=False)
        print("Processing %r" % (comments,))
        add_comment_tree(comments)

    amqp.handle_items("commentstree_q", _run_commentstree, limit=limit)
开发者ID:ap0rnnstar,项目名称:reddit,代码行数:11,代码来源:queries.py
示例9: run_changed
def run_changed(drain=False, min_size=500, limit=1000, sleep_time=10,
                use_safe_get=False, verbose=False):
    '''Run by `cron` (through `paster run`) on a schedule to send Things to
    Amazon CloudSearch

    Passing use_safe_get=True sets CloudSearchUploader.use_safe_get; the
    remaining options are forwarded to amqp.handle_items.
    '''
    if use_safe_get:
        CloudSearchUploader.use_safe_get = True

    consumer_options = dict(
        min_size=min_size,
        limit=limit,
        drain=drain,
        sleep_time=sleep_time,
        verbose=verbose,
    )
    amqp.handle_items('cloudsearch_changes', _run_changed, **consumer_options)
开发者ID:shannonyu,项目名称:reddit,代码行数:11,代码来源:cloudsearch.py
示例10: run_new_comments
def run_new_comments():
    """Add new incoming comments to the /comments page"""
    # a queue is used here because contention for the lock on the query
    # would otherwise be very high
    def _run_new_comments(msgs, chan):
        comments = Comment._by_fullname([msg.body for msg in msgs],
                                        data=True, return_dict=False)
        add_queries([get_all_comments()], insert_items=comments)

    amqp.handle_items("newcomments_q", _run_new_comments, limit=100)
开发者ID:denrobapps,项目名称:Reddit-VM,代码行数:12,代码来源:queries.py
示例11: run_changed
def run_changed(drain=False):
    """
    Run by `cron` (through `paster run`) on a schedule to send Things to
    IndexTank
    """
    def _run_changed(msgs, chan):
        # collapse repeated fullnames so each Thing is loaded only once
        fullnames = set(msg.body for msg in msgs)
        inject(Thing._by_fullname(fullnames, data=True, return_dict=False))

    amqp.handle_items('indextank_changes', _run_changed, limit=1000,
                      drain=drain)
开发者ID:codetripping,项目名称:reddit,代码行数:12,代码来源:indextankupdate.py
示例12: run
def run(limit=1000, verbose=False):
    """Consume queue `q`, accumulating usage samples into hardcache.

    Each batch is summed per "<bucket>-<action>" key and then accrued into
    the "profile_count-" and "profile_elapsed-" hardcache entries. When
    `verbose` is set, progress and before/after values are printed.
    """
    def myfunc(msgs, chan):
        if verbose:
            print("processing a batch")

        # "<bucket>-<action>" -> [count, elapsed, expiration time]
        totals = {}

        for msg in msgs:
            try:
                record = check_dict(msg.body)
            except TypeError:
                log_text("usage_q error", "wtf is %r" % msg.body, "error")
                continue

            hund_sec = hund_from_start_and_end(record["start_time"],
                                               record["end_time"])
            action = record["action"].replace("-", "_")

            # scale each sample up by the inverse of its sampling rate
            fudged_count = int(1 / record["sampling_rate"])
            fudged_elapsed = int(hund_sec / record["sampling_rate"])

            for exp_time, bucket in buckets(record["end_time"]):
                slot = totals.setdefault("%s-%s" % (bucket, action),
                                         [0, 0, exp_time])
                slot[0] += fudged_count
                slot[1] += fudged_elapsed

        for suffix, (count, elapsed, exp_time) in totals.iteritems():
            c_key = "profile_count-" + suffix
            e_key = "profile_elapsed-" + suffix

            if verbose:
                c_old = g.hardcache.get(c_key)
                e_old = g.hardcache.get(e_key)

            g.hardcache.accrue(c_key, delta=count, time=exp_time)
            g.hardcache.accrue(e_key, delta=elapsed, time=exp_time)

            if verbose:
                c_new = g.hardcache.get(c_key)
                e_new = g.hardcache.get(e_key)
                print("%s: %s -> %s" % (c_key, c_old, c_new))
                print("%s: %s -> %s" % (e_key, e_old, e_new))

        # pause when the batch was less than half of the limit
        if len(msgs) < limit / 2:
            if verbose:
                print("Sleeping...")
            sleep(10)

    amqp.handle_items(q, myfunc, limit=limit, drain=False, verbose=verbose,
                      sleep_time=30)
开发者ID:3river,项目名称:reddit,代码行数:51,代码来源:usage_q.py
示例13: run_new_comments
def run_new_comments(limit=1000):
    """Add new incoming comments to the /comments page"""
    # a queue is used here because contention for the lock on the query
    # would otherwise be very high
    @g.stats.amqp_processor("newcomments_q")
    def _run_new_comments(msgs, chan):
        comments = Comment._by_fullname([msg.body for msg in msgs],
                                        data=True, return_dict=False)
        add_queries([get_all_comments()], insert_items=comments)

        # then insert into one comment listing per subreddit id
        for srid, sr_comments in _by_srid(comments, False).iteritems():
            add_queries([_get_sr_comments(srid)], insert_items=sr_comments)

    amqp.handle_items("newcomments_q", _run_new_comments, limit=limit)
开发者ID:ap0rnnstar,项目名称:reddit,代码行数:17,代码来源:queries.py
示例14: process_votes
def process_votes(limit=None):
    """Consume 'register_vote_q', handling one vote per callback.

    `limit` is accepted but ignored, for backwards compatibility.
    """
    def _handle_vote(msgs, chan):
        assert(len(msgs) == 1)

        # NOTE(review): the payload is unpickled as-is; this is only safe
        # if nothing untrusted can publish to the queue -- confirm
        payload = pickle.loads(msgs[0].body)
        uid, tid, direction, ip, organic, cheater = payload

        voter = Account._byID(uid, data=True)
        votee = Thing._by_fullname(tid, data=True)

        vote_info = (voter, votee, direction, ip, organic, cheater)
        print(vote_info)
        handle_vote(voter, votee, direction, ip, organic, cheater=cheater)

    amqp.handle_items('register_vote_q', _handle_vote)
开发者ID:codetripping,项目名称:reddit,代码行数:18,代码来源:queries.py
示例15: run_changed
def run_changed(drain=False, min_size=1, limit=1000, sleep_time=10,
                use_safe_get=False, verbose=False):
    '''Run by `cron` (through `paster run`) on a schedule to send Things to
    Amazon CloudSearch

    Passing use_safe_get=True sets CloudSearchUploader.use_safe_get; the
    remaining options are forwarded to amqp.handle_items.
    '''
    @g.stats.amqp_processor('cloudsearch_changes_q')
    def _run_changed(msgs, chan):
        '''Consume the cloudsearch_changes_q queue, and print reporting
        information on how long it took and how many remain
        '''
        began = datetime.now(g.tz)

        changed = [pickle.loads(msg.body) for msg in msgs]

        # union of the fullnames each uploader wants
        wanted = set()
        for uploader_cls in (LinkUploader, SubredditUploader):
            wanted.update(uploader_cls.desired_fullnames(changed))

        things = Thing._by_fullname(wanted, data=True, return_dict=False)

        link_uploader = LinkUploader(g.CLOUDSEARCH_DOC_API, things=things)
        subreddit_uploader = SubredditUploader(
            g.CLOUDSEARCH_SUBREDDIT_DOC_API, things=things)

        link_time = link_uploader.inject()
        subreddit_time = subreddit_uploader.inject()
        cloudsearch_time = link_time + subreddit_time

        totaltime = (datetime.now(g.tz) - began).total_seconds()

        remaining = msgs[-1].delivery_info.get('message_count', 'unknown')
        report = ("%s: %d messages in %.2fs seconds (%.2fs secs waiting on "
                  "cloudsearch); %d duplicates, %s remaining)")
        print(report % (began, len(changed), totaltime, cloudsearch_time,
                        len(changed) - len(things), remaining))

    if use_safe_get:
        CloudSearchUploader.use_safe_get = True

    amqp.handle_items('cloudsearch_changes_q', _run_changed,
                      min_size=min_size, limit=limit, drain=drain,
                      sleep_time=sleep_time, verbose=verbose)
开发者ID:caseypatrickdriscoll,项目名称:reddit,代码行数:44,代码来源:cloudsearch.py
示例16: run
def run():
def callback(msgs, chan):
for msg in msgs: # will be len==1
# cr is a r2.lib.db.queries.CachedResults
cr = pickle.loads(msg.body)
iden = cr.query._iden()
working_key = working_prefix + iden
key = prefix + iden
last_time = g.memcache.get(key)
# check to see if we've computed this job since it was
# added to the queue
if last_time and last_time > msg.timestamp:
print 'skipping, already computed ', key
return
if not cr.preflight_check():
print 'skipping, preflight check failed', key
return
# check if someone else is working on this
elif not g.memcache.add(working_key, 1, TIMEOUT):
print 'skipping, someone else is working', working_key
return
print 'working: ', iden, cr.query._rules, cr.query._sort
start = datetime.now()
try:
cr.update()
g.memcache.set(key, datetime.now())
cr.postflight()
finally:
g.memcache.delete(working_key)
done = datetime.now()
q_time_s = (done - msg.timestamp).seconds
proc_time_s = (done - start).seconds + ((done - start).microseconds/1000000.0)
print ('processed %s in %.6f seconds after %d seconds in queue'
% (iden, proc_time_s, q_time_s))
amqp.handle_items('prec_links', callback, limit = 1)
开发者ID:denrobapps,项目名称:Reddit-VM,代码行数:44,代码来源:query_queue.py
示例17: run
def run():
    """Consume 'scraper_q', calling set_media on each queued link.

    Every link is processed through TimeoutFunction with a limit of 30.
    Timeouts and ordinary errors are printed and the consumer moves on to
    the next message.
    """
    def process_msgs(msgs, chan):
        def _process_link(fname):
            link = Link._by_fullname(fname, data=True, return_dict=False)
            set_media(link)

        for msg in msgs:
            fname = msg.body
            try:
                TimeoutFunction(_process_link, 30)(fname)
            except TimeoutFunctionException:
                print("Timed out on %s" % fname)
            except KeyboardInterrupt:
                raise
            except Exception:
                # Best-effort: one bad link must not kill the consumer.
                # Catching Exception rather than using a bare except lets
                # SystemExit / GeneratorExit propagate so the process can
                # still be shut down cleanly.
                print("Error fetching %s" % fname)
                print(traceback.format_exc())

    amqp.handle_items('scraper_q', process_msgs, limit=1)
开发者ID:denrobapps,项目名称:Reddit-VM,代码行数:19,代码来源:media.py
示例18: run_changed
def run_changed(drain=False, limit=100, sleep_time=10, verbose=True):
    """reddit-consumer-update_promos: amqp consumer of update_promos_q

    Handles asynch accepting/rejecting of ads that are scheduled to be live
    right now
    """
    @g.stats.amqp_processor(UPDATE_QUEUE)
    def _run(msgs, chan):
        items = [json.loads(msg.body) for msg in msgs]

        # QUEUE_ALL is just an indicator to run make_daily_promotions.
        # There's no promotion log to update in this case.
        queue_all_count = items.count(QUEUE_ALL)
        if queue_all_count:
            print("Received %s QUEUE_ALL message(s)" % queue_all_count)
            items = [item for item in items if item != QUEUE_ALL]

        make_daily_promotions()

    amqp.handle_items(UPDATE_QUEUE, _run, limit=limit, drain=drain,
                      sleep_time=sleep_time, verbose=verbose)
开发者ID:Lax100,项目名称:reddit,代码行数:19,代码来源:promote.py
示例19: process_votes
def process_votes(limit=1000):
    """Consume "register_vote_q", handling a batch of votes per callback.

    `limit` is forwarded to amqp.handle_items. After the batch, the
    comments that were voted on are passed to update_comment_votes.
    """
    def _handle_vote(msgs, chan):
        voted_comments = []

        for msg in msgs:
            # NOTE(review): the payload is unpickled as-is; this is only
            # safe if nothing untrusted can publish to the queue -- confirm
            payload = pickle.loads(msg.body)
            uid, tid, direction, ip, organic, cheater = payload

            voter = Account._byID(uid, data=True)
            votee = Thing._by_fullname(tid, data=True)
            if isinstance(votee, Comment):
                voted_comments.append(votee)

            vote_info = (voter, votee, direction, ip, organic, cheater)
            print(vote_info)
            handle_vote(voter, votee, direction, ip, organic,
                        cheater=cheater)

        update_comment_votes(voted_comments)

    amqp.handle_items("register_vote_q", _handle_vote, limit=limit)
开发者ID:rmasters,项目名称:reddit,代码行数:21,代码来源:queries.py
示例20: run_commentstree
def run_commentstree():
    """Add new incoming comments to their respective comments trees"""
    def _run_commentstree(msgs, chan):
        comments = Comment._by_fullname([msg.body for msg in msgs],
                                        data=True, return_dict=False)

        # load every parent link once, keyed by id
        link_ids = set(comment.link_id for comment in comments)
        links = Link._byID(link_ids, data=True, return_dict=True)

        # add the comment to the comments-tree
        for comment in comments:
            link = links[comment.link_id]
            try:
                add_comment_tree(comment, link)
            except KeyError:
                # Hackity hack. Try to recover from a corrupted
                # comment tree by rebuilding it, then retry once
                print("Trying to fix broken comments-tree.")
                link_comments(link._id, _update=True)
                add_comment_tree(comment, link)

    amqp.handle_items("commentstree_q", _run_commentstree, limit=1)
开发者ID:denrobapps,项目名称:Reddit-VM,代码行数:22,代码来源:queries.py
注：本文中的r2.lib.amqp.handle_items函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论