本文整理汇总了Python中simpy.simulate函数的典型用法代码示例。如果您正苦于以下问题:Python simulate函数的具体用法?Python simulate怎么用?Python simulate使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了simulate函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_interrupted_join
def test_interrupted_join(env):
    """Interrupt a process while it is waiting for another process.

    The parent yields on its child (whose timeout ends at time 2) and is
    interrupted at time 1. It then waits on a fresh 5-unit timeout and
    checks that it resumes at time 6, i.e. the child's termination in
    between did not wake it up: the victim was unregistered from the
    process it had been waiting for.
    """
    def child(env):
        yield env.timeout(2)

    def parent(env):
        joined = env.start(child(env))
        try:
            yield joined
            pytest.fail('Did not receive an interrupt.')
        except Interrupt:
            assert env.now == 1
            assert joined.is_alive

            # The child ends while we sleep; that must not resume us early.
            yield env.timeout(5)
            assert env.now == 6

    def interrupter(env, target):
        yield env.timeout(1)
        target.interrupt()

    waiting_proc = env.start(parent(env))
    env.start(interrupter(env, waiting_proc))
    simulate(env)
开发者ID:dinesh121991,项目名称:csim,代码行数:29,代码来源:test_wait_for_proc.py
示例2: test_unregister_after_interrupt
def test_unregister_after_interrupt(env):
    """A process interrupted while waiting for another one gets
    unregistered from that process.

    The parent is interrupted at time 1 while yielding on its child. It
    then sleeps for 2 more time units and verifies that it wakes up at
    time 3, by which point the child is no longer alive.
    """
    def child(env):
        yield env.timeout(2)

    def parent(env):
        awaited = env.start(child(env))
        try:
            yield awaited
            pytest.fail('Did not receive an interrupt.')
        except Interrupt:
            assert env.now == 1
            assert awaited.is_alive

            yield env.timeout(2)
            assert env.now == 3
            assert not awaited.is_alive

    def interrupter(env, target):
        yield env.timeout(1)
        target.interrupt()

    waiting_proc = env.start(parent(env))
    env.start(interrupter(env, waiting_proc))
    simulate(env)
开发者ID:dinesh121991,项目名称:csim,代码行数:28,代码来源:test_wait_for_proc.py
示例3: test_container
def test_container(env, log):
    """A *container* is a resource (of optionally limited capacity) where
    you can put in or take out a discrete or continuous amount of things
    (e.g., a box of lump sugar or a can of milk). The *put* and *get*
    operations block if the buffer is too full or too empty. If they
    return, the process knows that the *put* or *get* operation was
    successful.
    """
    def producer(env, tank, log):
        yield env.timeout(1)
        while True:
            yield tank.put(2)
            log.append(('p', env.now))
            yield env.timeout(1)

    def consumer(env, tank, log):
        yield tank.get(1)
        log.append(('g', env.now))

        yield env.timeout(1)
        yield tank.get(1)
        log.append(('g', env.now))

    tank = simpy.Container(env, init=0, capacity=2)
    # Start order is kept as-is: producer first, then consumer.
    env.start(producer(env, tank, log))
    env.start(consumer(env, tank, log))
    simpy.simulate(env, until=5)

    expected = [('g', 1), ('p', 1), ('g', 2), ('p', 2)]
    assert log == expected
开发者ID:dinesh121991,项目名称:csim,代码行数:30,代码来源:test_resources.py
示例4: test_start_delayed
def test_start_delayed(env):
    """A process scheduled via ``start_delayed(..., delay=5)`` begins
    executing at simulation time 5."""
    def late_starter(env):
        assert env.now == 5
        yield env.timeout(1)

    delayed = late_starter(env)
    start_delayed(env, delayed, delay=5)
    simulate(env)
开发者ID:jad-darrous,项目名称:predictsim,代码行数:7,代码来源:test_util.py
示例5: test_blackbox_monitor_processes
def test_blackbox_monitor_processes(env):
    """A :class:`~simpy.monitoring.Monitor` also provides a process
    method (:meth:`Monitor.run()`) that collects data from a number of
    objects in regular intervals.

    Two accumulator objects each add the current time to their ``a``
    attribute once per time unit; the monitor samples the clock and both
    ``a`` values with ``collect_interval=1``.
    """
    class Accumulator(object):
        def __init__(self, env):
            self.a = 0
            self.process = env.start(self.accumulate(env))

        def accumulate(self, env):
            while True:
                self.a += env.now
                yield env.timeout(1)

    accumulators = [Accumulator(env) for _ in range(2)]

    monitor = monitoring.Monitor()
    # configure also accepts a generator that creates a number of
    # collector functions:
    monitor.configure(lambda: [env.now] + [acc.a for acc in accumulators])
    env.start(monitor.run(env, collect_interval=1))

    simpy.simulate(env, 3)

    # Each row: [env.now, accumulators[0].a, accumulators[1].a]
    expected_rows = [
        [0, 0, 0],
        [1, 1, 1],
        [2, 3, 3],
    ]
    assert monitor.data == expected_rows
开发者ID:dinesh121991,项目名称:csim,代码行数:32,代码来源:test_monitoring.py
示例6: test_mixed_preemption
def test_mixed_preemption(env, log):
    """Mix preempting and non-preempting requests on a
    ``simpy.PreemptiveResource`` of capacity 2.

    Every user first waits ``delay``, then requests the resource with the
    given priority and preempt flag, and holds it for 5 time units once
    the request is granted. A user that finishes logs ``(now, uid)``; a
    user that is interrupted logs ``(now, uid, tuple(cause))``.
    """
    def user(uid, env, res, delay, prio, preempt, log):
        yield env.timeout(delay)
        with res.request(priority=prio, preempt=preempt) as req:
            try:
                yield req
                yield env.timeout(5)
                log.append((env.now, uid))
            except simpy.Interrupt as ir:
                log.append((env.now, uid, tuple(ir.cause)))

    res = simpy.PreemptiveResource(env, 2)

    # One row per user, in start order: (delay, priority, preempt).
    schedule = [
        (0, 1, True),
        (0, 1, True),
        (1, 0, False),
        (1, 0, True),
        (2, 2, True),
    ]
    procs = [
        env.start(user(uid, env, res, delay, prio, preempt, log))
        for uid, (delay, prio, preempt) in enumerate(schedule)
    ]

    simpy.simulate(env)

    assert log == [
        (1, 1, (procs[3], 0)),
        (5, 0),
        (6, 3),
        (10, 2),
        (11, 4),
    ]
开发者ID:dinesh121991,项目名称:csim,代码行数:27,代码来源:test_resources.py
示例7: test_resource_release_after_interrupt
def test_resource_release_after_interrupt(env):
    """A process needs to release a resource, even if it was interrupted
    and does not continue to wait for it."""
    def holder(env, res):
        with res.request() as req:
            yield req
            yield env.timeout(1)

    def victim(env, res):
        try:
            pending = res.request()
            yield pending
            pytest.fail('Should not have gotten the resource.')
        except simpy.Interrupt:
            # Give the request back instead of waiting for the resource.
            res.release(pending)
            assert env.now == 0
            env.exit()

    def interrupter(env, target):
        target.interrupt()
        yield env.exit(0)

    res = simpy.Resource(env, 1)
    # Start order is kept as-is: victim, its interrupter, then the holder.
    target_proc = env.start(victim(env, res))
    env.start(interrupter(env, target_proc))
    env.start(holder(env, res))
    simpy.simulate(env)
开发者ID:dinesh121991,项目名称:csim,代码行数:28,代码来源:test_resources.py
示例8: test_wait_for_all_with_errors
def test_wait_for_all_with_errors(env):
    """By default an ``all_of`` condition fails as soon as one of its
    events fails.

    The parent waits on two timeouts (1 and 3 time units) and a child
    process that raises ``RuntimeError('crashing')`` after 2 time units.
    """
    def failing_child(env, delay):
        yield env.timeout(delay)
        raise RuntimeError('crashing')

    def parent(env):
        early = env.timeout(1, value=1)
        crasher = env.start(failing_child(env, 2))
        late = env.timeout(3, value=3)

        try:
            condition = all_of([early, crasher, late])
            yield condition
            assert False, 'There should have been an exception'
        except RuntimeError as err:
            assert err.args[0] == 'crashing'

        # Although the condition has failed, intermediate results are
        # available.
        collected = condition._results
        assert collected[early] == 1
        assert collected[crasher].args[0] == 'crashing'
        # The last event has not been processed yet.
        assert late not in collected

    env.start(parent(env))
    simulate(env)
开发者ID:jad-darrous,项目名称:predictsim,代码行数:28,代码来源:test_util.py
示例9: test_whitebox_monitor_data_object
def test_whitebox_monitor_data_object(env):
    """If the *PEM* is an instance method of an object,
    a :class:`~simpy.monitoring.Monitor` can be configured to
    automatically collect a number of instance attributes.

    Here the object owns its monitor and calls ``collect()`` itself once
    per time unit, recording ``(env.now, a)`` each time.
    """
    class Accumulator(object):
        def __init__(self, env):
            self.env = env
            self.a = 0
            self.monitor = monitoring.Monitor()
            self.monitor.configure(lambda: (self.env.now, self.a))
            self.process = env.start(self.accumulate())

        def accumulate(self):
            while True:
                self.a += self.env.now
                self.monitor.collect()
                yield self.env.timeout(1)

    # Constructing the object is enough: __init__ starts the process.
    accumulator = Accumulator(env)
    simpy.simulate(env, 5)

    expected = [(0, 0), (1, 1), (2, 3), (3, 6), (4, 10)]
    assert accumulator.monitor.data == expected
开发者ID:dinesh121991,项目名称:csim,代码行数:25,代码来源:test_monitoring.py
示例10: test_exception_chaining
def test_exception_chaining(env):
    """Unhandled exceptions pass through the entire event stack. This must be
    visible in the stacktrace of the exception.

    A child raises ``RuntimeError('foo')`` at time 1; its parent and
    grandparent each simply yield on the process below them. The test
    expects the error to escape ``simpy.simulate`` and the formatted
    traceback to contain the source line of every level.
    """
    def child(env):
        yield env.timeout(1)
        raise RuntimeError('foo')

    def parent(env):
        child_proc = env.start(child(env))
        yield child_proc

    def grandparent(env):
        parent_proc = env.start(parent(env))
        yield parent_proc

    env.start(grandparent(env))
    try:
        simpy.simulate(env)
        pytest.fail('There should have been an exception')
    except RuntimeError:
        import traceback
        trace = traceback.format_exc()
        # NOTE: these checks match the literal source text of the lines
        # above, so the local names ``child_proc`` / ``parent_proc`` and
        # the raise statement must not be renamed or reformatted.
        assert 'raise RuntimeError(\'foo\')' in trace
        assert 'yield child_proc' in trace
        assert 'yield parent_proc' in trace
开发者ID:dinesh121991,项目名称:csim,代码行数:25,代码来源:test_exceptions.py
示例11: test_interrupted_join_and_rejoin
def test_interrupted_join_and_rejoin(env):
    """Interrupt a process while it waits for another process, then let
    the victim wait for that same process again.

    The parent is interrupted at time 1 while yielding on its child,
    yields on the still-alive child once more and resumes at time 2.
    """
    def child(env):
        yield env.timeout(2)

    def parent(env):
        joined = env.start(child(env))
        try:
            yield joined
            pytest.fail('Did not receive an interrupt.')
        except Interrupt:
            assert env.now == 1
            assert joined.is_alive

            # Join the same child a second time.
            yield joined
            assert env.now == 2

    def interrupter(env, target):
        yield env.timeout(1)
        target.interrupt()

    waiting_proc = env.start(parent(env))
    env.start(interrupter(env, waiting_proc))
    simulate(env)
开发者ID:dinesh121991,项目名称:csim,代码行数:27,代码来源:test_wait_for_proc.py
示例12: test_resource_continue_after_interrupt
def test_resource_continue_after_interrupt(env):
    """A process may be interrupted while waiting for a resource but
    should be able to continue waiting afterwards."""
    def holder(env, res):
        with res.request() as req:
            yield req
            yield env.timeout(1)

    def victim(env, res):
        try:
            pending = res.request()
            yield pending
            pytest.fail('Should not have gotten the resource.')
        except simpy.Interrupt:
            # Keep waiting on the very same request after the interrupt.
            yield pending
            res.release(pending)
            assert env.now == 1

    def interrupter(env, target):
        target.interrupt()
        yield env.exit(0)

    res = simpy.Resource(env, 1)
    # Start order is kept as-is: holder, victim, then the interrupter.
    env.start(holder(env, res))
    target_proc = env.start(victim(env, res))
    env.start(interrupter(env, target_proc))
    simpy.simulate(env)
开发者ID:dinesh121991,项目名称:csim,代码行数:27,代码来源:test_resources.py
示例13: test_interrupt_self
def test_interrupt_self(env):
    """A process should not be able to interrupt itself: calling
    ``interrupt()`` on ``env.active_process`` from within that process
    is expected to raise ``RuntimeError``."""
    def self_interrupter(env):
        with pytest.raises(RuntimeError):
            env.active_process.interrupt()
        yield env.timeout(0)

    env.start(self_interrupter(env))
    simpy.simulate(env)
开发者ID:dinesh121991,项目名称:csim,代码行数:8,代码来源:test_interrupts.py
示例14: test_resource_with_condition
def test_resource_with_condition(env):
    """A resource request can be combined with another event via ``|``;
    the request event is expected to be part of the condition's result."""
    def requester(env, resource):
        with resource.request() as res_event:
            request_or_timeout = res_event | env.timeout(1)
            result = yield request_or_timeout
            assert res_event in result

    shared = simpy.Resource(env, 1)
    env.start(requester(env, shared))
    simpy.simulate(env)
开发者ID:dinesh121991,项目名称:csim,代码行数:9,代码来源:test_resources.py
示例15: test_exception_handling
def test_exception_handling(env):
    """If failed events are not defused (which is the default) the simulation
    crashes.

    An event is failed with a ``RuntimeError`` before the simulation
    starts; running the simulation is then expected to raise that error.
    """
    event = env.event()
    event.fail(RuntimeError())

    try:
        simpy.simulate(env, until=1)
    except RuntimeError:
        # Expected: the undefused failure propagated out of simulate().
        pass
    else:
        # Raised explicitly rather than via ``assert False`` so this check
        # (the only one in the test) is not stripped under ``python -O``.
        raise AssertionError('There must be a RuntimeError!')
开发者ID:dinesh121991,项目名称:csim,代码行数:11,代码来源:test_exceptions.py
示例16: test_stop_self
def test_stop_self(env, log):
    """A process stops itself by returning once the clock reaches 2,
    even though the simulation is allowed to run until 10."""
    def ticker(env, log):
        while True:
            if env.now >= 2:
                return
            log.append(env.now)
            yield env.timeout(1)

    env.start(ticker(env, log))
    simulate(env, 10)

    assert log == [0, 1]
开发者ID:dinesh121991,项目名称:csim,代码行数:11,代码来源:test_single_process.py
示例17: test_shared_timeout
def test_shared_timeout(env, log):
    """Several processes may wait on one and the same timeout event; all
    of them are resumed at time 1, in the order they were started."""
    def waiter(env, shared, ident, log):
        yield shared
        log.append((ident, env.now))

    shared = env.timeout(1)
    for ident in range(3):
        env.start(waiter(env, shared, ident, log))

    simulate(env)

    expected = [(0, 1), (1, 1), (2, 1)]
    assert log == expected
开发者ID:dinesh121991,项目名称:csim,代码行数:11,代码来源:test_single_process.py
示例18: test_exit_with_process
def test_exit_with_process(env):
    """A process may pass another process to ``env.exit()``; the process
    waiting on it receives that ``Process`` object as the result."""
    def forker(env, fork):
        if fork:
            outcome = env.start(forker(env, False))
        else:
            outcome = None
        yield env.exit(outcome)

    def parent(env):
        received = yield env.start(forker(env, True))

        assert type(received) is Process

    env.start(parent(env))
    simulate(env)
开发者ID:dinesh121991,项目名称:csim,代码行数:11,代码来源:test_single_process.py
示例19: test_discrete_time_steps
def test_discrete_time_steps(env, log):
    """Simple simulation with discrete time steps: a process logs the
    clock once per time unit and the run is cut off at ``until=3``."""
    def clock(env, log):
        while True:
            log.append(env.now)
            yield env.timeout(delay=1)

    env.start(clock(env, log))
    simulate(env, until=3)

    expected = [0, 1, 2]
    assert log == expected
开发者ID:dinesh121991,项目名称:csim,代码行数:11,代码来源:test_single_process.py
示例20: test_operator_or
def test_operator_or(env):
    """Combining events with ``|`` yields a result as soon as one of them
    has happened; here only the zero-delay timeout is in the result."""
    def waiter(env):
        first, second, third = [
            env.timeout(delay, value=delay) for delay in range(3)
        ]
        results = yield first | second | third

        assert results == {first: 0}

    env.start(waiter(env))
    simulate(env)
开发者ID:dinesh121991,项目名称:csim,代码行数:11,代码来源:test_condition.py
注:本文中的simpy.simulate函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论