• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python queue.PriorityQueue类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中queue.PriorityQueue的典型用法代码示例。如果您正苦于以下问题:Python PriorityQueue类的具体用法?Python PriorityQueue怎么用?Python PriorityQueue使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了PriorityQueue类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: AStarAlgorithm

class AStarAlgorithm(object):
    """Best-first search from ``start`` towards ``goal`` over ``StateString`` states.

    States wait in a priority queue as ``(distance, insertion count, state)``
    entries, so the state with the smallest ``distance`` is expanded first and
    the insertion count breaks ties without ever comparing state objects.
    """

    def __init__(self, start, goal):
        """Remember the endpoints and create the empty search bookkeeping."""
        self.path = []
        self.visitedQueue = []
        self.PriorityQueue = PriorityQueue()
        self.start = start
        self.goal = goal

    def Solve(self):
        """Run the search and return the path found.

        Returns ``self.path``; it is still empty (and a message is printed)
        when the queue runs dry before a child with a falsy ``distance``
        is generated.
        """
        startState = StateString(self.start, 0, self.start, self.goal)
        count = 0
        self.PriorityQueue.put((0, count, startState))
        # Fixed: the original referenced the misspelt names ``sel`` and ``slef``
        # here, which raised NameError as soon as Solve() was called.
        while not self.path and self.PriorityQueue.qsize():
            closestChild = self.PriorityQueue.get()[2]
            closestChild.GenerateChildren()
            self.visitedQueue.append(closestChild.value)
            for child in closestChild.children:
                if child.value not in self.visitedQueue:
                    count += 1
                    if not child.distance:
                        # A falsy distance marks the goal: keep its path and
                        # let the outer loop stop on the non-empty path.
                        self.path = child.path
                        break
                    self.PriorityQueue.put((child.distance, count, child))
        if not self.path:
            # Fixed: the message was missing the space before "is".
            print("Goal of " + self.goal + " is not possible")
        return self.path
开发者ID:SckeeDoo,项目名称:LAB4CO,代码行数:26,代码来源:LAB4CO.py


示例2: classify

    def classify(self):
        """Group the packages by status label, most popular packages first.

        Each package gets the first label whose condition holds (checked in
        the order listed in ``status_of``).  Packages for which
        ``popcon.package`` returns a falsy result are left out entirely.

        Returns a dict mapping each label to its list of packages, each list
        ordered by descending popcon value.
        """
        def status_of(pkg):
            # First matching condition wins; the order is significant.
            if pkg.ready_for_upload:
                return 'ready_for_upload'
            if pkg.has_rc_bugs:
                return 'rc_bugs'
            if pkg.missing_tag:
                return 'missing_tag'
            if not pkg.tags:
                return 'new'
            if pkg.newer_upstream:
                return 'new_upstream'
            if pkg.watch_problem:
                return 'watch_problem'
            if pkg.bugs:
                return 'bugs'
            if not pkg.is_tagged:
                return 'wip'
            return 'other'

        ranked = PriorityQueue()
        for pkg in self.packages:
            status = status_of(pkg)
            popularity = popcon.package(pkg.name)
            if popularity:
                # Negated so the smallest-first queue yields the most popular first.
                ranked.put((-popularity[pkg.name], status, pkg))

        grouped = {}
        while not ranked.empty():
            _, status, pkg = ranked.get()
            grouped.setdefault(status, []).append(pkg)
        return grouped
开发者ID:PET-UnB,项目名称:pet,代码行数:31,代码来源:classifier.py


示例3: max_spanning_tree

    def max_spanning_tree(self):
        """
        Kruskal's algorithm for a maximum spanning tree (a forest when the
        graph is disconnected), to get the most important connections in
        the graph.

        Connections are visited from heaviest to lightest; one is kept only
        when its two endpoints are not yet joined by connections kept so far.
        Node weights are copied over for every node of this graph.
        -> Graph
        """
        # TODO: Add unit tests
        pq = PriorityQueue()

        for conn in self.get_connections():
            # Negated weights make the smallest-first queue yield the
            # heaviest connection first.
            pq.put((-self.get_connection_weight(conn), self.connection_key(conn)))

        # Disjoint-set forest over the endpoints.  The previous check ("both
        # endpoints already in the tree") also rejected connections joining two
        # separate components, so the result could be disconnected.
        # NOTE(review): assumes node identifiers are hashable - confirm.
        parent = {}

        def find(node):
            """Return the representative of node's component, compressing the path."""
            parent.setdefault(node, node)
            root = node
            while parent[root] != root:
                root = parent[root]
            while parent[node] != root:
                parent[node], node = root, parent[node]
            return root

        max_tree = Graph()
        while not pq.empty():
            neg_weight, curr_connection = pq.get()
            root_a = find(curr_connection[0])
            root_b = find(curr_connection[1])
            if root_a == root_b:
                # Endpoints already connected: this connection would close a cycle.
                continue
            parent[root_a] = root_b

            max_tree.add_connection(*curr_connection)
            max_tree.set_connection_weight(curr_connection, -neg_weight)

        for node in self.get_nodes():
            max_tree.set_node_weight(node, self.get_node_weight(node))

        return max_tree
