• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python schedule.run_all函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 schedule.run_all 函数的典型用法代码示例。如果您正苦于以下问题：Python run_all 函数的具体用法？Python run_all 怎么用？Python run_all 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 run_all 函数的 17 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: add_group

def add_group(group_id):
    """Start archiving a group or member conversation.

    Reads the ``type`` query parameter of the current request ("group" or
    "member"), looks the target up through groupy, registers a job that
    calls ``handle_update_group_async`` every minute for it, and then runs
    every scheduled job once via ``schedule.run_all()``.

    Args:
        group_id: Identifier of the group (``type == "group"``) or of the
            user (``type == "member"``) to archive.

    Returns:
        The rendered ``layout.html`` page. An unrecognised ``type`` is
        answered with status 400 and a failed lookup with status 404.
    """
    print(group_id)
    # Local is called `kind` so the builtin `type` is not shadowed.
    kind = request.args.get('type')
    if kind == "group":
        group = groupy.Group.list().filter(id=group_id).first
    elif kind == "member":
        group = groupy.Member.list().filter(user_id=group_id).first
    else:
        # A missing or unknown type used to leave `group` unbound, which
        # made the check below raise UnboundLocalError.
        return render_template(
            "layout.html", message="Error! Unknown type."), 400
    if not group:
        return render_template(
            "layout.html", message="Error! Group ID not found."), 404
    if group_id in group_jobs:
        return render_template(
            "layout.html",
            message="Error! Group already added.")
    # The job still receives the value under its original keyword, `type`.
    schedule.every(1).minutes.do(
        handle_update_group_async,
        group_id=group_id,
        type=kind,
        lock=threading.Lock())
    group_jobs.append(group_id)
    schedule.run_all()
    if kind == "group":
        estimate = timedelta(seconds=group.message_count / 100 * 1.1)
        return render_template(
            "layout.html",
            message="Fetching group history, please wait. <br> Number of messages: {0}. <br> Estimated time for processing: {1}.".format(
                group.message_count,
                verbose_timedelta(estimate)))
    # Only "member" can reach this point.
    return render_template(
        "layout.html",
        message="Fetching message history, please wait.")
开发者ID:jordanbuchman,项目名称:groupme_archiver,代码行数:35,代码来源:app.py


示例2: test_run_all

 def test_run_all(self):
     """run_all() fires every registered job once, whatever its schedule."""
     job = make_mock_job()
     # Three different schedules that all share the same mock job.
     for spec in (every().minute, every().hour, every().day.at('11:00')):
         spec.do(job)
     schedule.run_all()
     assert job.call_count == 3
开发者ID:Chirgal,项目名称:schedule,代码行数:7,代码来源:test_schedule.py


示例3: loop

 def loop(self):
     """Poll the scheduler forever, checking for due jobs once a minute.

     When ``self.test_mode`` is set every job is run once up front;
     otherwise ``self.log_time_of_next_run()`` is called instead.
     """
     if not self.test_mode:
         self.log_time_of_next_run()
     else:
         schedule.run_all()
     while True:
         schedule.run_pending()
         time.sleep(60)
开发者ID:cocasema,项目名称:stock_bot,代码行数:8,代码来源:stock_bot.py


示例4: test_cancel_jobs

    def test_cancel_jobs(self):
        """Jobs that return ``schedule.CancelJob`` are gone after running."""
        def stop_job():
            return schedule.CancelJob

        job_count = 3
        for _ in range(job_count):
            every().second.do(stop_job)
        assert len(schedule.jobs) == job_count

        schedule.run_all()
        assert len(schedule.jobs) == 0
开发者ID:RHagenaars,项目名称:schedule,代码行数:11,代码来源:test_schedule.py


示例5: handle

    def handle(self, flush, *args, **kwargs):
        """Register the weekly sheet job, then flush it or poll forever.

        Args:
            flush: When truthy, run every scheduled job once and return.
                Otherwise poll for due jobs once a minute and never return.
            *args: Unused.
            **kwargs: Unused.
        """
        schedule.every().monday.at('00:30').do(self.new_sheet_job)

        # print() with a single string argument behaves the same on
        # Python 2 and Python 3; the statement form fails to parse on 3.
        if flush:
            print("Flushing all scheduled jobs...")
            schedule.run_all()
            return

        print("Running schedule...")
        while True:
            schedule.run_pending()
            time.sleep(60)
