• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python compat.urlparse函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中streamlink.compat.urlparse函数的典型用法代码示例。如果您正苦于以下问题:Python urlparse函数的具体用法?Python urlparse怎么用?Python urlparse使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了urlparse函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: url_equal

def url_equal(first, second, ignore_scheme=False, ignore_netloc=False, ignore_path=False, ignore_params=False,
              ignore_query=False, ignore_fragment=False):
    """
    Compare two URLs and return True if they are equal, some parts of the URLs can be ignored
    :param first: URL
    :param second: URL
    :param ignore_scheme: ignore the scheme
    :param ignore_netloc: ignore the netloc
    :param ignore_path: ignore the path
    :param ignore_params: ignore the params
    :param ignore_query: ignore the query string
    :param ignore_fragment: ignore the fragment
    :return: result of comparison
    """
    # <scheme>://<netloc>/<path>;<params>?<query>#<fragment>

    firstp = urlparse(first)
    secondp = urlparse(second)

    return ((firstp.scheme == secondp.scheme or ignore_scheme) and
            (firstp.netloc == secondp.netloc or ignore_netloc) and
            (firstp.path == secondp.path or ignore_path) and
            (firstp.params == secondp.params or ignore_params) and
            (firstp.query == secondp.query or ignore_query) and
            (firstp.fragment == secondp.fragment or ignore_fragment))
开发者ID:sheldon0531,项目名称:streamlink,代码行数:25,代码来源:url.py


示例2: _get_streams

    def _get_streams(self):
        self.session.http.headers.update({'User-Agent': useragents.FIREFOX})

        iframe_url = None
        page = self.session.http.get(self.url)
        for a in itertags(page.text, 'a'):
            if a.attributes.get('class') == 'play-live':
                iframe_url = update_scheme(self.url, a.attributes['data-url'])
                break

        if not iframe_url:
            raise PluginError('Could not find iframe.')

        parsed = urlparse(iframe_url)
        path_list = parsed.path.split('/')
        if len(path_list) != 6:
            # only support a known iframe url style,
            # the video id might be on a different spot if the url changes
            raise PluginError('unsupported iframe URL: {0}'.format(iframe_url))

        res = self.session.http.get(
            self.API_URL.format(netloc=parsed.netloc, id=path_list[4]))

        data = self.session.http.json(res, schema=self._api_schema)
        log.trace('{0!r}'.format(data))

        url = self.PLAYLIST_URL.format(
            app=data['streamProperties']['application'],
            name=data['playStreamName'],
            netloc=data['cdnHost'],
        )
        return HLSStream.parse_variant_playlist(self.session, url)
开发者ID:sheldon0531,项目名称:streamlink,代码行数:32,代码来源:teleclubzoom.py


示例3: _get_streams

    def _get_streams(self):
        match = _url_re.match(self.url)
        video_id = match.group("video_id")
        res = http.get(ASSET_URL.format(video_id))
        assets = http.xml(res, schema=_asset_schema)

        streams = {}
        for asset in assets:
            base = asset["base"]
            url = asset["url"]

            if urlparse(url).path.endswith(".f4m"):
                streams.update(
                    HDSStream.parse_manifest(self.session, url, pvswf=SWF_URL)
                )
            elif base.startswith("rtmp"):
                name = "{0}k".format(asset["bitrate"])
                params = {
                    "rtmp": asset["base"],
                    "playpath": url,
                    "live": True
                }
                streams[name] = RTMPStream(self.session, params)

        return streams
开发者ID:amadu80,项目名称:repository.xvbmc,代码行数:25,代码来源:tv4play.py


示例4: find_iframe

 def find_iframe(self, res):
     for url in self.iframe_re.findall(res.text):
         if url.startswith("//"):
             p = urlparse(self.url)
             return "{0}:{1}".format(p.scheme, url)
         else:
             return url
开发者ID:sheldon0531,项目名称:streamlink,代码行数:7,代码来源:ovvatv.py


示例5: update_qsd

def update_qsd(url, qsd=None, remove=None):
    """
    Update or remove keys from a query string in a URL

    :param url: URL to update
    :param qsd: dict of keys to update, a None value leaves it unchanged
    :param remove: list of keys to remove, or "*" to remove all
                   note: updated keys are never removed, even if unchanged
    :return: updated URL
    """
    qsd = qsd or {}
    remove = remove or []

    # parse current query string
    parsed = urlparse(url)
    current_qsd = OrderedDict(parse_qsl(parsed.query))

    # * removes all possible keys
    if remove == "*":
        remove = list(current_qsd.keys())

    # remove keys before updating, but leave updated keys untouched
    for key in remove:
        if key not in qsd:
            del current_qsd[key]

    # and update the query string
    for key, value in qsd.items():
        if value:
            current_qsd[key] = value

    return parsed._replace(query=urlencode(current_qsd)).geturl()
开发者ID:sheldon0531,项目名称:streamlink,代码行数:32,代码来源:url.py


示例6: __init__

    def __init__(self, node, root=None, parent=None, url=None, *args, **kwargs):
        # top level has no parent
        super(MPD, self).__init__(node, root=self, *args, **kwargs)
        # parser attributes
        self.url = url
        self.timelines = defaultdict(lambda: -1)
        self.timelines.update(kwargs.pop("timelines", {}))
        self.id = self.attr(u"id")
        self.profiles = self.attr(u"profiles", required=True)
        self.type = self.attr(u"type", default=u"static", parser=MPDParsers.type)
        self.minimumUpdatePeriod = self.attr(u"minimumUpdatePeriod", parser=MPDParsers.duration, default=Duration())
        self.minBufferTime = self.attr(u"minBufferTime", parser=MPDParsers.duration, required=True)
        self.timeShiftBufferDepth = self.attr(u"timeShiftBufferDepth", parser=MPDParsers.duration)
        self.availabilityStartTime = self.attr(u"availabilityStartTime", parser=MPDParsers.datetime,
                                               default=datetime.datetime.fromtimestamp(0, utc),  # earliest date
                                               required=self.type == "dynamic")
        self.publishTime = self.attr(u"publishTime", parser=MPDParsers.datetime, required=self.type == "dynamic")
        self.availabilityEndTime = self.attr(u"availabilityEndTime", parser=MPDParsers.datetime)
        self.mediaPresentationDuration = self.attr(u"mediaPresentationDuration", parser=MPDParsers.duration)
        self.suggestedPresentationDelay = self.attr(u"suggestedPresentationDelay", parser=MPDParsers.duration)

        # parse children
        location = self.children(Location)
        self.location = location[0] if location else None
        if self.location:
            self.url = self.location.text
            urlp = list(urlparse(self.url))
            if urlp[2]:
                urlp[2], _ = urlp[2].rsplit("/", 1)
            self._base_url = urlunparse(urlp)

        self.baseURLs = self.children(BaseURL)
        self.periods = self.children(Period, minimum=1)
        self.programInformation = self.children(ProgramInformation)
开发者ID:sheldon0531,项目名称:streamlink,代码行数:34,代码来源:dash_manifest.py


示例7: uri

 def uri(self, uri):
     if uri and urlparse(uri).scheme:
         return uri
     elif self.base_uri and uri:
         return urljoin(self.base_uri, uri)
     else:
         return uri
开发者ID:amadu80,项目名称:repository.xvbmc,代码行数:7,代码来源:hls_playlist.py


示例8: _get_streams

    def _get_streams(self):
        self.url = http.resolve_url(self.url)
        match = _url_re.match(self.url)
        parsed = urlparse(self.url)
        if parsed.fragment:
            channel_id = parsed.fragment
        elif parsed.path[:3] == '/v/':
            channel_id = parsed.path.split('/')[-1]
        else:
            channel_id = match.group("channel")

        if not channel_id:
            return

        channel_id = channel_id.lower().replace("/", "_")
        res = http.get(API_URL.format(channel_id))
        info = http.json(res, schema=_schema)

        if not info["success"]:
            return

        if info.get("isLive"):
            name = "live"
        else:
            name = "vod"

        stream = HTTPStream(self.session, info["payload"])
        # Wrap the stream in a FLVPlaylist to verify the FLV tags
        stream = FLVPlaylist(self.session, [stream])

        return {name: stream}
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:31,代码来源:veetle.py