开发者ID:re9ulus,项目名称:story_graph,代码行数:31,代码来源:graph_ops.py


示例4: next_enrichment

    def next_enrichment(self, cookie: Cookie) -> Optional[Enrichment]:
        """
        Loads the next set of data not present in the known data given (the "enrichment").

        Loaders are tried in priority-queue order. A loader whose check or load raises is
        logged and skipped in favour of the next one.

        Returns `None` if no loader can (successfully) enrich the cookie.
        :param cookie: the data already known
        :return: the loaded enrichment
        """
        pending_loaders = PriorityQueue()
        for loader in self.enrichment_loaders:
            pending_loaders.put(loader)

        while not pending_loaders.empty():
            loader = pending_loaders.get()

            try:
                applicable = loader.can_enrich(cookie)
            except Exception as e:
                # A failing check counts as "cannot enrich".
                applicable = False
                logging.error("Error checking if enrichment can be applied to cookie; Enrichment loader: %s;"
                              "Target Cookie: %s; Error: %s" % (loader, cookie.identifier, e))

            if not applicable:
                continue

            try:
                return loader.load_enrichment(cookie)
            except Exception:
                logging.error("Error loading enrichment; Enrichment loader: %s; Target Cookie: %s; Error: %s"
                              % (loader, cookie.identifier, traceback.format_exc()))

        return None
开发者ID:MMesbahU,项目名称:cookie-monster,代码行数:30,代码来源:_enrichment.py


示例5: JobQueue

class JobQueue(object):
    """Time-ordered queue of recurring jobs.

    A job needs a ``run()`` method and an ``INTERVAL`` attribute (both are
    used by ``tick``).  Entries are stored as ``(due time, sequence, job)``;
    the sequence number keeps jobs with equal due times in insertion order
    and stops the heap from comparing the job objects themselves, which
    raised TypeError for jobs that do not define an ordering.
    """

    def __init__(self):
        self.queue = PriorityQueue()
        self.last_enqueued = None
        self.logger = logging.getLogger(self.__class__.__name__)
        # Monotonic insertion counter used as the tie-breaker.
        self._sequence = 0

    def put(self, job, next_t=0):
        """Schedule ``job`` to become due at time ``next_t``.

        Returns True when ``job`` equals the job enqueued by the previous call.
        """
        self.logger.debug("Putting a {} with t={}".format(job.__class__.__name__, next_t))
        re_enqueued_last = self.last_enqueued == job
        self._sequence += 1
        self.queue.put((next_t, self._sequence, job))
        self.last_enqueued = job
        return re_enqueued_last

    def tick(self):
        """Run every job whose due time is before now and reschedule each one.

        A job that ran is re-enqueued at ``now + job.INTERVAL``, where ``now``
        is sampled once at the start of the tick.
        """
        now = time.time()

        self.logger.debug("Ticking jobs with t={}".format(now))
        while not self.queue.empty():
            # Peek at the heap root without removing it.
            t, _, j = self.queue.queue[0]
            self.logger.debug("Peeked a {} with t={}".format(j.__class__.__name__, t))

            if t < now:
                self.queue.get()
                self.logger.debug("About time! running")
                j.run()
                self.put(j, now + j.INTERVAL)
                continue

            self.logger.debug("Next task isn't due yet. Finished!")
            break
开发者ID:franciscod,项目名称:telegram-twitter-forwarder-bot,代码行数:30,代码来源:basebot.py


示例6: MultiThreadedWeatherDatabase

class MultiThreadedWeatherDatabase(Thread):
    """Worker thread that owns one SQLite connection and runs queued SQL.

    Other threads submit statements with ``execute``/``select``; the worker
    executes them one at a time, lowest ``priority`` number first and, within
    one priority, in submission order.  The thread starts itself on
    construction and stops after ``close()`` has been processed.
    """

    def __init__(self, file):
        # Function-scope import: the block needs a counter whose ``next`` is a
        # single C call, and the file's import section is not visible here.
        import itertools

        super(MultiThreadedWeatherDatabase, self).__init__()
        self.file = file
        self.queue = PriorityQueue()
        self.event = Event()  # set once the worker has closed the database
        # Submission counter: gives equal-priority entries FIFO order and keeps
        # the heap from comparing SQL text, argument tuples or result queues.
        self._sequence = itertools.count()
        # The schema is only created when the database file does not exist yet.
        self.create_tables = not os.path.isfile(file)
        self.start()  # Threading module start

    def run(self):
        """Worker loop: execute queued statements until '__close__' arrives."""
        super(MultiThreadedWeatherDatabase, self).run()
        db = sqlite3.connect(self.file)
        try:
            cursor = db.cursor()
            if self.create_tables:
                self.create_all_tables()
            while True:
                # Blocking get: sleeps until work arrives instead of polling.
                _priority, _seq, sql, arg, result = self.queue.get()
                if sql == '__close__':
                    break
                if arg is None:
                    arg = ''
                cursor.execute(sql, arg)
                db.commit()
                if result:
                    for rec in cursor:
                        result.put(rec)
                    # Sentinel telling select() that the result set is complete.
                    result.put('__last__')
        finally:
            # Release the connection and wake waiters even if a statement failed.
            db.close()
            self.event.set()

    def execute(self, sql, args=None, res=None, priority=2):
        """Queue ``sql`` for the worker.

        :param args: statement parameters, or None for none
        :param res: optional queue that receives the result rows followed by
            the '__last__' sentinel
        :param priority: lower numbers are executed first
        """
        self.queue.put_nowait((priority, next(self._sequence), sql, args, res))

    def select(self, sql, args=None, priority=2):
        """Queue a query and yield its rows as the worker produces them."""
        res = Queue()
        self.execute(sql, args, res, priority)
        while True:
            rec = res.get()
            if rec == '__last__':
                break
            yield rec

    def close(self):
        """Ask the worker to stop; statements queued before it still run."""
        self.execute('__close__')

    def create_all_tables(self):
        """Queue creation of the ``location`` and ``forecast`` tables.

        Queued at priority 0 so the schema exists before any statement that a
        caller managed to submit while the worker was still starting up.
        """
        command1 = '''CREATE TABLE location (location_id INTEGER PRIMARY KEY , town TEXT, country TEXT, lat REAL, lon REAL, dateadded INTEGER, timezone INTEGER)'''
        self.execute(command1, priority=0)
        command2 = '''CREATE TABLE "forecast" (forecast_id INTEGER PRIMARY KEY, location_id INTEGER, time INTEGER, temp REAL, pressure INTEGER, humidity INTEGER, clouds INTEGER, windspeed REAL, winddirection INTEGER, symbol INTEGER, FOREIGN KEY (location_id) REFERENCES location (location_id) DEFERRABLE INITIALLY DEFERRED)'''
        self.execute(command2, priority=0)

    def remove_old_forecasts(self):
        """Queue deletion of forecasts whose time lies before the current time."""
        command = '''DELETE FROM forecast WHERE forecast.time < STRFTIME('%s', 'now')'''
        self.execute(command)
开发者ID:terimater,项目名称:WeatherApp,代码行数:60,代码来源:database.py


示例7: findWayByAStar

def findWayByAStar(start, isFinish, getDistance, getIncidenceList, getHeuristicCostEstimate):
    """
    Search for a path from start to a finishing vertex.

    start - start vertex,
    isFinish(vertex) - returns True only on finish,
    getDistance(first_vertex, second_vertex) - distance between the two vertices,
    getIncidenceList(vertex) - incidence list of vertex,
    getHeuristicCostEstimate(vertex) - heuristic cost estimate for vertex.

    Returns the path (as built by _createPath) to the first finishing vertex
    taken from the queue; raises Exception when no vertices are left waiting.
    """
    start_node = _AStarNode(start)
    start_node.hce = getHeuristicCostEstimate(start)
    start_node.updateSum()

    node_storage = {start: start_node}
    processed_vertices = set()
    waiting_vertices = {start}

    # Queue items have the form ((priority number, priority index), vertex).
    node_queue = PriorityQueue()
    node_queue.put_nowait(((start_node.sum, 0), start_node.vertex))

    while waiting_vertices:
        vertex = node_queue.get()[1]
        # Skip stale entries for vertices that were already processed.
        while vertex in processed_vertices:
            vertex = node_queue.get_nowait()[1]

        if isFinish(vertex):
            return _createPath(vertex, node_storage)

        _processVertex(vertex, getDistance, getIncidenceList, getHeuristicCostEstimate,
                       processed_vertices, waiting_vertices, node_storage, node_queue)

    raise Exception("Path doesn't exist")
开发者ID:IvanShafran,项目名称:mr-ant,代码行数:31,代码来源:algorithm_A_star.py


示例8: Phase

class Phase(object):
    """A phase that resolves queued actions once its priority reaches theirs.

    Actions wait in a priority queue and are resolved, lowest first, as soon
    as their ``get_priority()`` is not above the phase's current priority.
    Actions must be mutually orderable (the queue compares them directly).
    """

    def __init__(self):
        self._priority = 0
        self._actions = PriorityQueue()
        self._log = []          # resolved actions, in resolution order
        self._ended = False
        self._checking = False  # True while check_queue is draining the queue

    def add_action(self, action):
        """Queue ``action`` and resolve whatever is due."""
        self._actions.put(action)
        self.check_queue()

    def check_queue(self):
        """Resolve and log every queued action whose priority has been reached.

        Calls made while a check is already running (for example from inside
        ``action.resolve()``) return immediately; the running loop picks up
        anything they added.
        """
        if self._checking:
            return
        self._checking = True
        try:
            while (not self._actions.empty()
                   and self._actions.queue[0].get_priority() <= self._priority):
                action = self._actions.get()
                action.resolve()
                self._log.append(action)
        finally:
            # Always clear the guard: previously an exception raised by
            # resolve() left it set and no later action was ever resolved.
            self._checking = False

    def increase_priority(self, priority=100):
        """Raise the phase priority by ``priority`` and resolve what became due."""
        self._priority += priority
        self.check_queue()

    def is_ended(self):
        """Return whether the phase is flagged as ended."""
        return self._ended

    def is_checking(self):
        """Return whether check_queue is currently running."""
        return self._checking
开发者ID:BitokuOokami,项目名称:PloungeMafiaToolkit,代码行数:31,代码来源:phases.py


示例9: substitute_once

 def substitute_once(self, string, indices):
     """Apply the replacements to the first match of self.pattern in string.

     indices holds one entry per character of string.  Each group named in
     self.replacement is replaced by its replacement text; every character
     of that text receives the index of the group's first character (left
     alignment) or of its last character (any other alignment).

     Returns (new string, iterator over the new indices).  Without a match
     the string and its indices come back unchanged.
     """
     assert isinstance(string, str)
     indices = list(indices)
     assert len(string) == len(indices)
     total = len(string)
     cursor = 0
     out_chunks = []
     out_indices = []
     found = re.search(self.pattern, string)
     if found:
         # Order the group spans so they are processed from left to right.
         pending = PriorityQueue()
         for group, text in self.replacement.items():
             group_span = found.span(group)
             assert group_span
             pending.put((group_span, text))
         while not pending.empty():
             (start, end), text = pending.get()
             assert start >= cursor
             assert start < len(string)
             # TODO: Allow align to be set in value
             # Copy the untouched stretch in front of this group.
             out_chunks.append(string[cursor:start])
             out_indices.append(indices[cursor:start])
             anchor = indices[start] if self.align == self.Align.left else indices[end - 1]
             out_chunks.append(text)
             out_indices.append([anchor] * len(text))
             cursor = end
     out_chunks.append(string[cursor:total])
     out_indices.append(indices[cursor:total])
     return ''.join(out_chunks), itertools.chain.from_iterable(out_indices)
开发者ID:filipbartek,项目名称:nbspacer,代码行数:34,代码来源:transducer.py


示例10: _test_loaded_in_correct_order

    def _test_loaded_in_correct_order(
        self, enrichment_manager: EnrichmentManager, enrichment_loaders: Iterable[EnrichmentLoader]
    ):
        """
        Checks that the enrichment manager hands out enrichments in the same order in which a priority queue
        yields the loaders that can enrich a fresh cookie.
        :param enrichment_manager: enrichment manager
        :param enrichment_loaders: enrichment loaders
        """
        logging.root.setLevel(logging.CRITICAL)
        cookie = Cookie("the_identifier")

        expected_order = PriorityQueue()
        for loader in enrichment_loaders:
            if loader.can_enrich(cookie):
                expected_order.put(loader)

        while True:
            enrichment = enrichment_manager.next_enrichment(cookie)
            if enrichment is None:
                break
            expected_loader = expected_order.get()  # type: EnrichmentLoader
            self.assertEqual(enrichment, expected_loader.load_enrichment(cookie))
            cookie.enrich(enrichment)
            # Once applied, this loader must report that it can no longer enrich.
            expected_loader.can_enrich = MagicMock(return_value=False)

        self.assertTrue(expected_order.empty())
开发者ID:wtsi-hgi,项目名称:cookie-monster,代码行数:25,代码来源:test_enrichment.py


示例11: find_path

 def find_path(self, start, end):
     """Find a path from start that passes through every vertex in end.

     start - key of the starting vertex in self.graph
     end   - iterable of target vertex keys; occurrences of start are ignored

     Returns the path of the first queued candidate that has reached all
     targets, or [] when start or a target is not in the graph or the
     search runs out of candidates.
     """
     # Work on a private list: the caller's sequence is no longer mutated and
     # tuples (or any other iterable) are accepted as well.
     end = [target for target in end if target != start]
     if (start not in self.graph.keys()) or (not set(end) <= set(self.graph.keys())):
         return []
     self.__reset_visited__()
     from queue import PriorityQueue
     pq = PriorityQueue()
     self.__assign__(pq, [[start, 0]], self.__get_adjacent__(start))
     while not pq.empty():
         shortest = pq.get()
         if shortest.head() in end:
             # Target reached: restart the search from the path found so far.
             end.remove(shortest.head())
             self.__reset_visited__()
             pq = PriorityQueue()
             self.__assign__(pq, shortest.path(), [])
             if not end:
                 return shortest.path()
         self.__assign__(pq, shortest.path(), self.__get_adjacent__(shortest.head()))
     return []
开发者ID:TheWookie,项目名称:DurchDenMutterland,代码行数:27,代码来源:DDM.py


示例12: giveConclusion

def giveConclusion():
    """Choose the direction for the next move.

    Food whose heat value is below calculateLimit(heatmap) is a candidate;
    when there is any, head along the path to the candidate with the smallest
    distance.  Otherwise step to the neighbour with the lowest heat below
    wall_value, then to the first neighbour whose level cell is '.' or 'x',
    and as a last resort return 'r'.
    """
    foodmap = mapFood()
    print("Foodmap =", foodmap)
    heatmap = mapHeat()
    foodheat = {}
    foods = PriorityQueue()
    for food, distance in foodmap:
        heat = heatmap[food[1]][food[0]]
        foodheat[food] = heat
        if heat < calculateLimit(heatmap):
            foods.put((distance, food))

    if not foods.empty():
        good_food = foods.get()[1]
        head = snakes[speler_nummer].head
        path = givePath(head, good_food)
        return giveDirection(path[0], path[1])

    # No acceptable food: pick the coolest neighbouring cell instead.
    lowest = wall_value
    direction = -1
    head = snakes[speler_nummer].head
    fallback = []
    for cell in neighbours(head):
        cell_heat = heatmap[cell[1]][cell[0]]
        if cell_heat < lowest:
            direction = giveDirection(head, cell)
            lowest = cell_heat
        elif level[cell[1]][cell[0]] in ['.', 'x']:
            fallback.append(cell)

    if direction == -1:
        if fallback:
            direction = giveDirection(head, fallback[0])
        else:
            print("Goodbye, cruel world!")
            direction = 'r'
    return direction
开发者ID:TimdeJonge,项目名称:Snake,代码行数:33,代码来源:Snake2.py


示例13: _process_nonrealtime_stop

    def _process_nonrealtime_stop(self, state):
        """Unwind the pattern state after a stop.

        Marks the state as stopped, takes the single remaining event out of
        the event queue and puts it back with offset 0.0 only when its
        iterator was visited and it is a composite event flagged "is_stop".
        Finally rebuilds the iterator queue with every offset reset to 0.0.
        """
        import supriya.patterns

        if not state["has_stopped"]:
            state["has_stopped"] = True
        self._debug("UNWINDING")
        event_queue = state["event_queue"]
        assert event_queue.qsize() == 1

        event_tuple = event_queue.get()
        preserve = False
        if event_tuple.iterator_index not in state["visited_iterators"]:
            verdict = "    DISCARDING, UNVISITED"
        elif not isinstance(event_tuple.event, supriya.patterns.CompositeEvent):
            verdict = "    DISCARDING, NON-COMPOSITE"
        elif not event_tuple.event.get("is_stop"):
            verdict = "    DISCARDING, NON-STOP"
        else:
            verdict = "    PRESERVING"
            preserve = True
        self._debug(verdict, event_tuple)
        if preserve:
            event_queue.put(event_tuple._replace(offset=0.0))

        pending_iterators = state["iterator_queue"]
        rebuilt = PriorityQueue()
        while not pending_iterators.empty():
            rebuilt.put(pending_iterators.get()._replace(offset=0.0))
        state["iterator_queue"] = rebuilt
开发者ID:josiah-wolf-oberholtzer,项目名称:supriya,代码行数:25,代码来源:Ppar.py


示例14: test_starts_actions_and_adds_back_to_queue

    def test_starts_actions_and_adds_back_to_queue(self):
        """A pre-start action is started and re-queued as executing, not applied."""
        # given
        start_time = 0
        deadline = 10
        action_to_start = Action(start_time, deadline+1)
        action_to_start.agent = Mock(name="agent")
        # Fixed: the keyword is ``return_value``; ``return_val`` only created an
        # unrelated attribute, so the mock returned another Mock instead of True.
        action_to_start.is_applicable = Mock(return_value=True)
        action_to_start.apply = Mock(name="apply")
        model = Mock(name="model")
        execution_queue = PriorityQueue()
        execution_queue.put(ActionState(action_to_start, start_time, ExecutionState.pre_start))

        # when
        actual, _stalled = simulator.execute_action_queue(model, execution_queue,
            break_on_new_knowledge=False, deadline=deadline)

        # then
        assert_that(execution_queue.queue, has_length(1))
        time, state, action = execution_queue.queue[0]
        assert_that(time, equal_to(action_to_start.end_time))
        assert_that(state, equal_to(ExecutionState.executing))
        assert_that(action, equal_to(action_to_start))
        assert_that(actual.executed, is_(empty()))
        # Fixed: the original passed a lone matcher object to assert_that, which
        # is always truthy, so the check could never fail.
        assert_that(action_to_start.apply.called, is_(False))
        assert_that(actual.simulation_time, equal_to(start_time))
开发者ID:Dunes,项目名称:janitor,代码行数:25,代码来源:test_simulator.py


示例15: parse

 def parse(self, input, fail, pmatch, **kwargs):
   """Start parsing with every child, each on its own fork of input.

   Pending work is kept in a priority queue of (priority, child index,
   callable) entries; every child starts at priority 0.  The returned
   partial hands control to DIE with cfail as its failure continuation.
   NOTE(review): DIE, badcall, assertCont and assertFail are defined
   outside this block; their exact contracts are not visible here.
   """
   inputs = input.fork(len(self.children))
   # Bounded queue: one slot per child.
   queue = PriorityQueue(len(self.children))
   for i, (input,child) in enumerate(zip(inputs,self.children)):
     # Each child gets its own input fork and its own copy of pmatch.
     queue.put((0,i,partial(child.parse,input=input,pmatch=copy(pmatch),**kwargs)))
   # Entry currently being run; the placeholder has priority -1 until the
   # first entry is taken from the queue.
   current = (-1, -1, badcall)
   
   @assertCont
   def ccont(fail, **kwargs):
     # Resume the entry selected by cfail, always with cfail as its failure
     # continuation (the fail argument passed in is not used).
     nonlocal current
     return partial(current[2],fail=cfail)
   
   @assertFail
   def cfail(value, cont, **kwargs):
     # Failure continuation shared by all children.
     global DIE
     nonlocal fail, queue, current, ccont
     last = current[0]
     if value is not None:
       # Re-queue the entry that just failed, at priority last+value and under
       # the same child index, with cont as its callable.
       queue.put((last+value,current[1],cont))
     if not queue.qsize():
       # Nothing left to try: report to the caller's fail with value None.
       return partial(fail,value=None,cont=DIE)
     current = queue.get()
     if current[0] == last:
       # Same priority as the previous entry: run it straight away.
       return partial(current[2],fail=cfail)
     # Different priority: pass the difference to the caller's fail, which can
     # resume this parser through ccont.
     return partial(fail,value=last-current[0],cont=ccont)
   return partial(DIE,fail=cfail)
开发者ID:calcu16,项目名称:PyParser,代码行数:26,代码来源:_choice.py


示例16: EventQueue

class EventQueue(object):
    def __init__(self):
        """Event queue for executing events at
        specific timepoints.

        In current form it is NOT thread safe."""
        self.q = PriorityQueue()

    def schedule(self, f, ts):
        """Schedule f to be executed at time ts"""
        self.q.put(EqItem(ts, f))

    def schedule_recurring(self, f, interval):
        """Schedule f to be run every interval seconds.

        It will be run for the first time interval seconds
        from now"""

        def recuring_f():
            # Run the callback, then queue the next occurrence.
            f()
            self.schedule(recuring_f, time.time() + interval)

        self.schedule(recuring_f, time.time() + interval)

    def run(self):
        """Execute events in the queue as timely as possible.

        Blocks forever: takes the next event, waits until its timestamp
        has been reached and then calls it.
        """
        while True:
            event = self.q.get()
            # Fixed: events used to run the moment they were dequeued, so the
            # timestamp was ignored and recurring events spun without pause.
            # NOTE(review): assumes EqItem exposes its timestamp as ``ts``
            # (only ``f`` is visible here); without it the event runs at once.
            due = getattr(event, "ts", None)
            if due is not None:
                delay = due - time.time()
                if delay > 0:
                    time.sleep(delay)
            event.f()
开发者ID:ckushner,项目名称:tensorflow-deepq-orbiter,代码行数:29,代码来源:event_queue.py


示例17: Astar

def Astar(start, goal, cost_map, heuristic_map, vehicle, cursor, motion_primitives):
    """
    Open Area Motion Planning, Static Environment

    Expands at most 200 states taken from a priority queue seeded with start.
    Each expanded state is handed to State.RushTowardGoal and then to
    State.ControlSet, which receive the queue and the node/edge dictionaries.

    Returns (reached, node_dict, edge_dict), where reached tells whether
    goal.reach is set when the search stops.
    """
    frontier = PriorityQueue()
    frontier.put(start)
    node_dict = {start.index: start, goal.index: goal}
    edge_dict = {}

    for _ in range(200):
        if goal.reach or frontier.empty():
            break
        current = frontier.get()
        current.extend = True
        State.RushTowardGoal(current=current, goal=goal, cursor=cursor, edge_dict=edge_dict,
                             pq=frontier, cost_map=cost_map, vehicle=vehicle)
        State.ControlSet(current=current, motion_primitives=motion_primitives, pq=frontier,
                         node_dict=node_dict, edge_dict=edge_dict, cost_map=cost_map,
                         heuristic_map=heuristic_map, vehicle=vehicle, goal=goal)

    return bool(goal.reach), node_dict, edge_dict
开发者ID:bourbakilee,项目名称:PyMPL,代码行数:27,代码来源:OffRoadPlanning.py


示例18: solve

def solve(state):
    """Search for a solved state reachable from ``state`` and print the path.

    The blank's position is an index into a 3x3 board (0..8).  Successors
    wait in a priority queue ordered by ``getValue()``; an insertion counter
    breaks ties so states themselves are never compared.  The search stops
    as soon as a generated successor reports ``solved()``.

    Raises ValueError when the queue runs empty before a solution is found
    (the previous version blocked forever in that case).
    """
    # Moves in the order they are tried, each with the test that decides
    # whether the blank at index ``pos`` may move that way.
    moves = (
        ('l', lambda pos: 1 <= pos and pos % 3 != 0),
        ('r', lambda pos: pos < 8 and pos % 3 != 2),
        ('u', lambda pos: 3 <= pos),
        ('d', lambda pos: pos <= 5),
    )

    queue = PriorityQueue()
    visited = [state]
    # Monotonic tie-breaker; time.time() can repeat on coarse clocks, which
    # made the queue fall back to comparing state objects.
    sequence = 0
    solved = False

    # Main loop
    while not solved:
        empty = state.getEmpty()
        gn = state.getDepth() + 1
        parent = state

        for direction, allowed in moves:
            if not allowed(empty):
                continue
            child = parent.move(direction, gn)
            child.setParent(parent)
            if child in visited:
                continue
            sequence += 1
            queue.put((child.getValue(), sequence, child))
            if child.solved():
                solved = True
                state = child
                break

        # Move to next state
        if not solved:
            if queue.empty():
                raise ValueError("no solution found: search queue is empty")
            state = queue.get()[2]
            visited.append(state)

    # Print steps to solution
    printPath(state)
开发者ID:intrepid94,项目名称:f498q788-cs580-sp2016-git-clone,代码行数:59,代码来源:AI.py


示例19: update_market

def update_market(market):
    """Replace economy[market] with a queue holding only listings of living sellers.

    Every listing currently queued is taken out once; it is kept when the
    seller stored at index 2 of the listing is alive.
    """
    surviving = PriorityQueue()
    remaining = economy[market].qsize()
    while remaining > 0:
        remaining -= 1
        listing = economy[market].get()
        if listing[2].alive:
            surviving.put(listing)
    economy[market] = surviving
开发者ID:SageBerg,项目名称:community_simulator,代码行数:8,代码来源:community_simulator.py


示例20: main

def main():
    """Download every album page of one artist and write the scraped text to a file.

    Command line: <artist name or URL> [output file].  Without an output
    file the name is taken from the artist page's <title> plus ".txt".
    """
    if len(sys.argv) < 2:
        print('Usage: {} <artist name or URL> [output file]'.format(sys.argv[0]), file=sys.stderr)
        sys.exit(1)

    # Figure out the URL to use
    source_arg = sys.argv[1]
    # Full URLs are used as given; https:// used to fall through to the
    # artist-name branch and produce a broken address.
    if source_arg.startswith(('http://', 'https://')):
        url = source_arg
    elif source_arg.startswith('www.'):
        url = 'http://'+source_arg
    elif source_arg.startswith('darklyrics.com'):
        url = 'http://www.'+source_arg
    else:
        source_arg = source_arg.lower().replace(' ', '')
        url = 'http://www.darklyrics.com/{}/{}.html'.format(source_arg[0], source_arg)

    # Read artist page
    print('Accessing {}'.format(url), file=sys.stderr)

    with urlopen(url) as ufd:
        artist_html = ufd.read().decode('utf-8')

    # Quoted links ending in "#1" point at the album pages.
    artist_re = re.compile(r'".*#1"')
    artist_mo = artist_re.findall(artist_html)

    # Strip the leading quote and the trailing '#1"', and make the link absolute.
    album_urls = [s.replace('..', 'http://www.darklyrics.com')[1:-3] for s in artist_mo]

    # Create threads to download and scrape each page
    q = PriorityQueue()
    threads = []
    for i, url in enumerate(album_urls):
        thread = threading.Thread(target=get_url, args=(q, url, i))
        thread.daemon = True
        thread.start()
        threads.append(thread)

    # Wait for all urls to download
    for thread in threads:
        thread.join()

    # File to store output in
    if len(sys.argv) == 2:
        artist_soup = BeautifulSoup(artist_html, "html.parser")
        file_name = artist_soup.find("title").get_text() + ".txt"
    else:
        file_name = sys.argv[2]

    # The with-block closes the file even when scraping fails; the pages were
    # decoded as UTF-8, so the output is written as UTF-8 too.
    with open(file_name, "w", encoding="utf-8") as fd:
        # Go through the queue (entries come out ordered by album index)
        while not q.empty():
            # Fetched outside the try so album_html is always bound when the
            # error message below is built.
            index, album_html = q.get()
            try:
                text = scrape_from_html(album_html)
                print(text, file=fd)
            except Exception as e:
                print("Error scraping from html: {}".format(album_html), file=sys.stderr)
                print(e, file=sys.stderr)
开发者ID:medakk,项目名称:darklyrics-scraper,代码行数:58,代码来源:darklyrics-scraper.py



注:本文中的queue.PriorityQueue类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python queue.Queue类代码示例发布时间:2022-05-26
下一篇:
Python queue.LifoQueue类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap