本文整理汇总了Python中streamlink.compat.urlparse函数的典型用法代码示例。如果您正苦于以下问题:Python urlparse函数的具体用法?Python urlparse怎么用?Python urlparse使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urlparse函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: url_equal
def url_equal(first, second, ignore_scheme=False, ignore_netloc=False, ignore_path=False, ignore_params=False,
              ignore_query=False, ignore_fragment=False):
    """
    Check whether two URLs are the same, optionally skipping individual components

    :param first: URL
    :param second: URL
    :param ignore_scheme: skip the scheme when comparing
    :param ignore_netloc: skip the netloc when comparing
    :param ignore_path: skip the path when comparing
    :param ignore_params: skip the params when comparing
    :param ignore_query: skip the query string when comparing
    :param ignore_fragment: skip the fragment when comparing
    :return: True when every component that is not skipped matches
    """
    # <scheme>://<netloc>/<path>;<params>?<query>#<fragment>
    left = urlparse(first)
    right = urlparse(second)

    # each parsed component paired with the flag that switches its check off
    checks = (
        ("scheme", ignore_scheme),
        ("netloc", ignore_netloc),
        ("path", ignore_path),
        ("params", ignore_params),
        ("query", ignore_query),
        ("fragment", ignore_fragment),
    )
    return all(skip or getattr(left, part) == getattr(right, part)
               for part, skip in checks)
开发者ID:sheldon0531,项目名称:streamlink,代码行数:25,代码来源:url.py
示例2: _get_streams
def _get_streams(self):
    """
    Locate the "play-live" link on the page, query the API for the video it
    points to and return the variants of the resulting HLS playlist

    :raises PluginError: if no "play-live" link is found, or its URL does not
                         have the expected number of path segments
    :return: result of HLSStream.parse_variant_playlist
    """
    http = self.session.http
    http.headers.update({'User-Agent': useragents.FIREFOX})

    page = http.get(self.url)
    # first <a class="play-live"> wins; its data-url inherits the page scheme
    iframe_url = next(
        (update_scheme(self.url, tag.attributes['data-url'])
         for tag in itertags(page.text, 'a')
         if tag.attributes.get('class') == 'play-live'),
        None,
    )
    if not iframe_url:
        raise PluginError('Could not find iframe.')

    parsed = urlparse(iframe_url)
    segments = parsed.path.split('/')
    if len(segments) != 6:
        # only support a known iframe url style,
        # the video id might be on a different spot if the url changes
        raise PluginError('unsupported iframe URL: {0}'.format(iframe_url))

    api_url = self.API_URL.format(netloc=parsed.netloc, id=segments[4])
    data = http.json(http.get(api_url), schema=self._api_schema)
    log.trace('{0!r}'.format(data))

    playlist_url = self.PLAYLIST_URL.format(
        app=data['streamProperties']['application'],
        name=data['playStreamName'],
        netloc=data['cdnHost'],
    )
    return HLSStream.parse_variant_playlist(self.session, playlist_url)
开发者ID:sheldon0531,项目名称:streamlink,代码行数:32,代码来源:teleclubzoom.py
示例3: _get_streams
def _get_streams(self):
    """
    Fetch the asset list for the video id in the plugin URL and build the streams

    Assets whose URL path ends in ".f4m" are expanded through HDSStream.parse_manifest;
    assets with an "rtmp" base become a single RTMP stream named after their bitrate.

    :return: dict mapping stream names to streams
    """
    video_id = _url_re.match(self.url).group("video_id")
    response = http.get(ASSET_URL.format(video_id))

    streams = {}
    for asset in http.xml(response, schema=_asset_schema):
        base = asset["base"]
        url = asset["url"]

        if urlparse(url).path.endswith(".f4m"):
            streams.update(
                HDSStream.parse_manifest(self.session, url, pvswf=SWF_URL)
            )
            continue

        if base.startswith("rtmp"):
            name = "{0}k".format(asset["bitrate"])
            streams[name] = RTMPStream(self.session, {
                "rtmp": base,
                "playpath": url,
                "live": True
            })

    return streams
开发者ID:amadu80,项目名称:repository.xvbmc,代码行数:25,代码来源:tv4play.py
示例4: find_iframe
def find_iframe(self, res):
    """
    Return the first URL that self.iframe_re finds in the response text

    A protocol-relative match ("//host/...") is completed with the scheme of
    the plugin URL; any other match is returned untouched.

    :param res: response object exposing the page body as ``text``
    :return: the iframe URL, or None when the pattern does not match
    """
    for candidate in self.iframe_re.findall(res.text):
        # only the first match is ever considered
        if not candidate.startswith("//"):
            return candidate
        page_scheme = urlparse(self.url).scheme
        return "{0}:{1}".format(page_scheme, candidate)