示例9: _find_iframe

 def _find_iframe(self, res):
     iframe = self.iframe_re.search(res.text)
     url = iframe and iframe.group(1)
     if url and url.startswith("//"):
         p = urlparse(self.url)
         url = "{0}:{1}".format(p.scheme, url)
     return url
开发者ID:amadu80,项目名称:repository.xvbmc,代码行数:7,代码来源:tv8cat.py


示例10: _get_streams

    def _get_streams(self):
        res = http.get(self.url)
        match = _info_re.search(res.text)
        if not match:
            return

        info = parse_json(match.group(1), schema=_schema)
        stream_name = info["mode"]
        mp4_url = info.get("mp4_url")
        ios_url = info.get("ios_url")
        swf_url = info.get("swf_url")

        if mp4_url:
            stream = HTTPStream(self.session, mp4_url)
            yield stream_name, stream

        if ios_url:
            if urlparse(ios_url).path.endswith(".m3u8"):
                streams = HLSStream.parse_variant_playlist(self.session, ios_url)
                # TODO: Replace with "yield from" when dropping Python 2.
                for stream in streams.items():
                    yield stream

        if swf_url:
            stream = self._get_rtmp_stream(swf_url)
            if stream:
                yield stream_name, stream
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:27,代码来源:dmcloud.py


示例11: _get_streams

    def _get_streams(self):
        self.session.http.headers.update({
           "Referer": self.url,
           "User-Agent": useragents.FIREFOX
        })

        iframe_url = None
        res = self.session.http.get(self.url)
        for iframe in itertags(res.text, "iframe"):
            if "embed.lsm.lv" in iframe.attributes.get("src"):
                iframe_url = iframe.attributes.get("src")
                break

        if not iframe_url:
            log.error("Could not find player iframe")
            return

        log.debug("Found iframe: {0}".format(iframe_url))
        res = self.session.http.get(iframe_url)
        for source in itertags(res.text, "source"):
            if source.attributes.get("src"):
                stream_url = source.attributes.get("src")
                url_path = urlparse(stream_url).path
                if url_path.endswith(".m3u8"):
                    for s in HLSStream.parse_variant_playlist(self.session,
                                                              stream_url).items():
                        yield s
                else:
                    log.debug("Not used URL path: {0}".format(url_path))
开发者ID:sheldon0531,项目名称:streamlink,代码行数:29,代码来源:ltv_lsm_lv.py


示例12: _get_streams

    def _get_streams(self):
        http.headers.update({"User-Agent": useragents.CHROME,
                             "Referer": self.referer})
        fragment = dict(parse_qsl(urlparse(self.url).fragment))
        link = fragment.get("link")
        if not link:
            link = self._get_tv_link()

        if not link:
            self.logger.error("Missing link fragment: stream unavailable")
            return

        player_url = self._api_url.format(link)
        self.logger.debug("Requesting player API: {0} (referer={1})", player_url, self.referer)
        res = http.get(player_url,
                       params={"_": int(time.time() * 1000)},
                       headers={"X-Requested-With": "XMLHttpRequest"})

        try:
            data = http.json(res, schema=self.api_schema)
        except PluginError as e:
            print(e)
            self.logger.error("Cannot play this stream type")
        else:
            if data["status"]:
                if data["file"].startswith("<"):
                    self.logger.error("Cannot play embedded streams")
                else:
                    return HLSStream.parse_variant_playlist(self.session, data["file"])
            else:
                self.logger.error(data["text"])
开发者ID:justastranger,项目名称:Twitchy,代码行数:31,代码来源:seetv.py


示例13: fetch

    def fetch(self, segment, retries=None):
        if self.closed or not retries:
            return

        try:
            headers = {}
            now = datetime.datetime.now(tz=utc)
            if segment.available_at > now:
                time_to_wait = (segment.available_at - now).total_seconds()
                fname = os.path.basename(urlparse(segment.url).path)
                log.debug("Waiting for segment: {fname} ({wait:.01f}s)".format(fname=fname, wait=time_to_wait))
                sleep_until(segment.available_at)

            if segment.range:
                start, length = segment.range
                if length:
                    end = start + length - 1
                else:
                    end = ""
                headers["Range"] = "bytes={0}-{1}".format(start, end)

            return self.session.http.get(segment.url,
                                         timeout=self.timeout,
                                         exception=StreamError,
                                         headers=headers)
        except StreamError as err:
            log.error("Failed to open segment {0}: {1}", segment.url, err)
            return self.fetch(segment, retries - 1)
