本文整理汇总了Python中streamlink.stream.HLSStream类的典型用法代码示例。如果您正苦于以下问题:Python HLSStream类的具体用法?Python HLSStream怎么用?Python HLSStream使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了HLSStream类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _get_streams
def _get_streams(self):
stream_type = self._url_re.match(self.url).group("type")
hls_re = re.compile(r"""["'](?P<url>[^"']+\.m3u8[^"']*?)["']""")
headers = {
"Origin": "https://www.metube.id",
"User-Agent": useragents.FIREFOX
}
res = self.session.http.get(self.url)
match = hls_re.search(res.text)
if not match:
return
stream_url = match.group("url")
if stream_type == "live":
return HLSStream.parse_variant_playlist(self.session, stream_url,
headers=headers)
else:
streams = {}
for quality, stream in HLSStream.parse_variant_playlist(
self.session,
stream_url,
headers=headers).items():
name = self._VOD_STREAM_NAMES.get(quality, quality)
streams[name] = stream
return streams
开发者ID:sheldon0531,项目名称:streamlink,代码行数:31,代码来源:metube.py
示例2: _get_streams
def _get_streams(self):
api = self._create_api()
match = _url_re.match(self.url)
media_id = int(match.group("media_id"))
try:
info = api.get_info(media_id, fields=["media.stream_data"],
schema=_media_schema)
except CrunchyrollAPIError as err:
raise PluginError(u"Media lookup error: {0}".format(err.msg))
if not info:
return
# The adaptive quality stream contains a superset of all the other streams listeed
has_adaptive = any([s[u"quality"] == u"adaptive" for s in info[u"streams"]])
if has_adaptive:
self.logger.debug(u"Loading streams from adaptive playlist")
for stream in filter(lambda x: x[u"quality"] == u"adaptive", info[u"streams"]):
return HLSStream.parse_variant_playlist(self.session, stream["url"])
else:
streams = {}
# If there is no adaptive quality stream then parse each individual result
for stream in info[u"streams"]:
# the video_encode_id indicates that the stream is not a variant playlist
if u"video_encode_id" in stream:
streams[stream[u"quality"]] = HLSStream(self.session, stream[u"url"])
else:
# otherwise the stream url is actually a list of stream qualities
streams.update(HLSStream.parse_variant_playlist(self.session, stream[u"url"]))
return streams
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:32,代码来源:crunchyroll.py
示例3: test_hls_stream
def test_hls_stream(self):
url = "http://test.se/stream.m3u8"
stream = HLSStream(self.session, url, headers={"User-Agent": "Test"})
self.assertEqual(
{"type": "hls",
"url": url,
"headers": {
"User-Agent": "Test",
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Connection": "keep-alive",
}},
stream.__json__()
)
开发者ID:sheldon0531,项目名称:streamlink,代码行数:14,代码来源:test_stream_json.py
示例4: _get_streams
def _get_streams(self):
if self.get_option("email") and self.get_option("password"):
if not self.authenticate(self.get_option("email"), self.get_option("password")):
self.logger.warning("Failed to login as {0}".format(self.get_option("email")))
# find the list of channels from the html in the page
self.url = self.url.replace("https", "http") # https redirects to http
res = http.get(self.url)
if "enter your postcode" in res.text:
self.logger.info("Setting your postcode to: {0}. "
"This can be changed in the settings on tvplayer.com", self.dummy_postcode)
res = http.post(self.update_url,
data=dict(postcode=self.dummy_postcode),
params=dict(return_url=self.url))
stream_attrs = self._get_stream_attrs(res)
if stream_attrs:
stream_data = self._get_stream_data(**stream_attrs)
if stream_data:
if stream_data.get("drmToken"):
self.logger.error("This stream is protected by DRM can cannot be played")
return
else:
return HLSStream.parse_variant_playlist(self.session, stream_data["stream"])
else:
if "need to login" in res.text:
self.logger.error(
"You need to login using --tvplayer-email/--tvplayer-password to view this stream")
开发者ID:justastranger,项目名称:Twitchy,代码行数:30,代码来源:tvplayer.py
示例5: _get_streams
def _get_streams(self):
match = _url_re.match(self.url)
res = http.get(STREAM_INFO_URL,
params=match.groupdict(),
acceptable_status=STATUS_UNAVAILABLE)
if res.status_code in STATUS_UNAVAILABLE:
return
data = http.json(res, schema=_stream_schema)
if data.get("hls_url"):
hls_url = data["hls_url"]
hls_name = "live"
elif data.get("replay_url"):
self.logger.info("Live Stream ended, using replay instead")
hls_url = data["replay_url"]
hls_name = "replay"
else:
raise NoStreamsError(self.url)
streams = HLSStream.parse_variant_playlist(self.session, hls_url)
if not streams:
return {hls_name: HLSStream(self.session, hls_url)}
else:
return streams
开发者ID:amadu80,项目名称:repository.xvbmc,代码行数:25,代码来源:periscope.py
示例6: _get_streams
def _get_streams(self):
match = _url_re.match(self.url)
channel = match.group("channel")
self.session.http.headers.update({'User-Agent': useragents.CHROME})
payload = '{"liveStreamID": "%s"}' % (channel)
res = self.session.http.post(API_URL, data=payload)
status = _status_re.search(res.text)
if not status:
self.logger.info("Stream currently unavailable.")
return
http_url = _rtmp_re.search(res.text).group(1)
http_url = http_url.replace("http:", "https:")
yield "live", HTTPStream(self.session, http_url)
if 'pull-rtmp' in http_url:
url = http_url.replace("https:", "rtmp:").replace(".flv", "")
stream = RTMPStream(self.session, {
"rtmp": url,
"live": True
})
yield "live", stream
if 'wansu-' in http_url:
url = http_url.replace(".flv", "/playlist.m3u8")
for stream in HLSStream.parse_variant_playlist(self.session, url).items():
yield stream
else:
url = http_url.replace("live-hdl", "live-hls").replace(".flv", ".m3u8")
yield "live", HLSStream(self.session, url)
开发者ID:sheldon0531,项目名称:streamlink,代码行数:32,代码来源:app17.py
示例7: _get_vod_streams
def _get_vod_streams(self, params):
manifest_url = params.get("autoURL")
if not manifest_url:
return
res = http.get(manifest_url)
if res.headers.get("Content-Type") == "application/f4m+xml":
streams = HDSStream.parse_manifest(self.session, res.url)
# TODO: Replace with "yield from" when dropping Python 2.
for __ in streams.items():
yield __
elif res.headers.get("Content-Type") == "application/vnd.apple.mpegurl":
streams = HLSStream.parse_variant_playlist(self.session, res.url)
# TODO: Replace with "yield from" when dropping Python 2.
for __ in streams.items():
yield __
else:
manifest = http.json(res, schema=_vod_manifest_schema)
for params in manifest["alternates"]:
name = "{0}p".format(params["height"])
stream = self._create_flv_playlist(params["template"])
yield name, stream
failovers = params.get("failover", [])
for failover in failovers:
stream = self._create_flv_playlist(failover)
yield name, stream
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:29,代码来源:dailymotion.py
示例8: _get_streams
def _get_streams(self):
if _stream_url_re.match(self.url):
mode = MODE_STREAM
else:
mode = MODE_VOD
res = http.get(self.url)
match = _json_re.search(res.text)
if match:
data = json.loads(_json_re.search(res.text).group('json').replace('"', '"'))
else:
raise PluginError("Could not extract JSON metadata")
streams = {}
try:
if mode == MODE_STREAM:
sources = data['playlist']['videos'][0]['sources']
elif mode == MODE_VOD:
sources = data['selected_video']['sources']
except (KeyError, IndexError):
raise PluginError("Could not extract sources")
for source in sources:
try:
if source['delivery'] != 'hls':
continue
url = source['src'].replace(r'\/', '/')
except KeyError:
continue
stream = HLSStream.parse_variant_playlist(self.session, url)
# work around broken HTTP connection persistence by acquiring a new connection
http.close()
streams.update(stream)
return streams
开发者ID:,项目名称:,代码行数:35,代码来源:
示例9: _get_streams
def _get_streams(self):
user = self.login(self.options.get("email"), self.options.get("password"))
if user:
self.logger.debug("Logged in to Schoolism as {0}", user)
res = http.get(self.url, headers={"User-Agent": useragents.SAFARI_8})
lesson_playlist = self.playlist_schema.validate(res.text)
part = self.options.get("part")
self.logger.info("Attempting to play lesson Part {0}", part)
found = False
# make request to key-time api, to get key specific headers
res = http.get(self.key_time_url, headers={"User-Agent": useragents.SAFARI_8})
for i, video in enumerate(lesson_playlist, 1):
if video["sources"] and i == part:
found = True
for source in video["sources"]:
for s in HLSStream.parse_variant_playlist(self.session,
source["src"],
headers={"User-Agent": useragents.SAFARI_8,
"Referer": self.url}).items():
yield s
if not found:
self.logger.error("Could not find lesson Part {0}", part)
开发者ID:amadu80,项目名称:repository.xvbmc,代码行数:27,代码来源:schoolism.py
示例10: _get_video_streams
def _get_video_streams(self):
res = self.session.http.get(self.url)
match = self._video_player_re.search(res.text)
if match is None:
return
player_url = match.group('player_url')
stream_data = self.session.http.get(player_url, schema=self._video_stream_schema)
if stream_data is None:
return
# Check geolocation to prevent further errors when stream is parsed
if not self.check_geolocation(stream_data['geoLocRestriction']):
self.logger.error('Stream is geo-restricted')
return
# Check whether streams are DRM-protected
if stream_data.get('drm', False):
self.logger.error('Stream is DRM-protected')
return
now = datetime.datetime.now()
try:
if isinstance(stream_data['sources'], dict):
urls = []
for profile, url in stream_data['sources'].items():
if not url or url in urls:
continue
match = self._stream_size_re.match(url)
if match is not None:
quality = match.group('size')
else:
quality = profile
yield quality, HTTPStream(self.session, url)
urls.append(url)
hls_url = stream_data.get('urlHls') or stream_data.get('streamUrlHls')
if hls_url:
if stream_data.get('isLive', False):
# Live streams require a token
hls_url = self.tokenize_stream(hls_url)
for stream in HLSStream.parse_variant_playlist(self.session, hls_url).items():
yield stream
dash_url = stream_data.get('urlDash') or stream_data.get('streamUrlDash')
if dash_url:
if stream_data.get('isLive', False):
# Live streams require a token
dash_url = self.tokenize_stream(dash_url)
for stream in DASHStream.parse_manifest(self.session, dash_url).items():
yield stream
except IOError as err:
if '403 Client Error' in str(err):
# Check whether video is expired
if 'startDate' in stream_data:
if now < self.iso8601_to_epoch(stream_data['startDate']):
self.logger.error('Stream is not yet available')
elif 'endDate' in stream_data:
if now > self.iso8601_to_epoch(stream_data['endDate']):
self.logger.error('Stream has expired')
开发者ID:sheldon0531,项目名称:streamlink,代码行数:60,代码来源:rtbf.py
示例11: _get_streams
def _get_streams(self):
match = _url_re.match(self.url)
channel = match.group("channel")
channel = channel.replace("_", "-")
playlist_url = PLAYLIST_URL.format(channel)
return HLSStream.parse_variant_playlist(self.session, playlist_url, check_streams=True)
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:7,代码来源:oldlivestream.py
示例12: _get_streams
def _get_streams(self):
channel = self.url_re.match(self.url).group(1)
res = http.get(self.api_url.format(channel))
data = http.json(res, schema=self.api_schema)
return HLSStream.parse_variant_playlist(self.session, data["channel_stream_url"])
开发者ID:amadu80,项目名称:repository.xvbmc,代码行数:7,代码来源:powerapp.py
示例13: _get_streams
def _get_streams(self):
# get the HLS xml from the same sub domain as the main url, defaulting to www
sdomain = _url_re.match(self.url).group(1) or "www"
res = self.session.http.get(API_URL.format(sdomain))
stream_url = self.session.http.xml(res, schema=_schema)
return HLSStream.parse_variant_playlist(self.session, stream_url)
开发者ID:sheldon0531,项目名称:streamlink,代码行数:7,代码来源:nhkworld.py
示例14: _get_streams
def _get_streams(self):
if self.is_live:
self.logger.debug("Loading live stream for {0}...", self.channel)
res = self.session.http.get(self.live_api_url, data={"r": random.randint(1, 100000)})
live_data = self.session.http.json(res)
# all the streams are equal for each type, so pick a random one
hls_streams = live_data.get("hls")
if hls_streams:
url = random.choice(hls_streams)
url = url + '&' + urlencode(self.hls_session()) # TODO: use update_qsd
for s in HLSStream.parse_variant_playlist(self.session, url, name_fmt="{pixels}_{bitrate}").items():
yield s
mpd_streams = live_data.get("mpd")
if mpd_streams:
url = random.choice(mpd_streams)
for s in DASHStream.parse_manifest(self.session, url).items():
yield s
elif self.channel == "1tv":
self.logger.debug("Attempting to find VOD stream...", self.channel)
vod_data = self.vod_data()
if vod_data:
self.logger.info(u"Found VOD: {0}".format(vod_data[0]['title']))
for stream in vod_data[0]['mbr']:
yield stream['name'], HTTPStream(self.session, update_scheme(self.url, stream['src']))
开发者ID:sheldon0531,项目名称:streamlink,代码行数:28,代码来源:onetv.py
示例15: _get_streams
def _get_streams(self):
mdata = self._get_movie_data()
if mdata:
log.debug("Found video: {0} ({1})".format(mdata["title"], mdata["id"]))
if mdata["media"]["url"]:
for s in HLSStream.parse_variant_playlist(self.session, mdata["media"]["url"]).items():
yield s
elif self.get_option("email") and self.get_option("password"):
if self.login(self.get_option("email"), self.get_option("password")):
details = self._get_details(mdata["id"])
if details:
for item in details["items"]:
for s in HLSStream.parse_variant_playlist(self.session, item["media"]["url"]).items():
yield s
else:
log.error("You must login to access this stream")
开发者ID:sheldon0531,项目名称:streamlink,代码行数:16,代码来源:openrectv.py
示例16: _create_stream
def _create_stream(self, stream, is_live):
stream_name = "{0}p".format(stream["height"])
stream_type = stream["mediaType"]
stream_url = stream["url"]
if stream_type in ("hls", "mp4"):
if urlparse(stream_url).path.endswith("m3u8"):
try:
streams = HLSStream.parse_variant_playlist(self.session, stream_url)
# TODO: Replace with "yield from" when dropping Python 2.
for stream in streams.items():
yield stream
except IOError as err:
self.logger.error("Failed to extract HLS streams: {0}", err)
else:
yield stream_name, HTTPStream(self.session, stream_url)
elif stream_type == "rtmp":
params = {
"rtmp": stream["streamer"],
"playpath": stream["url"],
"swfVfy": SWF_URL,
"pageUrl": self.url,
}
if is_live:
params["live"] = True
else:
params["playpath"] = "mp4:{0}".format(params["playpath"])
stream = RTMPStream(self.session, params)
yield stream_name, stream
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:33,代码来源:artetv.py
示例17: _get_streams
def _get_streams(self):
res = self.session.http.get(self.url)
match = self._video_id_re.search(res.text) or self._video_id_alt_re.search(res.text)
if match is None:
return
broadcaster_id = match.group('broadcaster_id')
video_type = match.group('video_type')
video_id = match.group('video_id')
videos = self.session.http.get(self.DACAST_API_URL.format(broadcaster_id, video_type, video_id), schema=self._api_schema)
token = self.session.http.get(self.DACAST_TOKEN_URL.format(broadcaster_id, video_type, video_id), schema=self._token_schema)
parsed = []
for video_url in videos:
video_url += token
# Ignore duplicate video URLs
if video_url in parsed:
continue
parsed.append(video_url)
# Ignore HDS streams (broken)
if '.m3u8' in video_url:
for s in HLSStream.parse_variant_playlist(self.session, video_url).items():
yield s
开发者ID:sheldon0531,项目名称:streamlink,代码行数:25,代码来源:idf1.py
示例18: _get_streams
def _get_streams(self):
res = http.get(self.url)
match = _info_re.search(res.text)
if not match:
return
info = parse_json(match.group(1), schema=_schema)
stream_name = info["mode"]
mp4_url = info.get("mp4_url")
ios_url = info.get("ios_url")
swf_url = info.get("swf_url")
if mp4_url:
stream = HTTPStream(self.session, mp4_url)
yield stream_name, stream
if ios_url:
if urlparse(ios_url).path.endswith(".m3u8"):
streams = HLSStream.parse_variant_playlist(self.session, ios_url)
# TODO: Replace with "yield from" when dropping Python 2.
for stream in streams.items():
yield stream
if swf_url:
stream = self._get_rtmp_stream(swf_url)
if stream:
yield stream_name, stream
开发者ID:coder-alpha,项目名称:CcloudTv.bundle,代码行数:27,代码来源:dmcloud.py
示例19: _get_streams
def _get_streams(self):
if "eltrecetv.com.ar/vivo" in self.url.lower():
try:
http.headers = {'Referer': self.url,
'User-Agent': useragents.ANDROID}
res = http.get('https://api.iamat.com/metadata/atcodes/eltrece')
yt_id = parse_json(res.text)["atcodes"][0]["context"]["ahora"]["vivo"]["youtubeVideo"]
yt_url = "https://www.youtube.com/watch?v={0}".format(yt_id)
return self.session.streams(yt_url)
except BaseException:
self.logger.info("Live content is temporarily unavailable. Please try again later.")
else:
try:
http.headers = {'Referer': self.url,
'User-Agent': useragents.CHROME}
res = http.get(self.url)
_player_re = re.compile(r'''data-kaltura="([^"]+)"''')
match = _player_re.search(res.text)
if not match:
return
entry_id = parse_json(match.group(1).replace(""", '"'))["entryId"]
hls_url = "https://vodgc.com/p/111/sp/11100/playManifest/entryId/{0}/format/applehttp/protocol/https/a.m3u8".format(entry_id)
return HLSStream.parse_variant_playlist(self.session, hls_url)
except BaseException:
self.logger.error("The requested VOD content is unavailable.")
开发者ID:justastranger,项目名称:Twitchy,代码行数:25,代码来源:eltrecetv.py
示例20: _get_streams
def _get_streams(self):
iframe_url = self._get_iframe_url(self.url)
if iframe_url:
log.debug("Found iframe URL={0}".format(iframe_url))
info_url = self._get_stream_info_url(iframe_url)
if info_url:
log.debug("Getting info from URL: {0}".format(info_url))
res = self.session.http.get(info_url, headers={"Referer": iframe_url})
data = self.session.http.json(res)
if data['status'] == 200:
for media in data['data']['playlist']['medialist']:
if media['errors']:
log.error(media['errors'].replace('\n', '').replace('\r', ''))
for media_type in media.get('sources', []):
if media_type == "m3u8":
hls_url = media['sources'][media_type]['auto']
for s in HLSStream.parse_variant_playlist(self.session, hls_url).items():
yield s
if media_type == "http":
for pix, url in media['sources'][media_type].items():
yield "{0}p".format(pix), HTTPStream(self.session, url)
else:
log.error("An error occurred: {0}".format(data['errors'].replace('\n', '').replace('\r', '')))
else:
log.error("Unable to get stream info URL")
else:
log.error("Could not find video iframe")
开发者ID:sheldon0531,项目名称:streamlink,代码行数:33,代码来源:live_russia_tv.py
注:本文中的streamlink.stream.HLSStream类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论