本文整理汇总了Python中pytoolbox.encoding.to_bytes函数的典型用法代码示例。如果您正苦于以下问题：Python to_bytes函数的具体用法？Python to_bytes怎么用？Python to_bytes使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了to_bytes函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: add_media
def add_media(config, media):
    u"""
    Move a non-pending media asset to its generated storage path and measure it.

    Returns ``(0, None)`` when ``media.status`` is ``Media.PENDING``, otherwise ``(size, duration)`` where ``size`` is
    ``get_size()`` of the destination directory and ``duration`` is ``get_media_duration()`` of the destination file.

    Raises NotImplementedError when ``config.storage_medias_path(media, generate=False)`` yields no path, IndexError
    when the file still cannot be renamed after 5 attempts and ValueError when size or duration cannot be detected.
    """
    if media.status == Media.PENDING:
        return (0, None)
    source_path = config.storage_medias_path(media, generate=False)
    if not source_path:
        raise NotImplementedError(to_bytes(u'FIXME Add of external URI not implemented.'))
    target_path = config.storage_medias_path(media, generate=True)
    if target_path != source_path:
        # Generate media storage uri and move it to media storage path + set permissions
        media.uri = config.storage_medias_uri(media)
        try_makedirs(os.path.dirname(target_path))
        last_error = None
        for _attempt in xrange(5):
            try:
                os.rename(source_path, target_path)
            except OSError as error:
                # Remember the failure and pause one second before the next attempt
                last_error = error
                time.sleep(1)
            else:
                # FIXME chown chmod
                last_error = None
                break
        if last_error:
            raise IndexError(to_bytes(u'An error occured : {0} ({1} -> {2}).'.format(
                last_error, source_path, target_path)))
    try:
        size = get_size(os.path.dirname(target_path))
    except OSError:
        raise ValueError(to_bytes(u'Unable to detect size of media asset {0}.'.format(target_path)))
    duration = get_media_duration(target_path)
    if duration is None:
        raise ValueError(to_bytes(u'Unable to detect duration of media asset {0}.'.format(target_path)))
    return (size, duration)
开发者ID:codecwatch,项目名称:OSCIED,代码行数:33,代码来源:utils.py
示例2: send_email_task
def send_email_task(self, task, status, media=None, media_out=None):
    u"""
    Send the e-mail that reports ``status`` for ``task``, if ``task.send_email`` is set.

    The task's fields are loaded, a template chosen from ``self.config`` is rendered with the task converted to a
    dict, and the result is handed to ``self.send_email()``. Nothing is sent when the task is neither a
    ``TransformTask`` nor a ``PublisherTask``.

    Raises IndexError when the user, the input media asset or the transformation profile cannot be found.
    """
    if not task.send_email:
        return
    user = self.get_user({u'_id': task.user_id}, {u'mail': 1})
    if not user:
        raise IndexError(to_bytes(u'Unable to find user with id {0}.'.format(task.user_id)))
    if isinstance(task, TransformTask):
        media_in = self.get_media({u'_id': task.media_in_id})
        if not media_in:
            # FIXME maybe do not raise but put default value or return ?
            raise IndexError(to_bytes(
                u'Unable to find input media asset with id {0}.'.format(task.media_in_id)))
        profile = self.get_transform_profile({u'_id': task.profile_id})
        if not profile:
            # FIXME maybe do not raise but put default value or return ?
            raise IndexError(to_bytes(
                u'Unable to find transformation profile with id {0}.'.format(task.profile_id)))
        task.load_fields(user, media_in, media_out, profile)
        template = self.config.email_ttask_template
        name = u'Transformation'
    elif isinstance(task, PublisherTask):
        task.load_fields(user, media)
        template = self.config.email_ptask_template
        name = u'Publication'
    else:
        return  # FIXME oups
    task.append_async_result()
    # NOTE(review): the third positional argument is u'utf-8'; presumably `open` is codecs.open here -- confirm
    with open(template, u'r', u'utf-8') as template_file:
        template_text = template_file.read()
    # FIXME YourFormatter().format(template_file.read(), task)
    text_plain = Template(template_text).render(object2dict(task, include_properties=True))
    subject = u'OSCIED - {0} task {1} {2}'.format(name, task._id, status)
    self.send_email(task.user.mail, subject, text_plain)
开发者ID:codecwatch,项目名称:OSCIED,代码行数:28,代码来源:server.py
示例3: put_media
def put_media(self, media, onlyMP2TS):
    """
    Put an incoming media packet.

    The packet is stored in ``self.medias`` under its sequence number and the counters are updated. When a cross
    entry exists for that sequence, ``self.recover_media_packet()`` is called. ``self.out()`` is always called last.

    Raises ValueError when the receiver is flushing or when the packet fails the validity check selected by
    ``onlyMP2TS``.
    """
    if self.flushing:
        raise ValueError(to_bytes(FecReceiver.ER_FLUSHING))
    if onlyMP2TS:
        if not media.validMP2T:
            raise ValueError(to_bytes(FecReceiver.ER_VALID_RTP_MP2TS))
    elif not media.valid:
        raise ValueError(to_bytes(FecReceiver.ER_VALID_RTP))
    # Put the media packet into medias buffer
    sequence = media.sequence
    if sequence in self.medias:
        self.media_overwritten += 1
    self.medias[sequence] = media
    buffered = len(self.medias)
    if buffered > self.max_media:
        self.max_media = buffered
    self.media_received += 1
    cross = self.crosses.get(sequence)
    if cross:
        # Simulate the recovery of a media packet to update buffers and potentially start a recovery cascade !
        self.recover_media_packet(sequence, cross, None)
    self.out()  # FIXME maybe better to call it from another thread
开发者ID:davidfischer-ch,项目名称:pytoolbox,代码行数:25,代码来源:receiver.py
示例4: revoke_publisher_task
def revoke_publisher_task(self, task, callback_url, terminate=False, remove=False):
u"""
This do not delete tasks from tasks database (if remove=False) but set revoked attribute in tasks database and
broadcast revoke request to publication units with celery.
If the task is actually running it will be cancelled if terminated = True.
In any case, the output media asset will be deleted (task running or successfully finished).
"""
# NOTE(review): this listing lost its indentation, so the nesting cannot be confirmed from here. In particular,
# whether the final `if remove:` belongs to the preceding `else:` must be checked against the upstream file.
# `task` may be given as an id: when valid_uuid() accepts it, the task object is loaded from that id.
if valid_uuid(task, none_allowed=False):
task = self.get_publisher_task({u'_id': task})
task.is_valid(True)
# Tasks whose status is listed in CANCELED_STATUS are rejected with ValueError.
if task.status in PublisherTask.CANCELED_STATUS:
raise ValueError(to_bytes(u'Cannot revoke a publication task with status {0}.'.format(task.status)))
# revoke() is skipped when the configuration is a mock.
if not self.config.is_mock:
revoke(task._id, terminate=terminate)
if task.status == PublisherTask.SUCCESS and not self.config.is_mock:
# Send revoke task to the worker that published the media
callback = Callback(self.config.api_url + callback_url, u'node', self.config.node_secret)
# The revoke task is sent to the queue named by task.get_hostname().
queue = task.get_hostname()
result = PublisherWorker.revoke_publisher_task.apply_async(
args=(task.publish_uri, object2json(callback, False)), queue=queue)
# A result without id is reported as a transmission failure (ValueError).
if not result.id:
raise ValueError(to_bytes(u'Unable to transmit task to queue {0}.'.format(queue)))
logging.info(u'New revoke publication task {0} -> queue {1}.'.format(result.id, queue))
self.update_publisher_task_and_media(task, revoke_task_id=result.id, status=PublisherTask.REVOKING)
else:
# No worker-side revoke task was sent: the task is marked REVOKED directly.
self.update_publisher_task_and_media(task, status=PublisherTask.REVOKED)
# With remove=True the task document is removed from the publisher_tasks collection.
if remove:
self._db.publisher_tasks.remove({u'_id': task._id})
开发者ID:codecwatch,项目名称:OSCIED,代码行数:28,代码来源:server.py
示例5: revoke_publisher_task
def revoke_publisher_task(publish_uri, callback_json):
    u"""
    Remove the publication directory of a media asset and report the outcome through the callback.

    ``publish_uri`` is translated to a local path with the publisher's local configuration and the parent directory
    of that path is removed. ``callback_json`` is decoded into a ``Callback`` used to post the final status.

    Returns a dict with the keys hostname, start_date, elapsed_time, eta_time and percent. Any exception is reported
    through the callback (its text is used as the status) and is then re-raised.
    """

    def revoke_publish_callback(status, publish_uri):
        u"""Post ``status`` (and ``publish_uri`` when set) to the callback, or print an error if there is none."""
        payload = {u'task_id': request.id, u'status': status}
        if publish_uri:
            payload[u'publish_uri'] = publish_uri
        payload_json = object2json(payload, False)
        if callback is not None:
            response = callback.post(payload_json)
            print(u'{0} Code {1} {2} : {3}'.format(
                request.id, response.status_code, response.reason, response._content))
        else:
            print(u'{0} [ERROR] Unable to callback orchestrator: {1}'.format(request.id, payload_json))

    # Both names are read by the nested function: bind them first to avoid 'referenced before assignment'
    callback = None
    request = current_task.request
    try:
        # Let's the task begin !
        print(u'{0} Revoke publication task started'.format(request.id))

        # Read current configuration to translate files URIs to local paths
        local_config = PublisherLocalConfig.read(LOCAL_CONFIG_FILENAME, inspect_constructor=False)
        print(object2json(local_config, True))

        # Load and check task parameters
        callback = Callback.from_json(callback_json, inspect_constructor=True)
        callback.is_valid(True)

        # Update callback socket according to configuration
        nat_socket = local_config.api_nat_socket
        if nat_socket and len(nat_socket) > 0:
            callback.replace_netloc(nat_socket)

        publish_root = dirname(local_config.publish_uri_to_path(publish_uri))
        if not publish_root:
            raise ValueError(to_bytes(u'Media asset is not hosted on this publication point.'))

        # Remove publication directory
        start_date = datetime_now()
        start_time = time.time()
        shutil.rmtree(publish_root, ignore_errors=True)
        if valid_uri(publish_uri, check_404=True):
            raise IOError(to_bytes(u'Media asset is reachable from publication URI {0}'.format(publish_uri)))
        elapsed_time = time.time() - start_time

        # Here all seem okay
        print(u'{0} Revoke publication task successful, media asset unpublished from {1}'.format(
            request.id, publish_uri))
        revoke_publish_callback(PublisherTask.SUCCESS, publish_uri)
        return {
            u'hostname': request.hostname,
            u'start_date': start_date,
            u'elapsed_time': elapsed_time,
            u'eta_time': 0,
            u'percent': 100,
        }
    except Exception as error:
        # Here something went wrong
        print(u'{0} Revoke publication task failed'.format(request.id))
        revoke_publish_callback(unicode(error), None)
        raise
开发者ID:codecwatch,项目名称:OSCIED,代码行数:59,代码来源:PublisherWorker.py
示例6: add
def add(self, *args, **kwargs):
    u"""
    Send a new value with a ``post`` request to ``self.get_url()`` and return the decoded response.

    The value is either the single positional argument or the keyword arguments taken as a dict (exactly one of the
    two forms must be used). The response is converted with ``dict2object`` when ``self.cls`` is set. For a ``User``
    instance the secret is copied back from the submitted value.

    Raises ValueError when both or neither of args / kwargs are given, or when more than one positional is given.
    """
    if bool(args) == bool(kwargs):
        raise ValueError(to_bytes(u'You must set args OR kwargs.'))
    if args and len(args) != 1:
        raise ValueError(to_bytes(u'args should contain only 1 value.'))
    if args:
        value = args[0]
    else:
        value = kwargs
    url = self.get_url()
    payload = object2json(value, include_properties=False)
    response = self.api_client.do_request(post, url, data=payload)
    if self.cls:
        instance = dict2object(self.cls, response, inspect_constructor=True)
    else:
        instance = response
    # Recover user's secret
    if isinstance(instance, User):
        if args:
            instance.secret = value.secret
        else:
            instance.secret = kwargs[u'secret']
    return instance
开发者ID:davidfischer-ch,项目名称:OSCIED,代码行数:12,代码来源:base.py
示例7: validate_task
def validate_task(media_in, profile, media_out):
    u"""
    Check that a transformation task can be launched with the given input media, profile and output media.

    Returns nothing. Raises NotImplementedError when the input media is not ``Media.READY`` or when it is MPEG-DASH
    content and the encoder is not ``copy``; raises ValueError when ``profile.is_dash`` and ``media_out.is_dash``
    disagree.
    """
    if media_in.status != Media.READY:
        message = u"Cannot launch the task, input media asset's status is {0}.".format(media_in.status)
        raise NotImplementedError(to_bytes(message))
    if media_in.is_dash and profile.encoder_name != u'copy':
        message = u'Cannot launch the task, input media asset is MPEG-DASH content and encoder is not copy.'
        raise NotImplementedError(to_bytes(message))
    if profile.is_dash and not media_out.is_dash:
        message = (u'Cannot launch the task, output media asset is not a MPD but task is based on a MPEG-DASH '
                   u'encoder called {0}.').format(profile.encoder_name)
        raise ValueError(to_bytes(message))
    if not profile.is_dash and media_out.is_dash:
        message = (u'Cannot launch the task, output media asset is a MPD but task is not based on a MPEG-DASH '
                   u'encoder called {0}.').format(profile.encoder_name)
        raise ValueError(to_bytes(message))
开发者ID:davidfischer-ch,项目名称:OSCIED,代码行数:13,代码来源:models.py
示例8: launch_transform_task
def launch_transform_task(
    self, user_id, media_in_id, profile_id, filename, metadata, send_email, queue, callback_url
):
    """
    Validate the request, create the pending output media and submit a transformation task to ``queue``.

    Returns the saved ``TransformTask``. With ``self.is_mock`` no worker is contacted and a random UUID is used as
    the task id.

    Raises IndexError when the user, the input media, the profile or the queue is unknown and ValueError when the
    submission yields no task id. ``TransformTask.validate_task()`` may raise as well.
    """
    # NOTE(review): indentation was lost in this listing; the standalone check is assumed to guard only the
    # user lookup -- confirm against the upstream file.
    if self.is_standalone:
        user = self.get_user({"_id": user_id}, {"secret": 0})
        if not user:
            raise IndexError(to_bytes("No user with id {0}.".format(user_id)))

    media_in = self.get_media({"_id": media_in_id})
    if not media_in:  # FIXME maybe a media access control here
        raise IndexError(to_bytes("No media asset with id {0}.".format(media_in_id)))

    profile = self.get_transform_profile({"_id": profile_id})
    if not profile:  # FIXME maybe a profile access control here
        raise IndexError(to_bytes("No transformation profile with id {0}.".format(profile_id)))

    if queue not in self.config.transform_queues:
        raise IndexError(to_bytes("No transformation queue with name {0}.".format(queue)))

    media_out = Media(
        user_id=user_id, parent_id=media_in_id, filename=filename, metadata=metadata, status=Media.PENDING
    )
    media_out.uri = self.config.storage_medias_uri(media_out)
    TransformTask.validate_task(media_in, profile, media_out)
    self.save_media(media_out)  # Save pending output media

    # FIXME create a one-time password to avoid fixed secret authentication ...
    callback = Callback(self.config.api_url + callback_url, "node", self.config.node_secret)
    if not self.is_mock:
        worker_args = (
            object2json(media_in, False),
            object2json(media_out, False),
            object2json(profile, False),
            object2json(callback, False),
        )
        result_id = TransformWorker.transform_task.apply_async(args=worker_args, queue=queue).id
    else:
        result_id = unicode(uuid.uuid4())
    if not result_id:
        raise ValueError(to_bytes("Unable to transmit task to workers of queue {0}.".format(queue)))
    logging.info("New transformation task {0} -> queue {1}.".format(result_id, queue))

    task = TransformTask(
        user_id=user_id,
        media_in_id=media_in._id,
        media_out_id=media_out._id,
        profile_id=profile._id,
        send_email=send_email,
        _id=result_id,
    )
    task.statistic["add_date"] = datetime_now()
    self._db.transform_tasks.save(task.__dict__, safe=True)
    return task
开发者ID:ebu,项目名称:OSCIED,代码行数:50,代码来源:server.py
示例9: ensure_publisher_units
def ensure_publisher_units(self, environment, num_units, terminate, test=False):
u"""
.. warning::
FIXME implement more robust resources listing and removing, sometimes juju fail during a call
(e.g. destroy_transform_units with num_units=10) and then some machines are not destroyed.
* implement a garbage collector method callable by user when he want to destroy useless machines ?
* implement a thread to handle removing unit asynchronously.
"""
# NOTE(review): this listing lost its indentation, so the nesting cannot be confirmed from here. In particular,
# the config[...] assignments that follow the `raise` below may be nested under `if not same_environment:`
# (and therefore unreachable) or may be at function level -- check against the upstream file.
# The method refuses to run unless the caller passes test=True.
if not test:
raise NotImplementedError(u'This method is in development, set test to True to disable this warning.')
environments, default = self.get_environments()
# The literal name 'default' is replaced by the default environment returned by get_environments().
if environment == 'default':
environment = default
same_environment = (environment == default)
config = juju.load_unit_config(self.config.publisher_config)
config[u'rabbit_queues'] = u'publisher_{0}'.format(environment)
# Only the default environment is accepted; any other one raises NotImplementedError.
if not same_environment:
raise NotImplementedError(to_bytes(u'Unable to setup publication units into non-default environment {0} '
'(default is {1}).'.format(environment, default)))
config[u'mongo_connection'] = self.config.mongo_node_connection
config[u'rabbit_connection'] = self.config.rabbit_connection
# FIXME copy storage configuration, first method
config[u'storage_address'] = self.config.storage_address
config[u'storage_fstype'] = self.config.storage_fstype
config[u'storage_mountpoint'] = self.config.storage_mountpoint
config[u'storage_options'] = self.config.storage_options
# The unit configuration is saved before the number of units is adjusted.
juju.save_unit_config(self.config.charms_config, self.config.publisher_service, config)
the_environment = self._get_environment(environment)
the_environment.ensure_num_units(self.config.publisher_service, local=True, num_units=num_units,
terminate=terminate, repository=self.config.charms_repository)
# Relations are added only when units are requested in the default environment.
if same_environment and num_units:
try:
try:
the_environment.add_relation(self.config.orchestra_service, self.config.publisher_service,
u'publisher', u'publisher')
except RuntimeError as e:
# RuntimeError from add_relation() is re-raised as NotImplementedError with an explanatory message.
raise NotImplementedError(to_bytes(u'Orchestra service must be available and running on default '
'environment {0}, reason : {1}'.format(default, e)))
try:
the_environment.add_relation(self.config.storage_service, self.config.publisher_service)
except RuntimeError as e:
raise NotImplementedError(to_bytes(u'Storage service must be available and running on default '
'environment {0}, reason : {1}'.format(default, e)))
except NotImplementedError:
# A failed relation destroys the publisher service before the error is propagated.
the_environment.destroy_service(self.config.publisher_service)
raise
开发者ID:codecwatch,项目名称:OSCIED,代码行数:49,代码来源:server.py
示例10: delete_media
def delete_media(self, media):
    u"""
    Mark a media asset as deleted and remove it from the storage.

    ``media`` may be a media object or an id; when ``valid_uuid()`` accepts it, the object is loaded from that id.

    Raises ValueError when a transformation task or a publication task using the media has a status listed in
    ``TransformTask.WORK_IN_PROGRESS_STATUS``.
    """
    if valid_uuid(media, none_allowed=False):
        media = self.get_media({u'_id': media})
    media.is_valid(True)

    transform_task = self.get_transform_task({u'media_in_id': media._id}, append_result=True)
    if transform_task and transform_task.status in TransformTask.WORK_IN_PROGRESS_STATUS:
        message = (u'Cannot delete the media asset, it is actually in use by transformation task with id {0} and '
                   u'status {1}.').format(transform_task._id, transform_task.status)
        raise ValueError(to_bytes(message))

    publisher_task = self.get_publisher_task({u'media_id': media._id}, append_result=True)
    # NOTE(review): the publication task is also checked against TransformTask's status list -- confirm intended
    if publisher_task and publisher_task.status in TransformTask.WORK_IN_PROGRESS_STATUS:
        message = (u'Cannot delete the media asset, it is actually in use by publication task with id {0} and '
                   u'status {1}.').format(publisher_task._id, publisher_task.status)
        raise ValueError(to_bytes(message))

    media.status = Media.DELETED
    self.save_media(media)
    #self._db.medias.remove({'_id': media._id})
    Storage.delete_media(self.config, media)
开发者ID:codecwatch,项目名称:OSCIED,代码行数:16,代码来源:server.py
示例11: api_user_id_delete
def api_user_id_delete(id=None, auth_user=None, api_core=None, request=None):
    u"""
    Delete a user.

    Looks the user up by ``id`` through ``api_core``, deletes it and returns the result of ``ok_200()`` with a
    confirmation message. Raises IndexError when no user has this id.
    """
    user = api_core.get_user(spec={u'_id': id})
    if user:
        api_core.delete_user(user)
        message = u'The user "{0}" has been deleted.'.format(user.name)
        return ok_200(message, include_properties=False)
    raise IndexError(to_bytes(u'No user with id {0}.'.format(id)))
开发者ID:codecwatch,项目名称:OSCIED,代码行数:7,代码来源:api_user.py
示例12: update_widget_attributes
def update_widget_attributes(widget, updates):
    """
    Update attributes of a `widget` with content of `updates` handling classes addition [+],
    removal [-] and toggle [^].

    Class operations are applied from left to right and an operation repeated verbatim is applied only once, so
    the result does not depend on hash ordering. The `updates` mapping given by the caller is not modified.

    Raises ValueError when a class operation does not start with one of ``+``, ``-`` or ``^``.

    **Example usage**

    >>> from pytoolbox.unittest import asserts
    >>> widget = type(str(''), (), {})
    >>> widget.attrs = {'class': 'mondiale'}
    >>> update_widget_attributes(
    ...     widget, {'class': '+pigeon +pigeon +voyage -mondiale -mondiale, ^voyage ^voyageur'})
    >>> asserts.dict_equal(widget.attrs, {'class': 'pigeon voyageur'})
    >>> update_widget_attributes(widget, {'class': '+le', 'cols': 100})
    >>> asserts.dict_equal(widget.attrs, {'class': 'le pigeon voyageur', 'cols': 100})
    """
    updates = copy(updates)
    if 'class' in updates:
        class_set = set(c for c in widget.attrs.get('class', '').split(' ') if c)
        # De-duplicate the operations while keeping their order: iterating a plain set made the outcome of
        # e.g. '+voyage ^voyage' depend on the (arbitrary) iteration order of the set.
        seen = set()
        for token in updates['class'].split(' '):
            if not token or token in seen:
                continue
            seen.add(token)
            operation, cls = token[0], token[1:]
            if operation == '+' or (operation == '^' and cls not in class_set):
                class_set.add(cls)
            elif operation in ('-', '^'):
                class_set.discard(cls)
            else:
                raise ValueError(to_bytes(
                    'updates must be a valid string with "<op>class <op>..." with op in [+-^].'))
        widget.attrs['class'] = ' '.join(sorted(class_set))
        del updates['class']
    widget.attrs.update(updates)
开发者ID:davidfischer-ch,项目名称:pytoolbox,代码行数:31,代码来源:utils.py
示例13: subordinate_register
def subordinate_register(self, mongo=None, rabbit=None):
    u"""
    Register this unit with the Orchestrator by generating the celery configuration files.

    The connection parameters come from the charm configuration when ``self.subordinate_config_is_enabled`` is
    set, otherwise from the ``mongo`` and ``rabbit`` arguments. When neither source is available the method
    returns without doing anything.

    Raises ValueError when the MongoDB connection string cannot be parsed, does not contain exactly one node, or
    when one of the required settings is empty.
    """
    local_cfg = self.local_config
    if self.subordinate_config_is_enabled:
        self.info(u'Override subordinate parameters with charm configuration')
        mongo = self.config.mongo_connection
        rabbit = self.config.rabbit_connection
        socket = self.config.api_nat_socket
    elif mongo and rabbit:
        self.info(u'Use subordinate parameters from charm relation')
        socket = u''
    else:
        return
    self.info(u'Register the Orchestrator')
    local_cfg.api_nat_socket = socket
    try:
        infos = pymongo.uri_parser.parse_uri(mongo)
        # Explicit checks instead of assert statements, which are stripped when Python runs with -O
        if len(infos[u'nodelist']) != 1:
            raise ValueError(u'Exactly one MongoDB node is expected')
        infos.update({
            u'concurrency': self.config.concurrency, u'directory': self.directory,
            u'host': infos[u'nodelist'][0][0], u'port': infos[u'nodelist'][0][1],
            u'group': DAEMON_GROUP, u'name': local_cfg.worker_name, u'queues': self.rabbit_queues,
            u'rabbit': rabbit, u'user': DAEMON_USER
        })
        del infos[u'nodelist']
        self.info(u'{0}'.format(infos))
        for name in (u'concurrency', u'group', u'host', u'name', u'port', u'queues', u'rabbit', u'user'):
            if not infos[name]:
                raise ValueError(u'Info {0} is empty'.format(name))
    except Exception:
        # Any failure above is reported as an unparsable connection; KeyboardInterrupt and SystemExit are no
        # longer swallowed as they were with a bare except.
        raise ValueError(to_bytes(u'Unable to parse MongoDB connection {0}'.format(mongo)))
    self.template2config(local_cfg.celery_init_template_file, local_cfg.celery_init_file, {})
    self.template2config(local_cfg.celery_default_template_file, local_cfg.celery_default_file, infos)
    self.template2config(local_cfg.celery_config_template_file, local_cfg.celery_config_file, infos)
    # The mode must be octal: decimal 755 is 0o1363, which does not give rwxr-xr-x
    os.chmod(local_cfg.celery_init_file, 0o755)
    self.cmd(u'update-rc.d {0} defaults'.format(local_cfg.worker_name))
    self.remark(u'Orchestrator successfully registered')
开发者ID:codecwatch,项目名称:OSCIED,代码行数:35,代码来源:hooks_base.py
示例14: api_transform_profile_id_delete
def api_transform_profile_id_delete(id=None, auth_user=None, api_core=None, request=None):
    u"""
    Delete a transformation profile.

    Looks the profile up by ``id`` through ``api_core``, deletes it and returns the result of ``ok_200()`` with a
    confirmation message. Raises IndexError when no profile has this id.
    """
    profile = api_core.get_transform_profile(spec={u'_id': id})
    if profile:
        api_core.delete_transform_profile(profile)
        message = u'The transformation profile "{0}" has been deleted.'.format(profile.title)
        return ok_200(message, include_properties=False)
    raise IndexError(to_bytes(u'No transformation profile with id {0}.'.format(id)))
开发者ID:codecwatch,项目名称:OSCIED,代码行数:8,代码来源:api_transform.py
示例15: save_transform_profile
def save_transform_profile(self, profile):
    u"""
    Validate ``profile`` and save its attributes into the ``transform_profiles`` collection.

    Raises ValueError when the save fails with ``DuplicateKeyError``.
    """
    profile.is_valid(True)
    document = profile.__dict__
    # FIXME exact matching !
    try:
        self._db.transform_profiles.save(document, safe=True)
    except DuplicateKeyError:
        message = u'The title {0} is already used by another transformation profile.'.format(profile.title)
        raise ValueError(to_bytes(message))
开发者ID:codecwatch,项目名称:OSCIED,代码行数:8,代码来源:server.py
示例16: get_media_bitrate
def get_media_bitrate(filename):
    u"""
    Return the bitrate that ``ffprobe`` reports for ``filename``.

    The value is the run of digits following ``bitrate: `` in ffprobe's standard error output, returned as a
    string, or None when no such text is found.
    """
    # Pass the arguments as a list instead of formatting a quoted command line and splitting it again: a quote or
    # a backslash in the file name can no longer change how the command is split.
    command = [to_bytes(u'ffprobe'), to_bytes(u'{0}'.format(filename))]
    pipe = Popen(command, stderr=PIPE, close_fds=True)
    # communicate() drains stderr and waits for the process to exit, so the child is always reaped
    stderr = pipe.communicate()[1]
    # Decode explicitly: the implicit ASCII conversion raised on any non-ASCII byte in ffprobe's output
    output = stderr.decode('utf-8', 'replace')
    match = re.search(u'bitrate: (?P<bitrate>\\d+)', output)
    if not match:
        return None
    return match.group(u'bitrate')
开发者ID:codecwatch,项目名称:OSCIED,代码行数:8,代码来源:TransformWorker.py
示例17: cleanup
def cleanup(self):
    """
    Remove FEC packets that are stored / waiting but useless.

    For every media sequence in ``self.crosses`` that ``self.validity_window()`` rejects for the window starting at
    ``self.position``, the cross entry and the column / row entries it references are deleted.

    Raises ValueError when the receiver is flushing, is in startup, or has an unknown ``delay_units``, and
    NotImplementedError when ``delay_units`` is ``FecReceiver.SECONDS``.
    """
    if self.flushing:
        raise ValueError(to_bytes(FecReceiver.ER_FLUSHING))
    if self.startup:
        raise ValueError(to_bytes(FecReceiver.ER_STARTUP))
    if self.delay_units == FecReceiver.PACKETS:
        start = self.position
        end = (self.position + self.delay_value) & RtpPacket.S_MASK
        # keys() must be called (iterating the bound method raises TypeError) and snapshotted, because entries
        # are deleted from the mapping inside the loop.
        for media_sequence in list(self.crosses.keys()):
            if not self.validity_window(media_sequence, start, end):
                cross = self.crosses[media_sequence]
                # NOTE(review): raises KeyError if the referenced column / row entry is already gone -- confirm
                # that several crosses never share the same col_sequence / row_sequence.
                del self.cols[cross['col_sequence']]
                del self.rows[cross['row_sequence']]
                del self.crosses[media_sequence]
    elif self.delay_units == FecReceiver.SECONDS:
        raise NotImplementedError()
    else:
        # Only unknown units are an error; without this else the error was raised after a successful cleanup
        raise ValueError(to_bytes(FecReceiver.ER_DELAY_UNITS.format(self.delay_units)))
开发者ID:davidfischer-ch,项目名称:pytoolbox,代码行数:17,代码来源:receiver.py
示例18: launch_publisher_task
def launch_publisher_task(self, user_id, media_id, send_email, queue, callback_url):
    u"""
    Validate the request and submit a publication task for a media asset to ``queue``.

    Returns the saved ``PublisherTask``. With ``self.config.is_mock`` no worker is contacted and a random UUID is
    used as the task id.

    Raises IndexError when the user, the media or the queue is unknown, NotImplementedError when the media is not
    ready, is already published or is being published by another task, and ValueError when the submission yields
    no task id.
    """
    # NOTE(review): indentation was lost in this listing; the standalone check is assumed to guard only the
    # user lookup -- confirm against the upstream file.
    if self.config.is_standalone:
        user = self.get_user({u'_id': user_id}, {u'secret': 0})
        if not user:
            raise IndexError(to_bytes(u'No user with id {0}.'.format(user_id)))

    media = self.get_media({u'_id': media_id})
    if not media:  # FIXME maybe a media access control here
        raise IndexError(to_bytes(u'No media asset with id {0}.'.format(media_id)))
    if queue not in self.config.publisher_queues:
        raise IndexError(to_bytes(u'No publication queue with name {0}.'.format(queue)))
    if media.status != Media.READY:
        message = u"Cannot launch the task, input media asset's status is {0}.".format(media.status)
        raise NotImplementedError(to_bytes(message))
    if len(media.public_uris) > 0:
        raise NotImplementedError(to_bytes(u'Cannot launch the task, input media asset is already published.'))

    other = self.get_publisher_task({u'media_id': media._id})
    if other and not (other.status in PublisherTask.FINAL_STATUS or other.status == PublisherTask.REVOKED):
        message = (u'Cannot launch the task, input media asset will be published by another task with id '
                   u'{0}.').format(other._id)
        raise NotImplementedError(to_bytes(message))

    # FIXME create a one-time password to avoid fixed secret authentication ...
    callback = Callback(self.config.api_url + callback_url, u'node', self.config.node_secret)
    if not self.config.is_mock:
        worker_args = (object2json(media, False), object2json(callback, False))
        result_id = PublisherWorker.publisher_task.apply_async(args=worker_args, queue=queue).id
    else:
        result_id = unicode(uuid.uuid4())
    if not result_id:
        raise ValueError(to_bytes(u'Unable to transmit task to workers of queue {0}.'.format(queue)))
    logging.info(u'New publication task {0} -> queue {1}.'.format(result_id, queue))

    task = PublisherTask(user_id=user_id, media_id=media._id, send_email=send_email, _id=result_id)
    task.statistic[u'add_date'] = int(time())
    self._db.publisher_tasks.save(task.__dict__, safe=True)
    return task
开发者ID:codecwatch,项目名称:OSCIED,代码行数:34,代码来源:server.py
示例19: transform_callback
def transform_callback(self, task_id, status):
    """
    Record the outcome reported for a transformation task.

    On ``TransformTask.SUCCESS`` the output media is marked ``Media.READY`` and saved. For any other status the
    output media is deleted and the status text is stored in the task's ``error_details`` statistic.

    Raises IndexError when the task or its output media cannot be found.
    """
    task = self.get_transform_task({"_id": task_id})
    if not task:
        raise IndexError(to_bytes("No transformation task with id {0}.".format(task_id)))
    media_out = self.get_media({"_id": task.media_out_id})
    if not media_out:
        raise IndexError(to_bytes("Unable to find output media asset with id {0}.".format(task.media_out_id)))

    if status == TransformTask.SUCCESS:
        media_out.status = Media.READY
        self.save_media(media_out)
        logging.info("{0} Media {1} is now {2}".format(task_id, media_out.filename, media_out.status))
        # self.send_email_task(task, TransformTask.SUCCESS, media_out=media_out)
        return

    self.delete_media(media_out)
    # Newlines are escaped so that the error details are stored on a single line
    task.statistic["error_details"] = status.replace("\n", "\\n")
    self._db.transform_tasks.save(task.__dict__, safe=True)
    logging.info("{0} Error: {1}".format(task_id, status))
    logging.info("{0} Media {1} is now deleted".format(task_id, media_out.filename))
开发者ID:ebu,项目名称:OSCIED,代码行数:18,代码来源:server.py
示例20: set_delay
def set_delay(self, value, units):
    """
    Set desired size for the internal media buffer.

    ``value`` and ``units`` are stored when ``units`` is ``FecReceiver.PACKETS``. Raises NotImplementedError for
    ``FecReceiver.SECONDS`` and ValueError for any other unit.
    """
    if units == FecReceiver.PACKETS:
        self.delay_value, self.delay_units = value, units
        return
    if units == FecReceiver.SECONDS:
        raise NotImplementedError()
    raise ValueError(to_bytes(FecReceiver.ER_DELAY_UNITS.format(units)))
开发者ID:davidfischer-ch,项目名称:pytoolbox,代码行数:9,代码来源:receiver.py
注：本文中的pytoolbox.encoding.to_bytes函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论