开发者ID:sheldon0531,项目名称:streamlink,代码行数:28,代码来源:dash.py


示例14: prepend_www

def prepend_www(url):
    """Changes google.com to www.google.com"""
    parsed = urlparse(url)
    if parsed.netloc.split(".")[0] != "www":
        return parsed.scheme + "://www." + parsed.netloc + parsed.path
    else:
        return url
开发者ID:justastranger,项目名称:Twitchy,代码行数:7,代码来源:__init__.py


示例15: _create_stream

    def _create_stream(self, stream, is_live):
        stream_name = "{0}p".format(stream["height"])
        stream_type = stream["mediaType"]
        stream_url = stream["url"]

        if stream_type in ("hls", "mp4"):
            if urlparse(stream_url).path.endswith("m3u8"):
                try:
                    streams = HLSStream.parse_variant_playlist(self.session, stream_url)

                    # TODO: Replace with "yield from" when dropping Python 2.
                    for stream in streams.items():
                        yield stream
                except IOError as err:
                    self.logger.error("Failed to extract HLS streams: {0}", err)
            else:
                yield stream_name, HTTPStream(self.session, stream_url)

        elif stream_type == "rtmp":
            params = {
                "rtmp": stream["streamer"],
                "playpath": stream["url"],
                "swfVfy": SWF_URL,
                "pageUrl": self.url,
            }

            if is_live:
                params["live"] = True
            else:
                params["playpath"] = "mp4:{0}".format(params["playpath"])

            stream = RTMPStream(self.session, params)
            yield stream_name, stream
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:33,代码来源:artetv.py


示例16: _get_streams

    def _get_streams(self):
        res = http.get(self.url, schema=_live_schema)
        if not res:
            return

        if res["type"] == "hls" and urlparse(res["url"]).path.endswith("m3u8"):
            stream = HLSStream(self.session, res["url"])
            return dict(hls=stream)
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:8,代码来源:ssh101.py


示例17: update_scheme

def update_scheme(current, target):
    """
    Take the scheme from the current URL and applies it to the
    target URL if the target URL startswith // or is missing a scheme
    :param current: current URL
    :param target: target URL
    :return: target URL with the current URLs scheme
    """
    target_p = urlparse(target)
    if not target_p.scheme and target_p.netloc:
        return "{0}:{1}".format(urlparse(current).scheme,
                                urlunparse(target_p))
    elif not target_p.scheme and not target_p.netloc:
        return "{0}://{1}".format(urlparse(current).scheme,
                                  urlunparse(target_p))
    else:
        return target
开发者ID:sheldon0531,项目名称:streamlink,代码行数:17,代码来源:url.py


示例18: _parse_vod_streams

    def _parse_vod_streams(self, vod):
        for name, stream in vod["streams"].items():
            scheme = urlparse(stream["url"]).scheme

            if scheme == "http":
                yield name, HLSStream(self.session, stream["url"])
            elif scheme == "rtmp":
                yield name, self._create_rtmp_stream(stream, live=False)
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:8,代码来源:filmon.py


示例19: find_iframe

 def find_iframe(self, res):
     p = urlparse(self.url)
     for url in self.iframe_re.findall(res.text):
         if "googletagmanager" not in url:
             if url.startswith("//"):
                 return "{0}:{1}".format(p.scheme, url)
             else:
                 return url
开发者ID:justastranger,项目名称:Twitchy,代码行数:8,代码来源:cdnbg.py


示例20: from_url

    def from_url(cls, session, url):
        purl = urlparse(url)
        querys = dict(parse_qsl(purl.query))

        account_id, player_id, _ = purl.path.lstrip("/").split("/", 3)
        video_id = querys.get("videoId")

        bp = cls(session, account_id=account_id, player_id=player_id)
        return bp.get_streams(video_id)
开发者ID:amadu80,项目名称:repository.xvbmc,代码行数:9,代码来源:brightcove.py



注:本文中的streamlink.compat.urlparse函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python plugin.Plugin类代码示例发布时间:2022-05-27
下一篇:
Python streamhist.StreamHist类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap