env =
def test_cond_with_uncaught_error(env):
"""Errors that happen after the condition has been triggered will not be
handled by the condition and cause the simulation to crash."""
def explode(env, delay):
yield env.timeout(delay)
raise ValueError(f'Onoes, failed after {delay}!')
def process(env):
yield env.timeout(1) | env.process(explode(env, 2))
env.process(process(env))
> with pytest.raises(ValueError, match='Onoes, failed after'):
E Failed: DID NOT RAISE
tests/test_condition.py:127: Failed
test_condition.py::test_all_of_empty_list
test_condition.py::test_all_of_empty_list
env =
def test_all_of_empty_list(env):
"""AllOf with an empty list should immediately be triggered."""
evt = env.all_of([])
> assert evt.triggered
E assert None
E + where None = .triggered
tests/test_condition.py:299: AssertionError
test_condition.py::test_any_of_empty_list
test_condition.py::test_any_of_empty_list
env =
def test_any_of_empty_list(env):
"""AnyOf with an empty list should immediately be triggered."""
evt = env.any_of([])
> assert evt.triggered
E assert None
E + where None = .triggered
tests/test_condition.py:305: AssertionError
test_environment.py::test_event_queue_empty
test_environment.py::test_event_queue_empty
env = , log = []
def test_event_queue_empty(env, log):
"""The simulation should stop if there are no more events, that means, no
more active process."""
def pem(env, log):
while env.now < 2:
log.append(env.now)
yield env.timeout(1)
env.process(pem(env, log))
env.run(10)
> assert log == [0, 1]
E assert [] == [0, 1]
E
E Right contains 2 more items, first extra item: 0
E Use -v to get more diff
tests/test_environment.py:21: AssertionError
test_environment.py::test_run_negative_until
test_environment.py::test_run_negative_until
env =
def test_run_negative_until(env):
"""Test passing a negative time to run."""
> with pytest.raises(
ValueError, match='must be greater than the current simulation time'
):
E Failed: DID NOT RAISE
tests/test_environment.py:26: Failed
test_environment.py::test_run_resume
test_environment.py::test_run_resume
env =
def test_run_resume(env):
"""Stopped simulation can be resumed."""
events = [env.timeout(t) for t in (5, 10, 15)]
> assert env.now == 0
E assert None == 0
E + where None = .now
tests/test_environment.py:36: AssertionError
test_environment.py::test_run_until_value
test_environment.py::test_run_until_value
env =
def test_run_until_value(env):
"""Anything that can be converted to a float is a valid until value."""
env.run(until='3.141592')
> assert env.now == 3.141592
E assert None == 3.141592
E + where None = .now
tests/test_environment.py:57: AssertionError
env =
def test_run_with_processed_event(env):
"""An already processed event may also be passed as until value."""
timeout = env.timeout(1, value='spam')
> assert env.run(until=timeout) == 'spam'
E AssertionError: assert None == 'spam'
E + where None = run(until=)
E + where run = .run
tests/test_environment.py:63: AssertionError
env =
def test_run_with_untriggered_event(env):
> excinfo = pytest.raises(RuntimeError, env.run, until=env.event())
E Failed: DID NOT RAISE
tests/test_environment.py:74: Failed
test_event.py::test_names
test_event.py::test_names
env =
def test_names(env):
def pem():
return
yield
> assert re.match(r'', str(env.event()))
E AssertionError: assert None
E + where None = ('', '')
E + where = re.match
E + and '' = str()
E + where = Event()
E + where Event = .event
tests/test_event.py:52: AssertionError
test_event.py::test_value
test_event.py::test_value
env =
def test_value(env):
"""After an event has been triggered, its value becomes accessible."""
event = env.timeout(0, 'I am the value')
env.run()
> assert event.value == 'I am the value'
E AssertionError: assert None == 'I am the value'
E + where None = .value
tests/test_event.py:74: AssertionError
test_event.py::test_unavailable_value
test_event.py::test_unavailable_value
env =
def test_unavailable_value(env):
"""If an event has not yet been triggered, its value is not available and
trying to access it will result in a AttributeError."""
event = env.event()
> with pytest.raises(AttributeError, match='.* is not yet available$'):
E Failed: DID NOT RAISE
tests/test_event.py:82: Failed
test_event.py::test_triggered
test_event.py::test_triggered
env =
def test_triggered(env):
def pem(env, event):
value = yield event
return value
event = env.event()
event.succeed('i was already done')
result = env.run(env.process(pem(env, event)))
> assert result == 'i was already done'
E AssertionError: assert None == 'i was already done'
tests/test_event.py:96: AssertionError
test_event.py::test_condition_callback_removal
test_event.py::test_condition_callback_removal
env =
def test_condition_callback_removal(env):
"""A condition will remove all outstanding callbacks from its events."""
a, b = env.event(), env.event()
a.succeed()
env.run(until=a | b)
# The condition has removed its callback from event b.
> assert not a.callbacks
E assert not [>]
E + where [>] = .callbacks
tests/test_event.py:118: AssertionError
env =
def test_condition_nested_callback_removal(env):
"""A condition will remove all outstanding callbacks from its events (even
if nested)."""
a, b, c = env.event(), env.event(), env.event()
b_and_c = b & c
a_or_b_and_c = a | b_and_c
a.succeed()
env.run(until=a_or_b_and_c)
# Callbacks from nested conditions are also removed.
> assert not a.callbacks
E assert not [>]
E + where [>] = .callbacks
tests/test_event.py:131: AssertionError
test_exceptions.py::test_no_parent_process
test_exceptions.py::test_no_parent_process
env =
def test_no_parent_process(env):
"""Exceptions should be normally raised if there are no processes waiting
for the one that raises something.
"""
def child(env):
raise ValueError('Onoes!')
yield env.timeout(1)
def parent(env):
try:
env.process(child(env))
yield env.timeout(1)
except Exception as err:
pytest.fail(f'There should be no error ({err}).')
env.process(parent(env))
> with pytest.raises(ValueError, match='Onoes!'):
E Failed: DID NOT RAISE
tests/test_exceptions.py:49: Failed
test_exceptions.py::test_exception_chaining
test_exceptions.py::test_exception_chaining
env =
def test_exception_chaining(env):
"""Unhandled exceptions pass through the entire event stack. This must be
visible in the stacktrace of the exception.
"""
def child(env):
yield env.timeout(1)
raise RuntimeError('foo')
def parent(env):
child_proc = env.process(child(env))
yield child_proc
def grandparent(env):
parent_proc = env.process(parent(env))
yield parent_proc
env.process(grandparent(env))
try:
env.run()
> pytest.fail('There should have been an exception')
E Failed: There should have been an exception
tests/test_exceptions.py:93: Failed
test_exceptions.py::test_invalid_event
test_exceptions.py::test_invalid_event
env =
def test_invalid_event(env):
"""Invalid yield values will cause the simulation to fail."""
def root(_):
yield None
env.process(root(env))
> with pytest.raises(RuntimeError, match='Invalid yield value "None"'):
E Failed: DID NOT RAISE
tests/test_exceptions.py:150: Failed
test_exceptions.py::test_exception_handling
test_exceptions.py::test_exception_handling
env =
def test_exception_handling(env):
"""If failed events are not defused (which is the default) the simulation
crashes."""
event = env.event()
event.fail(RuntimeError())
> with pytest.raises(RuntimeError):
E Failed: DID NOT RAISE
tests/test_exceptions.py:160: Failed
env =
def test_callback_exception_handling(env):
"""Callbacks of events may handle exception by setting the ``defused``
attribute of ``event`` to ``True``."""
def callback(event):
event.defused = True
event = env.event()
event.callbacks.append(callback)
event.fail(RuntimeError())
assert not event.defused, 'Event has been defused immediately'
env.run(until=1)
> assert event.defused, 'Event has not been defused'
E AssertionError: Event has not been defused
E assert None
E + where None = .defused
tests/test_exceptions.py:176: AssertionError
env =
def test_process_exception_handling(env):
"""Processes can't ignore failed events and auto-handle exceptions."""
def pem(_, event):
try:
yield event
pytest.fail('Hey, the event should fail!')
except RuntimeError:
pass
event = env.event()
env.process(pem(env, event))
event.fail(RuntimeError())
assert not event.defused, 'Event has been defused immediately'
env.run(until=1)
> assert event.defused, 'Event has not been defused'
E AssertionError: Event has not been defused
E assert None
E + where None = .defused
tests/test_exceptions.py:195: AssertionError
test_interrupts.py::test_concurrent_interrupts
test_interrupts.py::test_concurrent_interrupts
env = , log = []
def test_concurrent_interrupts(env, log):
"""Concurrent interrupts are scheduled in the order in which they
occurred.
"""
def fox(env, log):
while True:
try:
yield env.timeout(10)
except simpy.Interrupt as interrupt:
log.append((env.now, interrupt.cause))
def farmer(env, name, fox):
fox.interrupt(name)
yield env.timeout(1)
fantastic_mr_fox = env.process(fox(env, log))
for name in ('boggis', 'bunce', 'beans'):
env.process(farmer(env, name, fantastic_mr_fox))
env.run(20)
> assert log == [(0, 'boggis'), (0, 'bunce'), (0, 'beans')]
E AssertionError: assert [] == [(0, 'boggis'... (0, 'beans')]
E
E Right contains 3 more items, first extra item: (0, 'boggis')
E Use -v to get more diff
tests/test_interrupts.py:50: AssertionError
env = , log = []
def test_concurrent_interrupts_and_events(env, log):
"""Interrupts interrupt a process while waiting for an event. Even if the
event has happened concurrently with the interrupt."""
def fox(env, coup, log):
while True:
try:
yield coup
log.append(f'coup completed at {env.now}')
except simpy.Interrupt:
log.append(f'coup interrupted at {env.now}')
else:
return
def master_plan(env, fox, coup):
yield env.timeout(1)
# Succeed and interrupt concurrently.
coup.succeed()
fox.interrupt()
coup = env.event()
fantastic_mr_fox = env.process(fox(env, coup, log))
env.process(master_plan(env, fantastic_mr_fox, coup))
env.run(5)
> assert log == ['coup interrupted at 1', 'coup completed at 1']
E AssertionError: assert [] == ['coup interr...mpleted at 1']
E
E Right contains 2 more items, first extra item: 'coup interrupted at 1'
E Use -v to get more diff
tests/test_interrupts.py:78: AssertionError
test_interrupts.py::test_immediate_interrupt
test_interrupts.py::test_immediate_interrupt
env = , log = []
def test_immediate_interrupt(env, log):
"""Processes are immediately interruptable."""
def child(env, log):
try:
yield env.event()
except simpy.Interrupt:
log.append(env.now)
def parent(env, log):
child_proc = env.process(child(env, log))
child_proc.interrupt()
return
yield
env.process(parent(env, log))
env.run()
# Confirm that child has been interrupted immediately at timestep 0.
> assert log == [0]
E assert [] == [0]
E
E Right contains one more item: 0
E Use -v to get more diff
tests/test_interrupts.py:182: AssertionError
test_process.py::test_target
test_process.py::test_target
env =
def test_target(env):
def pem(env, event):
yield event
event = env.timeout(5)
proc = env.process(pem(env, event))
# Wait until "proc" is initialized and yielded the event
> while env.peek() < 5:
E TypeError: '<' not supported between instances of 'NoneType' and 'int'
tests/test_process.py:47: TypeError
test_process.py::test_error_and_interrupted_join
test_process.py::test_error_and_interrupted_join
env =
def test_error_and_interrupted_join(env):
def child_a(env, process):
process.interrupt()
return
yield # Dummy yield
def child_b(env):
raise AttributeError('spam')
yield # Dummy yield
def parent(env):
env.process(child_a(env, env.active_process))
b = env.process(child_b(env))
try:
yield b
# This interrupt unregisters me from b so I won't receive its
# AttributeError
except Interrupt:
pass
yield env.timeout(0)
env.process(parent(env))
> pytest.raises(AttributeError, env.run)
E Failed: DID NOT RAISE
tests/test_process.py:183: Failed
test_resources.py::test_resource
test_resources.py::test_resource
env = , log = []
def test_resource(env, log):
"""A *resource* is something with a limited numer of slots that need
to be requested before and released after the usage (e.g., gas pumps
at a gas station).
"""
def pem(env, name, resource, log):
req = resource.request()
yield req
assert resource.count == 1
yield env.timeout(1)
resource.release(req)
log.append((name, env.now))
resource = simpy.Resource(env, capacity=1)
> assert resource.capacity == 1
E assert None == 1
E + where None = .capacity
tests/test_resources.py:34: AssertionError
test_resources.py::test_resource_context_manager
test_resources.py::test_resource_context_manager
env = , log = []
def test_resource_context_manager(env, log):
"""The event that ``Resource.request()`` returns can be used as
Context Manager."""
def pem(env, name, resource, log):
with resource.request() as request:
yield request
yield env.timeout(1)
log.append((name, env.now))
resource = simpy.Resource(env, capacity=1)
env.process(pem(env, 'a', resource, log))
env.process(pem(env, 'b', resource, log))
env.run()
> assert log == [('a', 1), ('b', 2)]
E AssertionError: assert [] == [('a', 1), ('b', 2)]
E
E Right contains 2 more items, first extra item: ('a', 1)
E Use -v to get more diff
tests/test_resources.py:64: AssertionError
test_resources.py::test_resource_slots
test_resources.py::test_resource_slots
env = , log = []
def test_resource_slots(env, log):
def pem(env, name, resource, log):
with resource.request() as req:
yield req
log.append((name, env.now))
yield env.timeout(1)
resource = simpy.Resource(env, capacity=3)
for i in range(9):
env.process(pem(env, str(i), resource, log))
env.run()
> assert log == [
('0', 0),
('1', 0),
('2', 0),
('3', 1),
('4', 1),
('5', 1),
('6', 2),
('7', 2),
('8', 2),
]
E AssertionError: assert [] == [('0', 0), ('...('5', 1), ...]
E
E Right contains 9 more items, first extra item: ('0', 0)
E Use -v to get more diff
tests/test_resources.py:79: AssertionError
test_resources.py::test_resource_cm_exception
test_resources.py::test_resource_cm_exception
env = , log = []
def test_resource_cm_exception(env, log):
"""Resource with context manager receives an exception."""
def process(env, resource, log, raise_):
with resource.request() as req:
yield req
yield env.timeout(1)
log.append(env.now)
if raise_:
with pytest.raises(ValueError, match='Foo'):
raise ValueError('Foo')
resource = simpy.Resource(env, 1)
env.process(process(env, resource, log, True))
# The second process is used to check if it was able to access the
# resource:
env.process(process(env, resource, log, False))
env.run()
> assert log == [1, 2]
E assert [] == [1, 2]
E
E Right contains 2 more items, first extra item: 1
E Use -v to get more diff
tests/test_resources.py:201: AssertionError
test_resources.py::test_get_users
test_resources.py::test_get_users
env =
def test_get_users(env):
def process(env, resource):
with resource.request() as req:
yield req
yield env.timeout(1)
resource = simpy.Resource(env, 1)
procs = [env.process(process(env, resource)) for _ in range(3)]
env.run(until=1)
> assert [evt.proc for evt in resource.users] == procs[0:1]
E assert [] == []
E
E Right contains one more item:
E Use -v to get more diff
tests/test_resources.py:261: AssertionError
test_resources.py::test_mixed_preemption
test_resources.py::test_mixed_preemption
env = , log = []
def test_mixed_preemption(env, log):
def p(id, env, res, delay, prio, preempt, log):
yield env.timeout(delay)
with res.request(priority=prio, preempt=preempt) as req:
try:
yield req
yield env.timeout(2)
log.append((env.now, id))
except simpy.Interrupt as ir:
assert ir is not None # noqa: PT017
assert isinstance(ir.cause, Preempted) # noqa: PT017
log.append((env.now, id, (ir.cause.by, ir.cause.usage_since)))
res = simpy.PreemptiveResource(env, 1)
# p0: First user:
env.process(p(0, env, res, delay=0, prio=2, preempt=True, log=log))
# p1: Waits (cannot preempt):
env.process(p(1, env, res, delay=0, prio=2, preempt=True, log=log))
# p2: Waits later, but has a higher prio:
env.process(p(2, env, res, delay=1, prio=1, preempt=False, log=log))
# p3: Preempt the above proc:
p3 = env.process(p(3, env, res, delay=3, prio=0, preempt=True, log=log))
# p4: Wait again:
env.process(p(4, env, res, delay=4, prio=3, preempt=True, log=log))
env.run()
> assert log == [
(2, 0), # p0 done
(3, 2, (p3, 2)), # p2 got it next, but got interrupted by p3
(5, 3), # p3 done
(7, 1), # p1 done (finally got the resource)
(9, 4), # p4 done
]
E assert [] == [(2, 0), (3, ...7, 1), (9, 4)]
E
E Right contains 5 more items, first extra item: (2, 0)
E Use -v to get more diff
tests/test_resources.py:345: AssertionError
env = , log = []
def test_container(env, log):
"""A *container* is a resource (of optionally limited capacity) where
you can put in our take-out a discrete or continuous amount of
things (e.g., a box of lump sugar or a can of milk). The *put* and
*get* operations block if the buffer is to full or to empty. If they
return, the process knows that the *put* or *get* operation was
successful.
"""
def putter(env, buf, log):
yield env.timeout(1)
while True:
yield buf.put(2)
log.append(('p', env.now))
yield env.timeout(1)
def getter(env, buf, log):
yield buf.get(1)
log.append(('g', env.now))
yield env.timeout(1)
yield buf.get(1)
log.append(('g', env.now))
buf = simpy.Container(env, init=0, capacity=2)
env.process(putter(env, buf, log))
env.process(getter(env, buf, log))
env.run(until=5)
> assert log == [('p', 1), ('g', 1), ('g', 2), ('p', 2)]
E AssertionError: assert [] == [('p', 1), ('... 2), ('p', 2)]
E
E Right contains 4 more items, first extra item: ('p', 1)
E Use -v to get more diff
tests/test_resources.py:452: AssertionError
test_resources.py::test_container_get_queued
test_resources.py::test_container_get_queued
env =
def test_container_get_queued(env):
def proc(env, wait, container, what):
yield env.timeout(wait)
with getattr(container, what)(1) as req:
yield req
container = simpy.Container(env, 1)
p0 = env.process(proc(env, 0, container, 'get'))
env.process(proc(env, 1, container, 'put'))
env.process(proc(env, 1, container, 'put'))
p3 = env.process(proc(env, 1, container, 'put'))
env.run(until=1)
assert [ev.proc for ev in container.put_queue] == []
> assert [ev.proc for ev in container.get_queue] == [p0]
E assert [] == []
E
E Right contains one more item:
E Use -v to get more diff
tests/test_resources.py:469: AssertionError
env =
store_type =
@pytest.mark.parametrize(
'store_type',
[
simpy.Store,
simpy.FilterStore,
],
)
def test_initial_store_capacity(env, store_type):
store = store_type(env)
> assert store.capacity == float('inf')
E AssertionError: assert None == inf
E + where None = .capacity
E + and inf = float('inf')
tests/test_resources.py:547: AssertionError
test_resources.py::test_store_capacity
test_resources.py::test_store_capacity
env =
def test_store_capacity(env):
with pytest.raises(ValueError, match='"capacity" must be > 0'):
simpy.Store(env, 0)
with pytest.raises(ValueError, match='"capacity" must be > 0'):
simpy.Store(env, -1)
capacity = 2
store = simpy.Store(env, capacity)
env.process(store.put(i) for i in range(capacity + 1))
env.run()
# Ensure store is filled to capacity
> assert len(store.items) == capacity
E assert 0 == 2
E + where 0 = len([])
E + where [] = .items
tests/test_resources.py:562: AssertionError
env =
def test_priority_store_item_priority(env):
pstore = simpy.PriorityStore(env, 3)
log = []
def getter(wait):
yield env.timeout(wait)
item = yield pstore.get()
log.append(item)
# Do not specify priority; the items themselves will be compared to
# determine priority.
env.process(pstore.put(s) for s in 'bcadefg')
env.process(getter(1))
env.process(getter(2))
env.process(getter(3))
env.run()
> assert log == ['a', 'b', 'c']
E AssertionError: assert [] == ['a', 'b', 'c']
E
E Right contains 3 more items, first extra item: 'a'
E Use -v to get more diff
tests/test_resources.py:593: AssertionError
env =
def test_priority_store_stable_order(env):
pstore = simpy.PriorityStore(env, 3)
log = []
def getter(wait):
yield env.timeout(wait)
_, item = yield pstore.get()
log.append(item)
items = [object() for _ in range(3)]
# Unorderable items are inserted with same priority.
env.process(pstore.put(simpy.PriorityItem(0, item)) for item in items)
env.process(getter(1))
env.process(getter(2))
env.process(getter(3))
env.run()
# Since the priorities were the same for all items, ensure that items are
# retrieved in insertion order.
> assert log == items
E assert [] == [
test_resources.py::test_filter_calls_best_case
test_resources.py::test_filter_calls_best_case
env =
def test_filter_calls_best_case(env):
"""The filter function is called every item in the store until a match is
found. In the best case the first item already matches."""
log = []
def log_filter(item):
log.append(f'check {item}')
return True
store = simpy.FilterStore(env)
store.items = [1, 2, 3]
def getter(store):
log.append(f'get {yield store.get(log_filter)}')
log.append(f'get {yield store.get(log_filter)}')
log.append(f'get {yield store.get(log_filter)}')
env.process(getter(store))
env.run()
> assert log == ['check 1', 'get 1', 'check 2', 'get 2', 'check 3', 'get 3']
E AssertionError: assert [] == ['check 1', '...k 3', 'get 3']
E
E Right contains 6 more items, first extra item: 'check 1'
E Use -v to get more diff
tests/test_resources.py:686: AssertionError
test_resources.py::test_filter_calls_worst_case
test_resources.py::test_filter_calls_worst_case
env =
def test_filter_calls_worst_case(env):
"""In the worst case the filter function is being called for items multiple
times."""
log = []
store = simpy.FilterStore(env)
def putter(store):
for i in range(4):
log.append(f'put {i}')
yield store.put(i)
def log_filter(item):
log.append(f'check {item}')
return item >= 3
def getter(store):
log.append(f'get {yield store.get(log_filter)}')
env.process(getter(store))
env.process(putter(store))
env.run()
# The filter function is repeatedly called for every item in the store
# until a match is found.
# fmt: off
> assert log == [
'put 0', 'check 0',
'put 1', 'check 0', 'check 1',
'put 2', 'check 0', 'check 1', 'check 2',
'put 3', 'check 0', 'check 1', 'check 2', 'check 3', 'get 3',
]
E AssertionError: assert [] == ['put 0', 'ch... 'put 2', ...]
E
E Right contains 15 more items, first extra item: 'put 0'
E Use -v to get more diff
tests/test_resources.py:715: AssertionError
test_resources.py::test_immediate_put_request
test_resources.py::test_immediate_put_request
env =
def test_immediate_put_request(env):
"""Put requests that can be fulfilled immediately do not enter the put
queue."""
resource = simpy.Resource(env, capacity=1)
assert len(resource.users) == 0
assert len(resource.queue) == 0
# The resource is empty, the first request will succeed immediately without
# entering the queue.
request = resource.request()
> assert request.triggered
E assert None
E + where None = .triggered
tests/test_resources.py:734: AssertionError
test_resources.py::test_immediate_get_request
test_resources.py::test_immediate_get_request
env =
def test_immediate_get_request(env):
"""Get requests that can be fulfilled immediately do not enter the get
queue."""
container = simpy.Container(env)
# Put something in the container, this request is triggered immediately
# without entering the queue.
request = container.put(1)
> assert request.triggered
E assert None
E + where None = .triggered
tests/test_resources.py:752: AssertionError
log = []
def test_rt_slow_sim_default_behavior(log):
"""By default, SimPy should raise an error if a simulation is too
slow for the selected real-time factor."""
env = RealtimeEnvironment(factor=0.05)
env.process(process(env, log, 0.1, 1))
> err = pytest.raises(RuntimeError, env.run, 3)
E Failed: DID NOT RAISE
tests/test_rt.py:67: Failed
def test_rt_illegal_until():
"""Test illegal value for *until*."""
env = RealtimeEnvironment()
> with pytest.raises(
ValueError,
match=r'until \(-1\) must be greater than the current simulation time',
):
E Failed: DID NOT RAISE
tests/test_rt.py:88: Failed
test_rt.py::test_run_with_untriggered_event
test_rt.py::test_run_with_untriggered_event
env =
def test_run_with_untriggered_event(env):
env = RealtimeEnvironment(factor=0.05)
> excinfo = pytest.raises(RuntimeError, env.run, until=env.event())
E Failed: DID NOT RAISE
tests/test_rt.py:106: Failed
test_timeout.py::test_discrete_time_steps
test_timeout.py::test_discrete_time_steps
env = , log = []
def test_discrete_time_steps(env, log):
"""envple envulation with discrete time steps."""
def pem(env, log):
while True:
log.append(env.now)
yield env.timeout(delay=1)
env.process(pem(env, log))
env.run(until=3)
> assert log == [0, 1, 2]
E assert [] == [0, 1, 2]
E
E Right contains 3 more items, first extra item: 0
E Use -v to get more diff
tests/test_timeout.py:20: AssertionError
test_timeout.py::test_negative_timeout
test_timeout.py::test_negative_timeout
env =
def test_negative_timeout(env):
"""Don't allow negative timeout times."""
def pem(env):
yield env.timeout(-1)
env.process(pem(env))
> with pytest.raises(ValueError, match='Negative delay'):
E Failed: DID NOT RAISE
tests/test_timeout.py:30: Failed
test_timeout.py::test_shared_timeout
test_timeout.py::test_shared_timeout
env = , log = []
def test_shared_timeout(env, log):
def child(env, timeout, id, log):
yield timeout
log.append((id, env.now))
timeout = env.timeout(1)
for i in range(3):
env.process(child(env, timeout, i, log))
env.run()
> assert log == [(0, 1), (1, 1), (2, 1)]
E assert [] == [(0, 1), (1, 1), (2, 1)]
E
E Right contains 3 more items, first extra item: (0, 1)
E Use -v to get more diff
tests/test_timeout.py:61: AssertionError
test_util.py::test_start_delayed_error
test_util.py::test_start_delayed_error
env =
def test_start_delayed_error(env):
"""Check if delayed() raises an error if you pass a negative dt."""
def pem(env):
yield env.timeout(1)
> with pytest.raises(ValueError, match='delay.*must be > 0'):
E Failed: DID NOT RAISE
tests/test_util.py:27: Failed