开发者ID:sheldon0531,项目名称:streamlink,代码行数:7,代码来源:ovvatv.py
示例5: update_qsd
def update_qsd(url, qsd=None, remove=None):
    """
    Update or remove keys from a query string in a URL

    :param url: URL to update
    :param qsd: dict of keys to update, a None value leaves it unchanged
    :param remove: list of keys to remove, or "*" to remove all
                   note: updated keys are never removed, even if unchanged;
                   keys that are not present in the URL are ignored
    :return: updated URL
    """
    qsd = qsd or {}
    remove = remove or []

    # parse current query string
    parsed = urlparse(url)
    current_qsd = OrderedDict(parse_qsl(parsed.query))

    # * removes all possible keys
    if remove == "*":
        remove = list(current_qsd.keys())

    # remove keys before updating, but leave updated keys untouched
    for key in remove:
        if key not in qsd:
            # pop() with a default: asking to remove a key the URL does not
            # contain must not raise KeyError
            current_qsd.pop(key, None)

    # and update the query string
    for key, value in qsd.items():
        # only None means "leave unchanged"; falsy values such as 0 or ""
        # are legitimate new values and must be written
        if value is not None:
            current_qsd[key] = value

    return parsed._replace(query=urlencode(current_qsd)).geturl()
开发者ID:sheldon0531,项目名称:streamlink,代码行数:32,代码来源:url.py
示例6: __init__
def __init__(self, node, root=None, parent=None, url=None, *args, **kwargs):
    """
    Build the top-level MPD node: read its attributes, work out the base URL
    and parse its child elements

    :param node: manifest node handed on to the parent class initialiser
    :param root: accepted for signature compatibility; this node is passed as its own root
    :param parent: accepted for signature compatibility; not forwarded
    :param url: URL of the manifest, replaced by the Location child when one exists
    """
    # top level has no parent
    super(MPD, self).__init__(node, root=self, *args, **kwargs)

    # parser attributes
    self.url = url
    self.timelines = defaultdict(lambda: -1)
    self.timelines.update(kwargs.pop("timelines", {}))

    self.id = self.attr(u"id")
    self.profiles = self.attr(u"profiles", required=True)
    self.type = self.attr(u"type", default=u"static", parser=MPDParsers.type)
    self.minimumUpdatePeriod = self.attr(
        u"minimumUpdatePeriod", parser=MPDParsers.duration, default=Duration())
    self.minBufferTime = self.attr(
        u"minBufferTime", parser=MPDParsers.duration, required=True)
    self.timeShiftBufferDepth = self.attr(
        u"timeShiftBufferDepth", parser=MPDParsers.duration)
    # availabilityStartTime and publishTime are only mandatory for dynamic manifests
    self.availabilityStartTime = self.attr(
        u"availabilityStartTime",
        parser=MPDParsers.datetime,
        default=datetime.datetime.fromtimestamp(0, utc),  # earliest date
        required=self.type == "dynamic")
    self.publishTime = self.attr(
        u"publishTime", parser=MPDParsers.datetime, required=self.type == "dynamic")
    self.availabilityEndTime = self.attr(
        u"availabilityEndTime", parser=MPDParsers.datetime)
    self.mediaPresentationDuration = self.attr(
        u"mediaPresentationDuration", parser=MPDParsers.duration)
    self.suggestedPresentationDelay = self.attr(
        u"suggestedPresentationDelay", parser=MPDParsers.duration)

    # parse children
    locations = self.children(Location)
    if locations:
        self.location = locations[0]
    else:
        self.location = None
    if self.location:
        # a Location element overrides the URL the manifest was loaded from
        self.url = self.location.text

    # the base URL is the manifest URL with the last path component cut off
    url_parts = list(urlparse(self.url))
    if url_parts[2]:
        directory, _ = url_parts[2].rsplit("/", 1)
        url_parts[2] = directory
    self._base_url = urlunparse(url_parts)

    self.baseURLs = self.children(BaseURL)
    self.periods = self.children(Period, minimum=1)
    self.programInformation = self.children(ProgramInformation)
开发者ID:sheldon0531,项目名称:streamlink,代码行数:34,代码来源:dash_manifest.py
示例7: uri
def uri(self, uri):
    """
    Turn a playlist URI into an absolute one where possible

    :param uri: URI taken from the playlist, may be empty or None
    :return: the URI itself when it is empty, already carries a scheme or no
             base URI is set; otherwise the URI joined onto self.base_uri
    """
    if not uri:
        return uri
    if urlparse(uri).scheme:
        return uri
    if self.base_uri:
        return urljoin(self.base_uri, uri)
    return uri
开发者ID:amadu80,项目名称:repository.xvbmc,代码行数:7,代码来源:hls_playlist.py
示例8: _get_streams
def _get_streams(self):
    """
    Resolve the plugin URL, work out the channel id and return its stream

    The channel id is taken from the URL fragment, else from the last path
    component of a "/v/" URL, else from the "channel" group of the URL regex.

    :return: dict with a single "live" or "vod" stream, or None when no
             channel id is found or the API reports failure
    """
    self.url = http.resolve_url(self.url)
    match = _url_re.match(self.url)
    url_parts = urlparse(self.url)

    if url_parts.fragment:
        channel_id = url_parts.fragment
    elif url_parts.path.startswith('/v/'):
        channel_id = url_parts.path.rsplit('/', 1)[-1]
    else:
        channel_id = match.group("channel")

    if not channel_id:
        return

    channel_id = channel_id.lower().replace("/", "_")
    info = http.json(http.get(API_URL.format(channel_id)), schema=_schema)
    if not info["success"]:
        return

    name = "live" if info.get("isLive") else "vod"

    # Wrap the stream in a FLVPlaylist to verify the FLV tags
    payload_stream = HTTPStream(self.session, info["payload"])
    return {name: FLVPlaylist(self.session, [payload_stream])}
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:31,代码来源:veetle.py
示例9: _find_iframe
def _find_iframe(self, res):
iframe = self.iframe_re.search(res.text)
url = iframe and iframe.group(1)
if url and url.startswith("//"):
p = urlparse(self.url)
url = "{0}:{1}".format(p.scheme, url)
return url
开发者ID:amadu80,项目名称:repository.xvbmc,代码行数:7,代码来源:tv8cat.py
示例10: _get_streams
def _get_streams(self):
    """
    Extract the embedded player info from the page and yield the streams it lists

    Yields, in this order: an HTTP stream for "mp4_url", the HLS variants for
    an "ios_url" whose path ends in ".m3u8", and the RTMP stream derived from
    "swf_url". Nothing is yielded when the info block is not found.

    :return: generator of (name, stream) tuples
    """
    page = http.get(self.url)
    found = _info_re.search(page.text)
    if not found:
        return

    info = parse_json(found.group(1), schema=_schema)
    stream_name = info["mode"]
    mp4_url, ios_url, swf_url = [
        info.get(key) for key in ("mp4_url", "ios_url", "swf_url")
    ]

    if mp4_url:
        yield stream_name, HTTPStream(self.session, mp4_url)

    if ios_url and urlparse(ios_url).path.endswith(".m3u8"):
        variants = HLSStream.parse_variant_playlist(self.session, ios_url)
        # TODO: Replace with "yield from" when dropping Python 2.
        for item in variants.items():
            yield item

    if swf_url:
        rtmp_stream = self._get_rtmp_stream(swf_url)
        if rtmp_stream:
            yield stream_name, rtmp_stream
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:27,代码来源:dmcloud.py
示例11: _get_streams
def _get_streams(self):
    """
    Find the embed.lsm.lv player iframe on the page and yield the HLS
    variants of every ".m3u8" <source> inside it

    :return: generator of (name, stream) tuples; nothing is yielded when no
             player iframe is found
    """
    self.session.http.headers.update({
        "Referer": self.url,
        "User-Agent": useragents.FIREFOX
    })

    iframe_url = None
    res = self.session.http.get(self.url)
    for iframe in itertags(res.text, "iframe"):
        # an <iframe> without a src attribute gives None here; guard it so the
        # substring test does not raise TypeError on unrelated iframes
        src = iframe.attributes.get("src")
        if src and "embed.lsm.lv" in src:
            iframe_url = src
            break

    if not iframe_url:
        log.error("Could not find player iframe")
        return

    log.debug("Found iframe: {0}".format(iframe_url))
    res = self.session.http.get(iframe_url)
    for source in itertags(res.text, "source"):
        stream_url = source.attributes.get("src")
        if not stream_url:
            continue

        url_path = urlparse(stream_url).path
        if url_path.endswith(".m3u8"):
            for s in HLSStream.parse_variant_playlist(self.session,
                                                      stream_url).items():
                yield s
        else:
            log.debug("Not used URL path: {0}".format(url_path))
开发者ID:sheldon0531,项目名称:streamlink,代码行数:29,代码来源:ltv_lsm_lv.py
示例12: _get_streams
def _get_streams(self):
    """
    Resolve the channel link, query the player API and return the HLS variants

    The link is read from the "link" key of the URL fragment, falling back to
    self._get_tv_link().

    :return: result of HLSStream.parse_variant_playlist, or None when the link
             is missing, the API response fails validation, or the API
             reports an error or an embedded stream
    """
    http.headers.update({"User-Agent": useragents.CHROME,
                         "Referer": self.referer})

    fragment = dict(parse_qsl(urlparse(self.url).fragment))
    link = fragment.get("link")
    if not link:
        link = self._get_tv_link()
    if not link:
        self.logger.error("Missing link fragment: stream unavailable")
        return

    player_url = self._api_url.format(link)
    self.logger.debug("Requesting player API: {0} (referer={1})", player_url, self.referer)
    res = http.get(player_url,
                   # millisecond timestamp sent as the "_" query parameter
                   params={"_": int(time.time() * 1000)},
                   headers={"X-Requested-With": "XMLHttpRequest"})

    try:
        data = http.json(res, schema=self.api_schema)
    except PluginError as e:
        # report the validation failure through the logger rather than
        # print(), which would write to stdout
        self.logger.debug("Player API response rejected: {0}", e)
        self.logger.error("Cannot play this stream type")
    else:
        if data["status"]:
            if data["file"].startswith("<"):
                self.logger.error("Cannot play embedded streams")
            else:
                return HLSStream.parse_variant_playlist(self.session, data["file"])
        else:
            self.logger.error(data["text"])
开发者ID:justastranger,项目名称:Twitchy,代码行数:31,代码来源:seetv.py
示例13: fetch
def fetch(self, segment, retries=None):
    """
    Download one segment, waiting first if it is not available yet

    :param segment: segment exposing ``url``, ``available_at`` and ``range``
    :param retries: remaining attempts; a falsy value (including the default
                    None) means nothing is fetched
    :return: the HTTP response, or None when the worker is closed or the
             attempts are used up
    """
    if self.closed or not retries:
        return

    try:
        request_headers = {}

        current_time = datetime.datetime.now(tz=utc)
        if segment.available_at > current_time:
            wait_seconds = (segment.available_at - current_time).total_seconds()
            segment_name = os.path.basename(urlparse(segment.url).path)
            log.debug("Waiting for segment: {fname} ({wait:.01f}s)".format(fname=segment_name, wait=wait_seconds))
            sleep_until(segment.available_at)

        if segment.range:
            start, length = segment.range
            # a falsy length leaves the byte range open-ended
            end = start + length - 1 if length else ""
            request_headers["Range"] = "bytes={0}-{1}".format(start, end)

        return self.session.http.get(segment.url,
                                     timeout=self.timeout,
                                     exception=StreamError,
                                     headers=request_headers)
    except StreamError as err:
        log.error("Failed to open segment {0}: {1}", segment.url, err)
        # try again with one attempt fewer
        return self.fetch(segment, retries - 1)
开发者ID:sheldon0531,项目名称:streamlink,代码行数:28,代码来源:dash.py
示例14: prepend_www
def prepend_www(url):
    """
    Changes google.com to www.google.com

    Only the host is altered; the params, query string and fragment of the
    URL are carried over.

    :param url: absolute URL
    :return: the URL with "www." prepended to its host, or the URL unchanged
             when the host already starts with "www" or the URL has no netloc
    """
    parsed = urlparse(url)
    # without a netloc there is no host to prefix; leave such input alone
    # instead of fabricating a malformed "://www." URL
    if not parsed.netloc or parsed.netloc.split(".")[0] == "www":
        return url
    # replace only the netloc so that params, query and fragment survive
    return parsed._replace(netloc="www." + parsed.netloc).geturl()
开发者ID:justastranger,项目名称:Twitchy,代码行数:7,代码来源:__init__.py
示例15: _create_stream
def _create_stream(self, stream, is_live):
    """
    Yield the playable streams for one stream description

    :param stream: mapping with "height", "mediaType" and "url" keys, plus
                   "streamer" for RTMP entries
    :param is_live: whether the RTMP stream is flagged as live
    :return: generator of (name, stream) tuples
    """
    stream_name = "{0}p".format(stream["height"])
    stream_type = stream["mediaType"]
    stream_url = stream["url"]

    if stream_type in ("hls", "mp4"):
        if not urlparse(stream_url).path.endswith("m3u8"):
            yield stream_name, HTTPStream(self.session, stream_url)
        else:
            try:
                variants = HLSStream.parse_variant_playlist(self.session, stream_url)
                # TODO: Replace with "yield from" when dropping Python 2.
                for item in variants.items():
                    yield item
            except IOError as err:
                self.logger.error("Failed to extract HLS streams: {0}", err)

    elif stream_type == "rtmp":
        rtmp_params = {
            "rtmp": stream["streamer"],
            "playpath": stream_url,
            "swfVfy": SWF_URL,
            "pageUrl": self.url,
        }
        if is_live:
            rtmp_params["live"] = True
        else:
            # non-live playpaths get an "mp4:" prefix
            rtmp_params["playpath"] = "mp4:{0}".format(rtmp_params["playpath"])

        yield stream_name, RTMPStream(self.session, rtmp_params)
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:33,代码来源:artetv.py
示例16: _get_streams
def _get_streams(self):
    """
    Fetch the live info for the plugin URL and wrap an HLS URL in a stream

    :return: dict with a single "hls" stream when the info has type "hls" and
             a URL path ending in "m3u8", otherwise None
    """
    live = http.get(self.url, schema=_live_schema)
    if not live:
        return

    is_hls = live["type"] == "hls" and urlparse(live["url"]).path.endswith("m3u8")
    if is_hls:
        return {"hls": HLSStream(self.session, live["url"])}
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:8,代码来源:ssh101.py
示例17: update_scheme
def update_scheme(current, target):
    """
    Give the target URL the scheme of the current URL when it has none

    A target that already has a scheme is returned as is. A protocol-relative
    target ("//host/path") is prefixed with "<scheme>:", a target with neither
    scheme nor netloc is prefixed with "<scheme>://".

    :param current: current URL
    :param target: target URL
    :return: target URL with the current URLs scheme
    """
    target_parts = urlparse(target)
    if target_parts.scheme:
        return target

    # "//host/path" already carries its slashes, a bare "host/path" does not
    separator = ":" if target_parts.netloc else "://"
    return "{0}{1}{2}".format(urlparse(current).scheme,
                              separator,
                              urlunparse(target_parts))
开发者ID:sheldon0531,项目名称:streamlink,代码行数:17,代码来源:url.py
示例18: _parse_vod_streams
def _parse_vod_streams(self, vod):
    """
    Yield a stream for every VOD entry whose URL scheme is supported

    :param vod: mapping whose "streams" entry maps names to stream descriptions
    :return: generator of (name, stream) tuples; "http" URLs become HLS
             streams, "rtmp" URLs become RTMP streams, anything else is skipped
    """
    for name, stream in vod["streams"].items():
        stream_url = stream["url"]
        scheme = urlparse(stream_url).scheme

        if scheme == "rtmp":
            yield name, self._create_rtmp_stream(stream, live=False)
        elif scheme == "http":
            yield name, HLSStream(self.session, stream_url)
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:8,代码来源:filmon.py
示例19: find_iframe
def find_iframe(self, res):
    """
    Return the first iframe URL in the response that is not a googletagmanager one

    A protocol-relative match ("//host/...") is completed with the scheme of
    the plugin URL.

    :param res: response object exposing the page body as ``text``
    :return: the iframe URL, or None when no suitable match exists
    """
    page_scheme = urlparse(self.url).scheme
    for candidate in self.iframe_re.findall(res.text):
        if "googletagmanager" in candidate:
            continue
        if candidate.startswith("//"):
            return "{0}:{1}".format(page_scheme, candidate)
        return candidate
开发者ID:justastranger,项目名称:Twitchy,代码行数:8,代码来源:cdnbg.py
示例20: from_url
def from_url(cls, session, url):
    """
    Build a player from a player URL and return the streams of its video

    The first two path components of the URL are used as account id and
    player id; the video id is read from the "videoId" query parameter.

    :param session: session handed on to the constructor
    :param url: player URL with at least three path components
    :raises ValueError: if the URL path has fewer than three components
    :return: result of get_streams() for the video id
    """
    purl = urlparse(url)
    querys = dict(parse_qsl(purl.query))

    # maxsplit=2 caps the result at three pieces, so any deeper path ends up
    # in the discarded remainder instead of breaking the unpacking
    account_id, player_id, _ = purl.path.lstrip("/").split("/", 2)
    video_id = querys.get("videoId")

    bp = cls(session, account_id=account_id, player_id=player_id)
    return bp.get_streams(video_id)
开发者ID:amadu80,项目名称:repository.xvbmc,代码行数:9,代码来源:brightcove.py
注:本文中的streamlink.compat.urlparse函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论