开发者ID:xdlailai,项目名称:tchelper,代码行数:12,代码来源:runschedule.py


示例6: main

def main():
    """Start the arduino watcher and run the periodic update jobs forever.

    The 15-minute and 12-hour jobs are registered first and run once
    immediately through ``schedule.run_all()``. The daily water-level job
    is registered only afterwards, so that initial run does not include it.
    Due jobs are then polled every 5 seconds.
    """
    # initialize garduino watcher
    arduino.run()

    ## schedule waits so do first run immediately
    # schedule.every(15).seconds.do(run_threaded, test_updates) # debugger
    # schedule.every(5).minutes.do(run_threaded, fiveminute_updates)
    schedule.every(15).minutes.do(run_threaded, fifteenminute_updates)
    schedule.every(12).hours.do(run_threaded, halfday_updates)
    schedule.run_all()
    # Zero-padded HH:MM: current schedule releases reject '6:05', while
    # '06:05' is accepted by old and new versions alike.
    schedule.every().day.at('06:05').do(run_threaded, waterlevel_update)
    while True:
        schedule.run_pending()
        time.sleep(5)
开发者ID:kafitz,项目名称:logbot,代码行数:14,代码来源:logbot.py


示例7: test_run_all_with_decorator

    def test_run_all_with_decorator(self):
        """Jobs registered through ``@repeat`` are all fired by run_all()."""
        recorder = make_mock_job()

        @repeat(every().minute)
        def _job1():
            recorder()

        @repeat(every().hour)
        def _job2():
            recorder()

        @repeat(every().day.at('11:00'))
        def _job3():
            recorder()

        # Each decorated function forwards to the shared mock exactly once.
        schedule.run_all()
        assert recorder.call_count == 3
开发者ID:RHagenaars,项目名称:schedule,代码行数:16,代码来源:test_schedule.py


示例8: test_clear_by_tag

 def test_clear_by_tag(self):
     """clear(tag) removes only jobs with that tag; clear() removes all."""
     tagged = (
         ('job1', ('tag1',)),
         ('job2', ('tag1', 'tag2')),
         ('job3', ('tag3', 'tag3', 'tag3', 'tag2')),
     )
     for name, tags in tagged:
         every().second.do(make_mock_job(name=name)).tag(*tags)
     assert len(schedule.jobs) == 3
     # Running the jobs must not remove any of them.
     schedule.run_all()
     assert len(schedule.jobs) == 3
     schedule.clear('tag3')
     assert len(schedule.jobs) == 2
     schedule.clear('tag1')
     assert len(schedule.jobs) == 0
     # Untagged jobs are removed by an argument-less clear().
     for name, _ in tagged:
         every().second.do(make_mock_job(name=name))
     schedule.clear()
     assert len(schedule.jobs) == 0
开发者ID:RHagenaars,项目名称:schedule,代码行数:17,代码来源:test_schedule.py


示例9: schedule_updates

def schedule_updates():
    """Register the daily crawler and alert jobs, run them once, then poll.

    Never returns: after the initial ``run_all()`` it checks for due jobs
    once per second.
    """
    def daily_at(clock):
        # Unregistered daily job pinned to the given HH:MM time.
        return schedule.every(1).days.at(clock)

    # EDGAR
    daily_at("04:30").do(_crawler('sec-edgar'))

    daily_at("01:00").do(_crawler('openoil-internal-documents'))

    # SEDAR
    # Sedar website stops updating at 11pm ET, i.e. 0500 CET
    # We start our scrape just after, at 0511 CET, and allow 3 hours for it to
    # upload
    daily_at("08:00").do(_crawler('sedar-partial-content'))

    daily_at("16:00").do(check_alerts)

    schedule.run_all()

    while True:
        schedule.run_pending()
        time.sleep(1)
开发者ID:datastark,项目名称:aleph,代码行数:19,代码来源:scheduled_updates.py


示例10: test_cancel_job

    def test_cancel_job(self):
        """Cancelling works via CancelJob and cancel_job(); bogus is a no-op."""
        def stop_job():
            return schedule.CancelJob
        keeper = make_mock_job()

        every().second.do(stop_job)
        kept_job = every().second.do(keeper)
        assert len(schedule.jobs) == 2

        # Only the job that returned CancelJob is dropped by running.
        schedule.run_all()
        assert len(schedule.jobs) == 1
        assert schedule.jobs[0] == kept_job

        # Cancelling something that is not a registered job changes nothing,
        # through the module function or the default scheduler.
        bogus = 'Not a job'
        schedule.cancel_job(bogus)
        assert len(schedule.jobs) == 1
        schedule.default_scheduler.cancel_job(bogus)
        assert len(schedule.jobs) == 1

        schedule.cancel_job(kept_job)
        assert len(schedule.jobs) == 0
开发者ID:Chirgal,项目名称:schedule,代码行数:20,代码来源:test_schedule.py


示例11: test_daily_job

    def test_daily_job(self):
        """A ``schedule:job`` directive with unit="day" registers a daily job.

        Loads the directive from ZCML, checks the interval, unit and time of
        the single resulting job, and checks that ``VIEW_MARKER`` is set on
        the request only after the jobs have been run.
        """
        zcml.load_string(self.zcml_template % '''
        <schedule:job
            view="dummy-view"
            unit="day"
            at="3:00"
            />
        ''')

        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed in Python 3.12.
        jobs = schedule.jobs
        self.assertEqual(len(jobs), 1)
        job = jobs[0]
        self.assertEqual(job.interval, 1)
        self.assertEqual(job.unit, 'days')
        self.assertEqual(job.at_time, datetime.time(3, 0))

        self.assertFalse(self.request.get(VIEW_MARKER))

        schedule.run_all()

        self.assertTrue(self.request.get(VIEW_MARKER))
开发者ID:netsight,项目名称:collective.schedule,代码行数:21,代码来源:test_zcml.py


示例12: main

def main():
    """Monitor circuit events on the Tor control port until interrupted.

    Configures logging from the command-line arguments, connects and
    authenticates to the control port, subscribes ``circ_event_handler`` to
    CIRC and CIRC_MINOR events, and calls ``output_status`` every
    ``args.tick`` seconds (the first call happens immediately). A
    KeyboardInterrupt ends the polling loop; the process then exits with
    status 0.
    """

    args = parse_cmd_args()
    logger.setLevel(logging.__dict__[args.verbosity.upper()])

    if args.log_file:
        file_handler = logging.handlers.TimedRotatingFileHandler(
            args.log_file, when='D')
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    # The trailing space keeps the two concatenated sentences apart.
    logger.info("Beginning rendezvous circuit monitoring. "
                "Status output every %d seconds", args.tick)

    with Controller.from_port(port=args.port) as controller:
        # Create a connection to the Tor control port
        controller.authenticate()

        # Add event listeners for CIRC and CIRC_MINOR events
        controller.add_event_listener(circ_event_handler,
                                      stem.control.EventType.CIRC)
        controller.add_event_listener(circ_event_handler,
                                      stem.control.EventType.CIRC_MINOR)

        # Schedule rendezvous status output.
        schedule.every(args.tick).seconds.do(output_status, controller)
        schedule.run_all()

        try:
            while True:
                schedule.run_pending()
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Stopping rendezvous circuit monitoring.")

    sys.exit(0)
开发者ID:DonnchaC,项目名称:onionbalance,代码行数:36,代码来源:rend-connection-stats.py


示例13: test_job_func_args_are_passed_on

 def test_job_func_args_are_passed_on(self):
     """Arguments given to do() are forwarded to the job function."""
     job = make_mock_job()
     positional = (1, 2, 'three')
     keyword = dict(foo=23, bar={})
     every().second.do(job, *positional, **keyword)
     schedule.run_all()
     job.assert_called_once_with(*positional, **keyword)
开发者ID:Chirgal,项目名称:schedule,代码行数:5,代码来源:test_schedule.py


示例14: len

        parsed_items_count = len(obj_ids)

        if parsed_items_count < max_item:
            itera =  range(0, parsed_items_count)
        else:
            itera = range(parsed_items_count - max_item, parsed_items_count)

        for i in itera:
            # [6:] remove "entry-" from id value
            obj_id = obj_ids[i].get('id')[6:]
            # cursor.execute("INSERT INTO RSS (fileid, filename, filedate, source)  SELECT (%s, %s, %s, %s) WHERE NOT EXISTS (SELECT * FROM RSS WHERE fileid=%s);", (obj_ids[i].get('id')[6:], obj_names[i], str(datetime.now()), rss,))
            cursor.execute("SELECT id FROM RSS WHERE fileid = %s;", (obj_id,))
            if not cursor.fetchone():
                cursor.execute("SELECT count(*) FROM RSS WHERE source = %s;", (rss,))
                if int(get_config((rss))['max_item']) == int(cursor.fetchone()[0]):
                    print("Limit reached, deleting oldest item from " + rss)
                    cursor.execute("DELETE FROM rss WHERE ctid in (SELECT ctid FROM rss WHERE source = %s ORDER BY filedate LIMIT 1);", (rss,))
                    #db.commit()
                print(obj_id + " - " + obj_names[i] + " - " + str(datetime.now()) + " - " + rss)
                cursor.execute("INSERT INTO RSS (fileid, filename, filedate, source) VALUES (%s, %s, %s, %s);", (obj_id, obj_names[i], str(datetime.now()), rss,))
        db.commit()

# Register one `update` job per feed, repeating every 'check' minutes as
# given by that feed's configuration.
for rss in rss_list:
    check_minutes = float(get_config(rss)['check'])
    schedule.every(check_minutes).minutes.do(update, rss)

# Run every job once right away, waiting 10 seconds between jobs.
schedule.run_all(delay_seconds=10)

# Then poll for due jobs once per second, forever.
while True:
    schedule.run_pending()
    time.sleep(1)
开发者ID:Ale46,项目名称:google-drive-to-rss,代码行数:30,代码来源:worker.py


示例15: main

def main():
    """
    Command-line entry point.

    Merges the config file into the global ``config`` module, sets up
    logging, connects and authenticates to the Tor control port, checks for
    HSPOST support, initializes the configured services, and then runs the
    descriptor fetch/publish schedule until interrupted. Exits with status 1
    on connection, authentication or version failure; returns 0 otherwise.
    """
    cli_args = parse_cmd_args().parse_args()
    file_options = settings.parse_config_file(cli_args.config)

    # Copy each upper-case setting that has a truthy value in the config
    # file onto the global configuration module.
    for name in dir(config):
        if not name.isupper():
            continue
        if file_options.get(name):
            setattr(config, name, file_options.get(name))

    # A verbosity given on the command line wins over the configured level.
    if cli_args.verbosity:
        config.LOG_LEVEL = cli_args.verbosity.upper()

    # Log to a file when a location is configured.
    if config.LOG_LOCATION:
        log.setup_file_logger(config.LOG_LOCATION)

    logger.setLevel(logging.__dict__[config.LOG_LEVEL.upper()])

    # Connect to the Tor control port; give up on a socket error.
    try:
        controller = Controller.from_port(address=cli_args.ip,
                                          port=cli_args.port)
    except stem.SocketError as err:
        logger.error("Unable to connect to Tor control port: %s", err)
        sys.exit(1)
    logger.debug("Successfully connected to the Tor control port.")

    try:
        controller.authenticate()
    except stem.connection.AuthenticationFailure as err:
        logger.error("Unable to authenticate to Tor control port: %s", err)
        sys.exit(1)
    logger.debug("Successfully authenticated to the Tor control port.")

    # Disable no-member due to bug with "Instance of 'Enum' has no * member"
    # pylint: disable=no-member

    # The Tor client must support the HSPOST control port command.
    required_version = stem.version.Requirement.HSPOST
    if not controller.get_version() >= required_version:
        logger.error("A Tor version >= %s is required. You may need to "
                     "compile Tor from source or install a package from "
                     "the experimental Tor repository.",
                     required_version)
        sys.exit(1)

    # Load the keys and config for each onion service; this is the last use
    # of the parsed config file.
    settings.initialize_services(controller, file_options.get('services'))

    descriptor_events = eventhandler.EventHandler()
    controller.add_event_listener(descriptor_events.new_desc,
                                  EventType.HS_DESC)
    controller.add_event_listener(descriptor_events.new_desc_content,
                                  EventType.HS_DESC_CONTENT)

    # Schedule descriptor fetch and upload events.
    fetch_job = schedule.every(config.REFRESH_INTERVAL).seconds
    fetch_job.do(onionbalance.instance.fetch_instance_descriptors, controller)
    publish_job = schedule.every(config.PUBLISH_CHECK_INTERVAL).seconds
    publish_job.do(onionbalance.service.publish_all_descriptors)

    try:
        # Initial run of both jobs, 30 seconds apart.
        schedule.run_all(delay_seconds=30)

        # Main loop: poll for due jobs once per second.
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received. Stopping the "
                    "management server.")
    return 0
开发者ID:mscherer,项目名称:onionbalance,代码行数:80,代码来源:manager.py


示例16: set

            locations.append(recent_location)

    return set(locations)


def log_run_metrics(cycle_start_time, recent_media_added, users_added, users_updated, user_recent_media_added):
    """Print a summary of one insert cycle and of the whole process run.

    Args:
        cycle_start_time: ``datetime`` at which the current cycle started.
        recent_media_added: Count printed as "RecentMedia".
        users_added: Count printed as "Users Added".
        users_updated: Count printed as "Users Updated".
        user_recent_media_added: Count printed as "UserRecentMedia".

    Reads the module-level ``process_start_time`` to report the total
    process run time.
    """
    # print() with one pre-formatted string behaves the same on Python 2
    # and Python 3; the statement form does not parse on Python 3.
    now = datetime.datetime.now
    print('\nDONE: inserting new records into database at %s.' % now())
    print('Inserted: RecentMedia: %d -- Users Added: %d -- Users Updated: %d -- UserRecentMedia: %d'
          % (recent_media_added, users_added, users_updated, user_recent_media_added))
    print('Cycle run time taken: %s' % (now() - cycle_start_time))
    print('Process start time was: %s' % process_start_time)
    print('Process run time currently: %s' % (now() - process_start_time))


if __name__ == '__main__':
    api = get_instagram_api()

    # Open cluster connection to the raw keyspace, and build/connect to our tables
    open_cassandra_session()

    # Register the job. print() with one pre-formatted string behaves the
    # same on Python 2 and Python 3.
    print('Scheduling job for every 140 seconds, time is %s.' % datetime.datetime.now())
    schedule.every(140).seconds.do(produce_raw_layer)

    # Process start time
    process_start_time = datetime.datetime.now()

    # run_all() runs the job on every pass regardless of the 140-second
    # interval registered above, so the effective pause between runs is the
    # 120-second sleep below.
    while True:
        schedule.run_all()
        time.sleep(120)
开发者ID:AndrewZurn,项目名称:instagram-analytics,代码行数:30,代码来源:raw_main.py


示例17: set

    if table['name'] not in r.table_list().run():
        logging.info('Creating table %s' % table['name'])
        if 'primary' in table:
            r.table_create(table['name'], primary_key=table['primary']).run()
        else:
            r.table_create(table['name']).run()
    if not 'indexes' in table: continue
    indexes = set(r.table(table['name']).index_list().run())
    indexes = set(table['indexes']) - indexes
    for index in indexes:
        logging.info('Creating index %s on %s' % (index, table['name']))
        r.table(table['name']).index_create(index).run()
r.wait()
connection.close()
logging.info('Finished preparing')

# Three daily jobs staggered five minutes apart.
# https://github.com/dbader/schedule/issues/55
logging.info('Started scheduling jobs')
schedule.every().day.at('00:00').do(
    lambda: logRecentScrape('launched', 24 * 60 + 5))
schedule.every().day.at('00:05').do(
    lambda: logRecentScrape('funded', 24 * 60 + 5))
schedule.every().day.at('00:10').do(scrapeLive)

# Without PROD in the environment: run every job once, back to back, and
# exit instead of entering the polling loop.
if 'PROD' not in os.environ:
    logging.info('Running all jobs and exiting')
    schedule.run_all(delay_seconds=0)
    sys.exit(0)
logging.info('Finished scheduling jobs')

# Production: poll for due jobs once per second, forever.
while True:
    schedule.run_pending()
    time.sleep(1)
开发者ID:ITUPythonStudyGroup,项目名称:scraper,代码行数:30,代码来源:scrape.py



注:本文中的schedule.run_all函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python schedule.run_pending函数代码示例发布时间:2022-05-27
下一篇:
Python schedule.every函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap