
Python amqp.handle_items Function Code Examples


This article collects typical usage examples of the Python function r2.lib.amqp.handle_items. If you are wondering what handle_items does, how to call it, or what real-world code that uses it looks like, the curated examples below should help.



Twenty code examples of handle_items are shown below, sorted by popularity by default. You can upvote any example you find useful; your ratings help the system surface better Python code examples.
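Across all twenty examples the usage pattern is the same: define a processor callback that takes a batch of messages plus the channel, optionally wrap it with @g.stats.amqp_processor(queue_name) for instrumentation, then hand it to amqp.handle_items along with the queue name and batching options (limit, drain, min_size, sleep_time, verbose). The skeleton below is a minimal sketch of that pattern, assuming a configured reddit (r2) environment; the queue name demo_q and the print-only processor are hypothetical placeholders.

from r2.lib import amqp

def consume_demo_q(limit=100, drain=False):
    def _process(msgs, chan):
        # msgs is a batch of up to `limit` AMQP messages;
        # each payload arrives in msg.body
        for msg in msgs:
            print "demo_q: got %r" % msg.body
        # none of the examples ack messages explicitly, so
        # handle_items itself appears to manage acknowledgement

    # Blocks and consumes "demo_q"; with drain=True the consumer exits
    # once the queue is empty instead of waiting for new messages.
    amqp.handle_items("demo_q", _process, limit=limit, drain=drain)

The producer side of the same API is amqp.add_item(queue_name, body), which Example 6 uses to shunt oversized events onto a separate queue.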

Example 1: consume_subreddit_query_queue

def consume_subreddit_query_queue(qname="subreddit_query_q", limit=1000):
    @g.stats.amqp_processor(qname)
    def process_message(msgs, chan):
        """Update get_links(), the Links by Subreddit precomputed query.

        get_links() is a CachedResult which is stored in permacache. To
        update these objects we need to do a read-modify-write which requires
        obtaining a lock. Sharding these updates by subreddit allows us to run
        multiple consumers (but ideally just one per shard) to avoid lock
        contention.

        """

        from r2.lib.db.queries import add_queries, get_links

        link_names = {msg.body for msg in msgs}
        links = Link._by_fullname(link_names, return_dict=False)
        print 'Processing %r' % (links,)

        links_by_sr_id = defaultdict(list)
        for link in links:
            links_by_sr_id[link.sr_id].append(link)

        srs_by_id = Subreddit._byID(links_by_sr_id.keys(), stale=True)

        for sr_id, links in links_by_sr_id.iteritems():
            with g.stats.get_timer("link_vote_processor.subreddit_queries"):
                sr = srs_by_id[sr_id]
                add_queries(
                    queries=[get_links(sr, sort, "all") for sort in SORTS],
                    insert_items=links,
                )

    amqp.handle_items(qname, process_message, limit=limit)
Developer: 13steinj, Project: reddit, Lines: 34, Source: voting.py


Example 2: run_changed

def run_changed(drain=False):
    """
        Run by `cron` (through `paster run`) on a schedule to update
        all Things that have been created or have changed since the
        last run. Note: unlike many queue-using functions, this one is
        run from cron and totally drains the queue before terminating
    """
    @g.stats.amqp_processor('solrsearch_changes')
    def _run_changed(msgs, chan):
        print "changed: Processing %d items" % len(msgs)
        msgs = [strordict_fullname(msg.body)
                for msg in msgs]
        fullnames = set(msg['fullname'] for msg in msgs if not msg.get('boost_only'))

        things = Thing._by_fullname(fullnames, data=True, return_dict=False)
        things = [x for x in things if isinstance(x, indexed_types)]

        update_things = [x for x in things if not x._spam and not x._deleted]
        delete_things = [x for x in things if x._spam or x._deleted]

        with SolrConnection() as s:
            if update_things:
                tokenized = tokenize_things(update_things)
                s.add(tokenized)
            if delete_things:
                for i in delete_things:
                    s.delete(id=i._fullname)

    amqp.handle_items('solrsearch_changes', _run_changed, limit=1000,
                      drain=drain)
Developer: ProfNandaa, Project: reddit, Lines: 30, Source: solrsearch.py


Example 3: run_changed

def run_changed(drain=False, limit=100, sleep_time=10, verbose=False):
    """reddit-consumer-update_promos: amqp consumer of update_promos_q
    
    Handles async accepting/rejecting of ads that are scheduled to be live
    right now
    
    """

    @g.stats.amqp_processor(UPDATE_QUEUE)
    def _run(msgs, chan):
        items = [json.loads(msg.body) for msg in msgs]
        if QUEUE_ALL in items:
            # QUEUE_ALL is just an indicator to run make_daily_promotions.
            # There's no promotion log to update in this case.
            items.remove(QUEUE_ALL)
        make_daily_promotions()
        links = Link._by_fullname([i["link"] for i in items])
        for item in items:
            PromotionLog.add(
                links[item["link"]],
                "Finished remaking current promotions (this link "
                "was: %(message)s" % item,
                commit=True,
            )

    amqp.handle_items(UPDATE_QUEUE, _run, limit=limit, drain=drain, sleep_time=sleep_time, verbose=verbose)
Developer: chrisrote, Project: reddit, Lines: 25, Source: promote.py


Example 4: process_comment_sorts

def process_comment_sorts(limit=500):
    def _handle_sort(msgs, chan):
        cids = list(set(int(msg.body) for msg in msgs))
        comments = Comment._byID(cids, data = True, return_dict = False)
        print comments
        update_comment_votes(comments)

    amqp.handle_items('commentsort_q', _handle_sort, limit = limit)
Developer: sjuxax, Project: reddit, Lines: 8, Source: queries.py


Example 5: run_changed

def run_changed(drain=False, limit=1000):
    """
        Run by `cron` (through `paster run`) on a schedule to send Things to
        IndexTank
    """
    def _run_changed(msgs, chan):
        start = datetime.now(g.tz)

        changed = map(lambda x: strordict_fullname(x.body), msgs)

        boost = set()
        add = set()

        # an item can request that only its boost fields be updated,
        # so we need to separate those out

        for item in changed:
            fname = item['fullname']
            boost_only = item.get('boost_only', False)

            if fname in add:
                # we're already going to do all of the work
                continue

            if boost_only:
                boost.add(fname)
            else:
                if fname in boost:
                    # we've previously seen an instance of this fname
                    # that requested that only its boosts be updated,
                    # but now we have to update the whole thing
                    boost.remove(fname)

                add.add(fname)

        things = Thing._by_fullname(boost | add, data=True, return_dict=True)

        boost_time = add_time = 0.0
        if boost:
            boost_time = inject([things[fname] for fname in boost], boost_only=True)
        if add:
            add_time = inject([things[fname] for fname in add])

        totaltime = epoch_seconds(datetime.now(g.tz)) - epoch_seconds(start)

        print ("%s: %d messages: %d docs (%.2fs), %d boosts (%.2fs) in %.2fs (%d duplicates, %s remaining)"
               % (start,
                  len(changed),
                  len(add), add_time,
                  len(boost), boost_time,
                  totaltime,
                  len(changed) - len(things),
                  msgs[-1].delivery_info.get('message_count', 'unknown'),
                  ))

    amqp.handle_items('indextank_changes', _run_changed, limit=limit,
                      drain=drain, verbose=False)
Developer: xolar, Project: proddit, Lines: 58, Source: indextank.py


Example 6: process_events

def process_events(g, timeout=5.0, **kw):
    publisher = EventPublisher(
        g.events_collector_url,
        g.secrets["events_collector_key"],
        g.secrets["events_collector_secret"],
        g.useragent,
        g.stats,
        timeout=timeout,
    )
    test_publisher = EventPublisher(
        g.events_collector_test_url,
        g.secrets["events_collector_key"],
        g.secrets["events_collector_secret"],
        g.useragent,
        g.stats,
        timeout=timeout,
    )

    @g.stats.amqp_processor("event_collector")
    def processor(msgs, chan):
        events = []
        test_events = []

        for msg in msgs:
            headers = msg.properties.get("application_headers", {})
            truncatable_field = headers.get("truncatable_field")

            event = PublishableEvent(msg.body, truncatable_field)
            if msg.delivery_info["routing_key"] == "event_collector_test":
                test_events.append(event)
            else:
                events.append(event)

        to_publish = itertools.chain(
            publisher.publish(events),
            test_publisher.publish(test_events),
        )
        for response, sent in to_publish:
            if response.ok:
                g.log.info("Published %s events", len(sent))
            else:
                g.log.warning(
                    "Event send failed %s - %s",
                    response.status_code,
                    _get_reason(response),
                )
                g.log.warning("Response headers: %r", response.headers)

                # if the events were too large, move them into a separate
                # queue to get them out of here, since they'll always fail
                if response.status_code == 413:
                    for event in sent:
                        amqp.add_item("event_collector_failed", event)
                else:
                    response.raise_for_status()

    amqp.handle_items("event_collector", processor, **kw)
Developer: HeliumSquid, Project: reddit, Lines: 57, Source: eventcollector.py


Example 7: run_changed

def run_changed(self, drain=False, min_size=int(getattr(g, 'SOLR_MIN_BATCH', 500)),
                limit=1000, sleep_time=10, use_safe_get=False, verbose=False):
    '''Run by `cron` (through `paster run`) on a schedule to send Things to Cloud
    '''
    if use_safe_get:
        CloudSearchUploader.use_safe_get = True
    amqp.handle_items('cloudsearch_changes', _run_changed, min_size=min_size,
                      limit=limit, drain=drain, sleep_time=sleep_time,
                      verbose=verbose)
Developer: judys-io, Project: reddit, Lines: 9, Source: cloudsearch.py


Example 8: run_commentstree

def run_commentstree(limit=100):
    """Add new incoming comments to their respective comments trees"""

    @g.stats.amqp_processor("commentstree_q")
    def _run_commentstree(msgs, chan):
        comments = Comment._by_fullname([msg.body for msg in msgs], data=True, return_dict=False)
        print "Processing %r" % (comments,)

        add_comment_tree(comments)

    amqp.handle_items("commentstree_q", _run_commentstree, limit=limit)
Developer: ap0rnnstar, Project: reddit, Lines: 11, Source: queries.py


Example 9: run_changed

def run_changed(drain=False, min_size=500, limit=1000, sleep_time=10,
                use_safe_get=False, verbose=False):
    '''Run by `cron` (through `paster run`) on a schedule to send Things to
        Amazon CloudSearch
    
    '''
    if use_safe_get:
        CloudSearchUploader.use_safe_get = True
    amqp.handle_items('cloudsearch_changes', _run_changed, min_size=min_size,
                      limit=limit, drain=drain, sleep_time=sleep_time,
                      verbose=verbose)
Developer: shannonyu, Project: reddit, Lines: 11, Source: cloudsearch.py


Example 10: run_new_comments

def run_new_comments():
    """Add new incoming comments to the /comments page"""
    # this is done as a queue because otherwise the contention for the
    # lock on the query would be very high

    def _run_new_comments(msgs, chan):
        fnames = [msg.body for msg in msgs]
        comments = Comment._by_fullname(fnames, data=True, return_dict=False)

        add_queries([get_all_comments()], insert_items=comments)

    amqp.handle_items("newcomments_q", _run_new_comments, limit=100)
Developer: denrobapps, Project: Reddit-VM, Lines: 12, Source: queries.py


Example 11: run_changed

def run_changed(drain=False):
    """
        Run by `cron` (through `paster run`) on a schedule to send Things to
        IndexTank
    """
    def _run_changed(msgs, chan):
        fullnames = set([x.body for x in msgs])
        things = Thing._by_fullname(fullnames, data=True, return_dict=False)
        inject(things)

    amqp.handle_items('indextank_changes', _run_changed, limit=1000,
                      drain=drain)
Developer: codetripping, Project: reddit, Lines: 12, Source: indextankupdate.py


Example 12: run

def run(limit=1000, verbose=False):
    def myfunc(msgs, chan):
        if verbose:
            print "processing a batch"

        incrs = {}

        for msg in msgs:
            try:
                d = check_dict(msg.body)
            except TypeError:
                log_text("usage_q error", "wtf is %r" % msg.body, "error")
                continue

            hund_sec = hund_from_start_and_end(d["start_time"], d["end_time"])

            action = d["action"].replace("-", "_")

            fudged_count   = int(       1 / d["sampling_rate"])
            fudged_elapsed = int(hund_sec / d["sampling_rate"])

            for exp_time, bucket in buckets(d["end_time"]):
                k = "%s-%s" % (bucket, action)
                incrs.setdefault(k, [0, 0, exp_time])
                incrs[k][0] += fudged_count
                incrs[k][1] += fudged_elapsed

        for k, (count, elapsed, exp_time) in incrs.iteritems():
            c_key = "profile_count-" + k
            e_key = "profile_elapsed-" + k

            if verbose:
                c_old = g.hardcache.get(c_key)
                e_old = g.hardcache.get(e_key)

            g.hardcache.accrue(c_key, delta=count,   time=exp_time)
            g.hardcache.accrue(e_key, delta=elapsed, time=exp_time)

            if verbose:
                c_new = g.hardcache.get(c_key)
                e_new = g.hardcache.get(e_key)

                print "%s: %s -> %s" % (c_key, c_old, c_new)
                print "%s: %s -> %s" % (e_key, e_old, e_new)

        if len(msgs) < limit / 2:
            if verbose:
                print "Sleeping..."
            sleep(10)
    amqp.handle_items(q, myfunc, limit=limit, drain=False, verbose=verbose,
                      sleep_time = 30)
Developer: 3river, Project: reddit, Lines: 51, Source: usage_q.py


Example 13: run_new_comments

def run_new_comments(limit=1000):
    """Add new incoming comments to the /comments page"""
    # this is done as a queue because otherwise the contention for the
    # lock on the query would be very high

    @g.stats.amqp_processor("newcomments_q")
    def _run_new_comments(msgs, chan):
        fnames = [msg.body for msg in msgs]

        comments = Comment._by_fullname(fnames, data=True, return_dict=False)
        add_queries([get_all_comments()], insert_items=comments)

        bysrid = _by_srid(comments, False)
        for srid, sr_comments in bysrid.iteritems():
            add_queries([_get_sr_comments(srid)], insert_items=sr_comments)

    amqp.handle_items("newcomments_q", _run_new_comments, limit=limit)
Developer: ap0rnnstar, Project: reddit, Lines: 17, Source: queries.py


Example 14: process_votes

def process_votes(limit=None):
    # limit is taken but ignored for backwards compatibility

    def _handle_vote(msgs, chan):
        assert(len(msgs) == 1)
        msg = msgs[0]

        r = pickle.loads(msg.body)

        uid, tid, dir, ip, organic, cheater = r
        voter = Account._byID(uid, data=True)
        votee = Thing._by_fullname(tid, data = True)

        print (voter, votee, dir, ip, organic, cheater)
        handle_vote(voter, votee, dir, ip, organic,
                    cheater = cheater)

    amqp.handle_items('register_vote_q', _handle_vote)
Developer: codetripping, Project: reddit, Lines: 18, Source: queries.py


Example 15: run_changed

def run_changed(drain=False, min_size=1, limit=1000, sleep_time=10,
                use_safe_get=False, verbose=False):
    '''Run by `cron` (through `paster run`) on a schedule to send Things to
        Amazon CloudSearch
    
    '''

    @g.stats.amqp_processor('cloudsearch_changes_q')
    def _run_changed(msgs, chan):
        '''Consume the cloudsearch_changes_q queue, and print reporting information
        on how long it took and how many remain

        '''
        start = datetime.now(g.tz)

        changed = [pickle.loads(msg.body) for msg in msgs]

        fullnames = set()
        fullnames.update(LinkUploader.desired_fullnames(changed))
        fullnames.update(SubredditUploader.desired_fullnames(changed))

        things = Thing._by_fullname(fullnames, data=True, return_dict=False)

        link_uploader = LinkUploader(g.CLOUDSEARCH_DOC_API, things=things)
        subreddit_uploader = SubredditUploader(g.CLOUDSEARCH_SUBREDDIT_DOC_API,
                                               things=things)

        link_time = link_uploader.inject()
        subreddit_time = subreddit_uploader.inject()
        cloudsearch_time = link_time + subreddit_time

        totaltime = (datetime.now(g.tz) - start).total_seconds()

        print ("%s: %d messages in %.2f seconds (%.2f seconds waiting on "
               "cloudsearch; %d duplicates, %s remaining)" %
               (start, len(changed), totaltime, cloudsearch_time,
                len(changed) - len(things),
                msgs[-1].delivery_info.get('message_count', 'unknown')))

    if use_safe_get:
        CloudSearchUploader.use_safe_get = True
    amqp.handle_items('cloudsearch_changes_q', _run_changed, min_size=min_size,
                      limit=limit, drain=drain, sleep_time=sleep_time,
                      verbose=verbose)
Developer: caseypatrickdriscoll, Project: reddit, Lines: 44, Source: cloudsearch.py


Example 16: run

def run():
    def callback(msgs, chan):
        for msg in msgs: # will be len==1
            # cr is a r2.lib.db.queries.CachedResults
            cr = pickle.loads(msg.body)
            iden = cr.query._iden()

            working_key = working_prefix + iden
            key = prefix + iden

            last_time = g.memcache.get(key)
            # check to see if we've computed this job since it was
            # added to the queue
            if last_time and last_time > msg.timestamp:
                print 'skipping, already computed ', key
                return

            if not cr.preflight_check():
                print 'skipping, preflight check failed', key
                return

            # check if someone else is working on this
            elif not g.memcache.add(working_key, 1, TIMEOUT):
                print 'skipping, someone else is working', working_key
                return

            print 'working: ', iden, cr.query._rules, cr.query._sort
            start = datetime.now()
            try:
                cr.update()
                g.memcache.set(key, datetime.now())

                cr.postflight()

            finally:
                g.memcache.delete(working_key)

            done = datetime.now()
            q_time_s = (done - msg.timestamp).seconds
            proc_time_s = (done - start).seconds + ((done - start).microseconds/1000000.0)
            print ('processed %s in %.6f seconds after %d seconds in queue'
                   % (iden, proc_time_s, q_time_s))

    amqp.handle_items('prec_links', callback, limit = 1)
Developer: denrobapps, Project: Reddit-VM, Lines: 44, Source: query_queue.py


Example 17: run

def run():
    def process_msgs(msgs, chan):
        def _process_link(fname):
            link = Link._by_fullname(fname, data=True, return_dict=False)
            set_media(link)

        for msg in msgs:
            fname = msg.body
            try:
                TimeoutFunction(_process_link, 30)(fname)
            except TimeoutFunctionException:
                print "Timed out on %s" % fname
            except KeyboardInterrupt:
                raise
            except:
                print "Error fetching %s" % fname
                print traceback.format_exc()

    amqp.handle_items('scraper_q', process_msgs, limit=1)
Developer: denrobapps, Project: Reddit-VM, Lines: 19, Source: media.py


Example 18: run_changed

def run_changed(drain=False, limit=100, sleep_time=10, verbose=True):
    """reddit-consumer-update_promos: amqp consumer of update_promos_q
    
    Handles async accepting/rejecting of ads that are scheduled to be live
    right now
    
    """
    @g.stats.amqp_processor(UPDATE_QUEUE)
    def _run(msgs, chan):
        items = [json.loads(msg.body) for msg in msgs]
        if QUEUE_ALL in items:
            # QUEUE_ALL is just an indicator to run make_daily_promotions.
            # There's no promotion log to update in this case.
            print "Received %s QUEUE_ALL message(s)" % items.count(QUEUE_ALL)
            items = [i for i in items if i != QUEUE_ALL]
            make_daily_promotions()

    amqp.handle_items(UPDATE_QUEUE, _run, limit=limit, drain=drain,
                      sleep_time=sleep_time, verbose=verbose)
Developer: Lax100, Project: reddit, Lines: 19, Source: promote.py


Example 19: process_votes

def process_votes(limit=1000):
    # limit is taken but ignored for backwards compatibility

    def _handle_vote(msgs, chan):
        # assert(len(msgs) == 1)
        comments = []
        for msg in msgs:
            r = pickle.loads(msg.body)

            uid, tid, dir, ip, organic, cheater = r
            voter = Account._byID(uid, data=True)
            votee = Thing._by_fullname(tid, data=True)
            if isinstance(votee, Comment):
                comments.append(votee)

            print (voter, votee, dir, ip, organic, cheater)
            handle_vote(voter, votee, dir, ip, organic, cheater=cheater)

        update_comment_votes(comments)

    amqp.handle_items("register_vote_q", _handle_vote, limit=limit)
Developer: rmasters, Project: reddit, Lines: 21, Source: queries.py


Example 20: run_commentstree

def run_commentstree():
    """Add new incoming comments to their respective comments trees"""

    def _run_commentstree(msgs, chan):
        fnames = [msg.body for msg in msgs]
        comments = Comment._by_fullname(fnames, data=True, return_dict=False)

        links = Link._byID(set(cm.link_id for cm in comments), data=True, return_dict=True)

        # add the comment to the comments-tree
        for comment in comments:
            l = links[comment.link_id]
            try:
                add_comment_tree(comment, l)
            except KeyError:
                # Hackity hack. Try to recover from a corrupted
                # comment tree
                print "Trying to fix broken comments-tree."
                link_comments(l._id, _update=True)
                add_comment_tree(comment, l)

    amqp.handle_items("commentstree_q", _run_commentstree, limit=1)
Developer: denrobapps, Project: Reddit-VM, Lines: 22, Source: queries.py



Note: The r2.lib.amqp.handle_items examples in this article were compiled by 纯净天空 from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects contributed by their authors, who retain copyright over the code; consult each project's license before distributing or reusing it, and do not reproduce this compilation without permission.

