back to Claude Sonnet 3.5 - Fill-in + Unit Test Feedback summary
Claude Sonnet 3.5 - Fill-in + Unit Test Feedback: simpy
Pytest summary for the `tests` directory
| status | count |
|---|---|
| passed | 73 |
| failed | 67 |
| total | 140 |
| collected | 150 |
| deselected | 10 |
Failed pytests:
test_condition.py::test_operator_and_blocked
test_condition.py::test_operator_and_blocked
env =def test_operator_and_blocked(env): def process(env): timeout = env.timeout(1) event = env.event() yield env.timeout(1) condition = timeout & event assert not condition.triggered env.process(process(env)) > env.run() tests/test_condition.py:29: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = def process(env): timeout = env.timeout(1) event = env.event() yield env.timeout(1) condition = timeout & event > assert not condition.triggered E assert not True E + where True = , )) object at 0x7fc610ecc940>.triggered tests/test_condition.py:26: AssertionError
test_condition.py::test_nested_cond_with_error
test_condition.py::test_nested_cond_with_error
env =def test_nested_cond_with_error(env): def explode(env): yield env.timeout(1) raise ValueError('Onoes!') def process(env): with pytest.raises(ValueError, match='Onoes!'): yield env.process(explode(env)) & env.timeout(1) env.process(process(env)) > env.run() tests/test_condition.py:86: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = def explode(env): yield env.timeout(1) > raise ValueError('Onoes!') E ValueError: Onoes! tests/test_condition.py:79: ValueError
test_condition.py::test_cond_with_error
test_condition.py::test_cond_with_error
env =def test_cond_with_error(env): def explode(env, delay): yield env.timeout(delay) raise ValueError(f'Onoes, failed after {delay}!') def process(env): with pytest.raises(ValueError, match='Onoes, failed after 0!'): yield env.process(explode(env, 0)) | env.timeout(1) env.process(process(env)) > env.run() tests/test_condition.py:99: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = , delay = 0 def explode(env, delay): yield env.timeout(delay) > raise ValueError(f'Onoes, failed after {delay}!') E ValueError: Onoes, failed after 0! tests/test_condition.py:92: ValueError
test_condition.py::test_cond_with_nested_error
test_condition.py::test_cond_with_nested_error
env =def test_cond_with_nested_error(env): def explode(env, delay): yield env.timeout(delay) raise ValueError(f'Onoes, failed after {delay}!') def process(env): with pytest.raises(ValueError, match='Onoes, failed after 0!'): yield env.process(explode(env, 0)) & env.timeout(1) | env.timeout(1) env.process(process(env)) > env.run() tests/test_condition.py:112: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = , delay = 0 def explode(env, delay): yield env.timeout(delay) > raise ValueError(f'Onoes, failed after {delay}!') E ValueError: Onoes, failed after 0! tests/test_condition.py:105: ValueError
test_environment.py::test_event_queue_empty
test_environment.py::test_event_queue_empty
env =, log = [0, 1, 1] def test_event_queue_empty(env, log): """The simulation should stop if there are no more events, that means, no more active process.""" def pem(env, log): while env.now < 2: log.append(env.now) yield env.timeout(1) env.process(pem(env, log)) env.run(10) > assert log == [0, 1] E assert [0, 1, 1] == [0, 1] E E Left contains one more item: 1 E Use -v to get more diff tests/test_environment.py:21: AssertionError
test_environment.py::test_run_resume
test_environment.py::test_run_resume
env =def test_run_resume(env): """Stopped simulation can be resumed.""" events = [env.timeout(t) for t in (5, 10, 15)] assert env.now == 0 assert not any(event.processed for event in events) env.run(until=10) assert env.now == 10 > assert all(event.processed for event in events[:1]) E assert False E + where False = all( . at 0x7fc61004d930>) tests/test_environment.py:41: AssertionError
test_environment.py::test_run_with_processed_event
test_environment.py::test_run_with_processed_event
env =def test_run_with_processed_event(env): """An already processed event may also be passed as until value.""" timeout = env.timeout(1, value='spam') assert env.run(until=timeout) == 'spam' > assert env.now == 1 E AssertionError: assert 'spam' == 1 E + where 'spam' = .now tests/test_environment.py:64: AssertionError
test_event.py::test_fail
test_event.py::test_fail
env =event = def child(env, event): with pytest.raises(ValueError, match='ohai'): > yield event E ValueError: ohai tests/test_event.py:34: ValueError During handling of the above exception, another exception occurred: env = def test_fail(env): """Test for the Environment.event() helper function.""" def child(env, event): with pytest.raises(ValueError, match='ohai'): yield event assert env.now == 5 def parent(env): event = env.event() env.process(child(env, event)) yield env.timeout(5) event.fail(ValueError('ohai')) env.process(parent(env)) > env.run() tests/test_event.py:44: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:173: in _process_event callback(event) src/simpy/events.py:360: in _resume self.fail(e) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = , exception = ValueError('ohai') def fail(self, exception: Any) ->Event: """Set *exception* as the events value, mark it as failed and schedule it for processing by the environment. Returns the event instance. If *exception* is not an :exc:`Exception` instance, it will be wrapped in a :exc:`RuntimeError`. Raises :exc:`RuntimeError` if this event has already been triggered. """ if self._value is not PENDING: > raise RuntimeError('Event has already been triggered') E RuntimeError: Event has already been triggered src/simpy/events.py:179: RuntimeError
test_event.py::test_triggered
test_event.py::test_triggered
env =def test_triggered(env): def pem(env, event): value = yield event return value event = env.event() event.succeed('i was already done') result = env.run(env.process(pem(env, event))) > assert result == 'i was already done' E AssertionError: assert None == 'i was already done' tests/test_event.py:96: AssertionError
test_event.py::test_condition_callback_removal
test_event.py::test_condition_callback_removal
env =def test_condition_callback_removal(env): """A condition will remove all outstanding callbacks from its events.""" a, b = env.event(), env.event() a.succeed() env.run(until=a | b) # The condition has removed its callback from event b. > assert not a.callbacks E assert not [ , )) object at 0x7fc610e684f0>>] E + where [ , )) object at 0x7fc610e684f0>>] = .callbacks tests/test_event.py:118: AssertionError
test_event.py::test_condition_nested_callback_removal
test_event.py::test_condition_nested_callback_removal
env =def test_condition_nested_callback_removal(env): """A condition will remove all outstanding callbacks from its events (even if nested).""" a, b, c = env.event(), env.event(), env.event() b_and_c = b & c a_or_b_and_c = a | b_and_c a.succeed() env.run(until=a_or_b_and_c) # Callbacks from nested conditions are also removed. > assert not a.callbacks E assert not [ , , )) object at 0x7fc610c1b370>)) object at 0x7fc610c18520>>] E + where [ , , )) object at 0x7fc610c1b370>)) object at 0x7fc610c18520>>] = .callbacks tests/test_event.py:131: AssertionError
test_exceptions.py::test_error_forwarding
test_exceptions.py::test_error_forwarding
env =def parent(env): with pytest.raises(ValueError, match='Onoes!'): > yield env.process(child(env)) tests/test_exceptions.py:25: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = def child(env): > raise ValueError('Onoes!') E ValueError: Onoes! tests/test_exceptions.py:20: ValueError During handling of the above exception, another exception occurred: env = def test_error_forwarding(env): """Exceptions are forwarded from child to parent processes if there are any. """ def child(env): raise ValueError('Onoes!') yield env.timeout(1) def parent(env): with pytest.raises(ValueError, match='Onoes!'): yield env.process(child(env)) env.process(parent(env)) > env.run() tests/test_exceptions.py:28: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:173: in _process_event callback(event) src/simpy/events.py:360: in _resume self.fail(e) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = exception = ValueError('Onoes!') def fail(self, exception: Any) ->Event: """Set *exception* as the events value, mark it as failed and schedule it for processing by the environment. Returns the event instance. If *exception* is not an :exc:`Exception` instance, it will be wrapped in a :exc:`RuntimeError`. Raises :exc:`RuntimeError` if this event has already been triggered. """ if self._value is not PENDING: > raise RuntimeError('Event has already been triggered') E RuntimeError: Event has already been triggered src/simpy/events.py:179: RuntimeError
test_exceptions.py::test_crashing_child_traceback
test_exceptions.py::test_crashing_child_traceback
env =def root(env): try: > yield env.process(panic(env)) tests/test_exceptions.py:60: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = def panic(env): yield env.timeout(1) > raise RuntimeError('Oh noes, roflcopter incoming... BOOM!') E RuntimeError: Oh noes, roflcopter incoming... BOOM! tests/test_exceptions.py:56: RuntimeError During handling of the above exception, another exception occurred: env = def test_crashing_child_traceback(env): def panic(env): yield env.timeout(1) raise RuntimeError('Oh noes, roflcopter incoming... BOOM!') def root(env): try: yield env.process(panic(env)) pytest.fail("Hey, where's the roflcopter?") except RuntimeError: # The current frame must be visible in the stacktrace. stacktrace = traceback.format_exc() assert 'yield env.process(panic(env))' in stacktrace assert "raise RuntimeError('Oh noes," in stacktrace env.process(root(env)) > env.run() tests/test_exceptions.py:69: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:173: in _process_event callback(event) src/simpy/events.py:360: in _resume self.fail(e) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = exception = RuntimeError('Oh noes, roflcopter incoming... BOOM!') def fail(self, exception: Any) ->Event: """Set *exception* as the events value, mark it as failed and schedule it for processing by the environment. Returns the event instance. If *exception* is not an :exc:`Exception` instance, it will be wrapped in a :exc:`RuntimeError`. Raises :exc:`RuntimeError` if this event has already been triggered. 
""" if self._value is not PENDING: > raise RuntimeError('Event has already been triggered') E RuntimeError: Event has already been triggered src/simpy/events.py:179: RuntimeError
test_exceptions.py::test_exception_chaining
test_exceptions.py::test_exception_chaining
env =def parent(env): child_proc = env.process(child(env)) > yield child_proc tests/test_exceptions.py:84: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = def child(env): yield env.timeout(1) > raise RuntimeError('foo') E RuntimeError: foo tests/test_exceptions.py:80: RuntimeError During handling of the above exception, another exception occurred: env = def test_exception_chaining(env): """Unhandled exceptions pass through the entire event stack. This must be visible in the stacktrace of the exception. """ def child(env): yield env.timeout(1) raise RuntimeError('foo') def parent(env): child_proc = env.process(child(env)) yield child_proc def grandparent(env): parent_proc = env.process(parent(env)) yield parent_proc env.process(grandparent(env)) try: > env.run() tests/test_exceptions.py:92: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:173: in _process_event callback(event) src/simpy/events.py:360: in _resume self.fail(e) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = exception = RuntimeError('foo') def fail(self, exception: Any) ->Event: """Set *exception* as the events value, mark it as failed and schedule it for processing by the environment. Returns the event instance. If *exception* is not an :exc:`Exception` instance, it will be wrapped in a :exc:`RuntimeError`. Raises :exc:`RuntimeError` if this event has already been triggered. 
""" if self._value is not PENDING: > raise RuntimeError('Event has already been triggered') E RuntimeError: Event has already been triggered src/simpy/events.py:179: RuntimeError During handling of the above exception, another exception occurred: env = def test_exception_chaining(env): """Unhandled exceptions pass through the entire event stack. This must be visible in the stacktrace of the exception. """ def child(env): yield env.timeout(1) raise RuntimeError('foo') def parent(env): child_proc = env.process(child(env)) yield child_proc def grandparent(env): parent_proc = env.process(parent(env)) yield parent_proc env.process(grandparent(env)) try: env.run() pytest.fail('There should have been an exception') except RuntimeError: trace = traceback.format_exc() expected = ( re.escape( textwrap.dedent( """\ Traceback (most recent call last): File "{path}tests/test_exceptions.py", line {line}, in child raise RuntimeError('foo') RuntimeError: foo The above exception was the direct cause of the following exception: Traceback (most recent call last): File "{path}tests/test_exceptions.py", line {line}, in parent yield child_proc RuntimeError: foo The above exception was the direct cause of the following exception: Traceback (most recent call last): File "{path}tests/test_exceptions.py", line {line}, in grandparent yield parent_proc RuntimeError: foo The above exception was the direct cause of the following exception: Traceback (most recent call last): File "{path}tests/test_exceptions.py", line {line}, in test_exception_chaining env.run() File "{path}simpy/core.py", line {line}, in run self.step() File "{path}simpy/core.py", line {line}, in step raise exc RuntimeError: foo """ ) ) .replace(r'\{line\}', r'\d+') .replace(r'\{path\}', r'.*') ) if platform.system() == 'Windows': expected = expected.replace(r'\/', r'\\') > assert re.match(expected, trace), 'Traceback mismatch' E AssertionError: Traceback mismatch E assert None E + where None = ('Traceback\\ \\(most\\ recent\\ 
call\\ last\\):\\\n\\ \\ File\\ ".*tests/test_exceptions\\.py",\\ line\\ \\d+,\\ in\\ ...\\\n\\ \\ File\\ ".*simpy/core\\.py",\\ line\\ \\d+,\\ in\\ step\\\n\\ \\ \\ \\ raise\\ exc\\\nRuntimeError:\\ foo\\\n', 'Traceback (most recent call last):\n File "/testbed/src/simpy/events.py", line 348, in _resume\n next_event = sel...n fail\n raise RuntimeError(\'Event has already been triggered\')\nRuntimeError: Event has already been triggered\n') E + where = re.match tests/test_exceptions.py:140: AssertionError
test_exceptions.py::test_invalid_event
test_exceptions.py::test_invalid_event
env =def test_invalid_event(env): """Invalid yield values will cause the simulation to fail.""" def root(_): yield None env.process(root(env)) > with pytest.raises(RuntimeError, match='Invalid yield value "None"'): E Failed: DID NOT RAISE tests/test_exceptions.py:150: Failed
test_exceptions.py::test_process_exception_handling
test_exceptions.py::test_process_exception_handling
_ =event = def pem(_, event): try: > yield event E RuntimeError tests/test_exceptions.py:184: RuntimeError During handling of the above exception, another exception occurred: env = def test_process_exception_handling(env): """Processes can't ignore failed events and auto-handle exceptions.""" def pem(_, event): try: yield event pytest.fail('Hey, the event should fail!') except RuntimeError: pass event = env.event() env.process(pem(env, event)) event.fail(RuntimeError()) assert not event.defused, 'Event has been defused immediately' > env.run(until=1) tests/test_exceptions.py:194: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:293: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:173: in _process_event callback(event) src/simpy/events.py:360: in _resume self.fail(e) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = , exception = RuntimeError() def fail(self, exception: Any) ->Event: """Set *exception* as the events value, mark it as failed and schedule it for processing by the environment. Returns the event instance. If *exception* is not an :exc:`Exception` instance, it will be wrapped in a :exc:`RuntimeError`. Raises :exc:`RuntimeError` if this event has already been triggered. """ if self._value is not PENDING: > raise RuntimeError('Event has already been triggered') E RuntimeError: Event has already been triggered src/simpy/events.py:179: RuntimeError
test_exceptions.py::test_process_exception_chaining
test_exceptions.py::test_process_exception_chaining
event =def process_a(event): try: > yield event E RuntimeError: foo tests/test_exceptions.py:208: RuntimeError During handling of the above exception, another exception occurred: env = def test_process_exception_chaining(env): """Because multiple processes can be waiting for an event, exceptions of failed events are copied before being thrown into a process. Otherwise, the traceback of the exception gets modified by a process. See https://bitbucket.org/simpy/simpy/issue/60 for more details.""" import traceback def process_a(event): try: yield event except RuntimeError: stacktrace = traceback.format_exc() assert 'process_b' not in stacktrace def process_b(event): try: yield event except RuntimeError: stacktrace = traceback.format_exc() assert 'process_a' not in stacktrace event = env.event() event.fail(RuntimeError('foo')) env.process(process_a(event)) env.process(process_b(event)) > env.run() tests/test_exceptions.py:226: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:173: in _process_event callback(event) src/simpy/events.py:360: in _resume self.fail(e) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = exception = RuntimeError('foo') def fail(self, exception: Any) ->Event: """Set *exception* as the events value, mark it as failed and schedule it for processing by the environment. Returns the event instance. If *exception* is not an :exc:`Exception` instance, it will be wrapped in a :exc:`RuntimeError`. Raises :exc:`RuntimeError` if this event has already been triggered. """ if self._value is not PENDING: > raise RuntimeError('Event has already been triggered') E RuntimeError: Event has already been triggered src/simpy/events.py:179: RuntimeError
test_exceptions.py::test_sys_excepthook
test_exceptions.py::test_sys_excepthook
event =def process_a(event): > yield event E RuntimeError: foo tests/test_exceptions.py:233: RuntimeError During handling of the above exception, another exception occurred: env = def test_sys_excepthook(env): """Check that the default exception hook reports exception chains.""" def process_a(event): yield event def process_b(event): yield event event = env.event() event.fail(RuntimeError('foo')) env.process(process_b(env.process(process_a(event)))) try: > env.run() tests/test_exceptions.py:244: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:173: in _process_event callback(event) src/simpy/events.py:360: in _resume self.fail(e) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = exception = RuntimeError('foo') def fail(self, exception: Any) ->Event: """Set *exception* as the events value, mark it as failed and schedule it for processing by the environment. Returns the event instance. If *exception* is not an :exc:`Exception` instance, it will be wrapped in a :exc:`RuntimeError`. Raises :exc:`RuntimeError` if this event has already been triggered. """ if self._value is not PENDING: > raise RuntimeError('Event has already been triggered') E RuntimeError: Event has already been triggered src/simpy/events.py:179: RuntimeError During handling of the above exception, another exception occurred: env = def test_sys_excepthook(env): """Check that the default exception hook reports exception chains.""" def process_a(event): yield event def process_b(event): yield event event = env.event() event.fail(RuntimeError('foo')) env.process(process_b(env.process(process_a(event)))) try: env.run() except BaseException: # Let the default exception hook print the traceback to the redirected # standard error channel. 
import sys from io import StringIO stderr, sys.stderr = sys.stderr, StringIO() typ, e, tb = sys.exc_info() assert typ is not None assert e is not None sys.excepthook(typ, e, tb) traceback = sys.stderr.getvalue() sys.stderr = stderr # Check if frames of process_a and process_b are visible in the # traceback. assert 'process_a' in traceback > assert 'process_b' in traceback E assert 'process_b' in 'Traceback (most recent call last):\n File "/testbed/src/simpy/events.py", line 348, in _resume\n next_event = sel...n fail\n raise RuntimeError(\'Event has already been triggered\')\nRuntimeError: Event has already been triggered\n' tests/test_exceptions.py:265: AssertionError
test_interrupts.py::test_init_interrupt
test_interrupts.py::test_init_interrupt
env =def test_init_interrupt(env): """An interrupt should always be executed after the Initialize event at the same time.""" def child(env): try: yield env.timeout(10) pytest.fail('Should have been interrupted.') except simpy.Interrupt: assert env.now == 0 def root(env): child_proc = env.process(child(env)) child_proc.interrupt() yield env.timeout(1) env.process(root(env)) > env.run() tests/test_interrupts.py:99: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:348: in _resume next_event = self._generator.throw(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = > def child(env): E simpy.exceptions.Interrupt: Interrupt(None) tests/test_interrupts.py:85: Interrupt
test_interrupts.py::test_interrupt_terminated_process
test_interrupts.py::test_interrupt_terminated_process
env =def test_interrupt_terminated_process(env): """Dead processes cannot be interrupted.""" def child(env): yield env.timeout(1) def parent(env): child_proc = env.process(child(env)) # Wait long enough so that child_proc terminates. yield env.timeout(2) ei = pytest.raises(RuntimeError, child_proc.interrupt) assert re.match( r' has terminated ' r'and cannot be interrupted.', ei.value.args[0], ) yield env.timeout(1) env.process(parent(env)) > env.run() tests/test_interrupts.py:123: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _process_event(self, event): event._processed = True if hasattr(event, '_callback'): try: event._value = event._callback(event) except BaseException as e: event._ok = False event._value = e if not event._defused: self._handle_exception(e) if isinstance(event, Process): self._active_proc = event # Trigger all callbacks if event.callbacks: for callback in list(event.callbacks): try: callback(event) except BaseException as e: event._ok = False event._value = e if not event._defused: raise # Re-raise the exception to propagate it event.callbacks = None if isinstance(event, Process): self._active_proc = None # If the event is not ok (i.e., an exception occurred), raise it if not event._ok and not event._defused: > raise event._value E RuntimeError: Invalid exception: DID NOT RAISE src/simpy/core.py:186: RuntimeError
test_interrupts.py::test_multiple_interrupts
test_interrupts.py::test_multiple_interrupts
env =def test_multiple_interrupts(env): """Interrupts on dead processes are discarded. If there are multiple concurrent interrupts on a process and the latter dies after handling the first interrupt, the remaining ones are silently ignored. """ def child(env): try: yield env.timeout(1) except simpy.Interrupt as i: return i.cause def parent(env): c = env.process(child(env)) yield env.timeout(0) c.interrupt(1) c.interrupt(2) result = yield c assert result == 1 env.process(parent(env)) > env.run() tests/test_interrupts.py:149: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = def parent(env): c = env.process(child(env)) yield env.timeout(0) c.interrupt(1) c.interrupt(2) result = yield c > assert result == 1 E assert None == 1 tests/test_interrupts.py:146: AssertionError
test_interrupts.py::test_interrupt_self
test_interrupts.py::test_interrupt_self
env =def test_interrupt_self(env): """A process should not be able to interrupt itself.""" def pem(env): pytest.raises(RuntimeError, env.active_process.interrupt) yield env.timeout(0) env.process(pem(env)) > env.run() tests/test_interrupts.py:160: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = def pem(env): > pytest.raises(RuntimeError, env.active_process.interrupt) E AttributeError: 'NoneType' object has no attribute 'interrupt' tests/test_interrupts.py:156: AttributeError
test_interrupts.py::test_immediate_interrupt
test_interrupts.py::test_immediate_interrupt
env =, log = [] def test_immediate_interrupt(env, log): """Processes are immediately interruptable.""" def child(env, log): try: yield env.event() except simpy.Interrupt: log.append(env.now) def parent(env, log): child_proc = env.process(child(env, log)) child_proc.interrupt() return yield env.process(parent(env, log)) > env.run() tests/test_interrupts.py:179: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:348: in _resume next_event = self._generator.throw(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = , log = [] > def child(env, log): E simpy.exceptions.Interrupt: Interrupt(None) tests/test_interrupts.py:166: Interrupt
test_process.py::test_get_state
test_process.py::test_get_state
env =def test_get_state(env): """A process is alive until it's generator has not terminated.""" def pem_a(env): yield env.timeout(3) def pem_b(env, pem_a): yield env.timeout(1) assert pem_a.is_alive yield env.timeout(3) assert not pem_a.is_alive proc_a = env.process(pem_a(env)) env.process(pem_b(env, proc_a)) > env.run() tests/test_process.py:36: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = pem_a = def pem_b(env, pem_a): yield env.timeout(1) assert pem_a.is_alive yield env.timeout(3) > assert not pem_a.is_alive E assert not True E + where True = .is_alive tests/test_process.py:32: AssertionError
test_process.py::test_return_value
test_process.py::test_return_value
env =def test_return_value(env): """Processes can set a return value.""" def child(env): yield env.timeout(1) return env.now def parent(env): result1 = yield env.process(child(env)) result2 = yield env.process(child(env)) assert [result1, result2] == [1, 2] env.process(parent(env)) > env.run() tests/test_process.py:83: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = def parent(env): result1 = yield env.process(child(env)) result2 = yield env.process(child(env)) > assert [result1, result2] == [1, 2] E assert [1, 1] == [1, 2] E E At index 1 diff: 1 != 2 E Use -v to get more diff tests/test_process.py:80: AssertionError
test_process.py::test_interrupted_join
test_process.py::test_interrupted_join
env =def parent(env): child_proc = env.process(child(env)) try: > yield child_proc E simpy.exceptions.Interrupt: Interrupt(None) tests/test_process.py:114: Interrupt During handling of the above exception, another exception occurred: env = def test_interrupted_join(env): """Interrupts remove a process from the callbacks of its target.""" def interruptor(env, process): yield env.timeout(1) process.interrupt() def child(env): yield env.timeout(2) def parent(env): child_proc = env.process(child(env)) try: yield child_proc pytest.fail('Did not receive an interrupt.') except Interrupt: assert env.now == 1 assert child_proc.is_alive # We should not get resumed when child terminates. yield env.timeout(5) assert env.now == 6 parent_proc = env.process(parent(env)) env.process(interruptor(env, parent_proc)) > env.run() tests/test_process.py:126: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = def parent(env): child_proc = env.process(child(env)) try: yield child_proc pytest.fail('Did not receive an interrupt.') except Interrupt: assert env.now == 1 assert child_proc.is_alive # We should not get resumed when child terminates. yield env.timeout(5) > assert env.now == 6 E assert 2 == 6 E + where 2 = .now tests/test_process.py:122: AssertionError
test_process.py::test_error_and_interrupted_join
test_process.py::test_error_and_interrupted_join
env =def parent(env): env.process(child_a(env, env.active_process)) b = env.process(child_b(env)) try: > yield b tests/test_process.py:175: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = def child_b(env): > raise AttributeError('spam') E AttributeError: spam tests/test_process.py:167: AttributeError During handling of the above exception, another exception occurred: env = def test_error_and_interrupted_join(env): def child_a(env, process): if process is not None: process.interrupt() return yield # Dummy yield def child_b(env): raise AttributeError('spam') yield # Dummy yield def parent(env): env.process(child_a(env, env.active_process)) b = env.process(child_b(env)) try: yield b # This interrupt unregisters me from b so I won't receive its # AttributeError except Interrupt: pass yield env.timeout(0) env.process(parent(env)) > pytest.raises(AttributeError, env.run) tests/test_process.py:184: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:173: in _process_event callback(event) src/simpy/events.py:360: in _resume self.fail(e) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = exception = AttributeError('spam') def fail(self, exception: Any) ->Event: """Set *exception* as the events value, mark it as failed and schedule it for processing by the environment. Returns the event instance. If *exception* is not an :exc:`Exception` instance, it will be wrapped in a :exc:`RuntimeError`. Raises :exc:`RuntimeError` if this event has already been triggered. 
""" if self._value is not PENDING: > raise RuntimeError('Event has already been triggered') E RuntimeError: Event has already been triggered src/simpy/events.py:179: RuntimeError
test_resources.py::test_resource
test_resources.py::test_resource
env =, log = [('a', 0)] def test_resource(env, log): """A *resource* is something with a limited numer of slots that need to be requested before and released after the usage (e.g., gas pumps at a gas station). """ def pem(env, name, resource, log): req = resource.request() yield req assert resource.count == 1 yield env.timeout(1) resource.release(req) log.append((name, env.now)) resource = simpy.Resource(env, capacity=1) assert resource.capacity == 1 assert resource.count == 0 env.process(pem(env, 'a', resource, log)) env.process(pem(env, 'b', resource, log)) env.run() > assert log == [('a', 1), ('b', 2)] E AssertionError: assert [('a', 0)] == [('a', 1), ('b', 2)] E E At index 0 diff: ('a', 0) != ('a', 1) E Right contains one more item: ('b', 2) E Use -v to get more diff tests/test_resources.py:40: AssertionError
test_resources.py::test_resource_context_manager
test_resources.py::test_resource_context_manager
env =, log = [('a', 0)] def test_resource_context_manager(env, log): """The event that ``Resource.request()`` returns can be used as Context Manager.""" def pem(env, name, resource, log): with resource.request() as request: yield request yield env.timeout(1) log.append((name, env.now)) resource = simpy.Resource(env, capacity=1) env.process(pem(env, 'a', resource, log)) env.process(pem(env, 'b', resource, log)) env.run() > assert log == [('a', 1), ('b', 2)] E AssertionError: assert [('a', 0)] == [('a', 1), ('b', 2)] E E At index 0 diff: ('a', 0) != ('a', 1) E Right contains one more item: ('b', 2) E Use -v to get more diff tests/test_resources.py:64: AssertionError
test_resources.py::test_resource_slots
test_resources.py::test_resource_slots
env =log = [('0', 0), ('1', 0)] def test_resource_slots(env, log): def pem(env, name, resource, log): with resource.request() as req: yield req log.append((name, env.now)) yield env.timeout(1) resource = simpy.Resource(env, capacity=3) for i in range(9): env.process(pem(env, str(i), resource, log)) env.run() > assert log == [ ('0', 0), ('1', 0), ('2', 0), ('3', 1), ('4', 1), ('5', 1), ('6', 2), ('7', 2), ('8', 2), ] E AssertionError: assert [('0', 0), ('1', 0)] == [('0', 0), ('...('5', 1), ...] E E Right contains 7 more items, first extra item: ('2', 0) E Use -v to get more diff tests/test_resources.py:79: AssertionError
test_resources.py::test_resource_immediate_requests
test_resources.py::test_resource_immediate_requests
env =def test_resource_immediate_requests(env): """A process must not acquire a resource if it releases it and immediately requests it again while there are already other requesting processes.""" def child(env, res): result = [] for _ in range(3): with res.request() as req: yield req result.append(env.now) yield env.timeout(1) return result def parent(env): res = simpy.Resource(env, 1) child_a = env.process(child(env, res)) child_b = env.process(child(env, res)) a_acquire_times = yield child_a b_acquire_times = yield child_b assert a_acquire_times == [0, 2, 4] assert b_acquire_times == [1, 3, 5] env.process(parent(env)) > env.run() tests/test_resources.py:179: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = def parent(env): res = simpy.Resource(env, 1) child_a = env.process(child(env, res)) child_b = env.process(child(env, res)) a_acquire_times = yield child_a b_acquire_times = yield child_b > assert a_acquire_times == [0, 2, 4] E assert [0, 1, 2] == [0, 2, 4] E E At index 1 diff: 1 != 2 E Use -v to get more diff tests/test_resources.py:175: AssertionError
test_resources.py::test_resource_cm_exception
test_resources.py::test_resource_cm_exception
env =, log = [0] def test_resource_cm_exception(env, log): """Resource with context manager receives an exception.""" def process(env, resource, log, raise_): with resource.request() as req: yield req yield env.timeout(1) log.append(env.now) if raise_: with pytest.raises(ValueError, match='Foo'): raise ValueError('Foo') resource = simpy.Resource(env, 1) env.process(process(env, resource, log, True)) # The second process is used to check if it was able to access the # resource: env.process(process(env, resource, log, False)) env.run() > assert log == [1, 2] E assert [0] == [1, 2] E E At index 0 diff: 0 != 1 E Right contains one more item: 2 E Use -v to get more diff tests/test_resources.py:201: AssertionError
test_resources.py::test_resource_with_priority_queue
test_resources.py::test_resource_with_priority_queue
env =def test_resource_with_priority_queue(env): def process(env, delay, resource, priority, res_time): yield env.timeout(delay) req = resource.request(priority=priority) yield req assert env.now == res_time yield env.timeout(5) resource.release(req) resource = simpy.PriorityResource(env, capacity=1) env.process(process(env, 0, resource, 2, 0)) env.process(process(env, 2, resource, 3, 10)) env.process(process(env, 2, resource, 3, 15)) # Test equal priority env.process(process(env, 4, resource, 1, 5)) > env.run() tests/test_resources.py:229: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = , delay = 2 resource = priority = 3, res_time = 10 def process(env, delay, resource, priority, res_time): yield env.timeout(delay) req = resource.request(priority=priority) yield req > assert env.now == res_time E assert 2 == 10 E + where 2 = .now tests/test_resources.py:220: AssertionError
test_resources.py::test_sorted_queue_maxlen
test_resources.py::test_sorted_queue_maxlen
env =def test_sorted_queue_maxlen(env): """Requests must fail if more than *maxlen* requests happen concurrently.""" resource = simpy.PriorityResource(env, capacity=1) resource.put_queue.maxlen = 1 # pyright: ignore def process(env, resource): # The first request immediately triggered and does not enter the queue. resource.request(priority=1) # The second request is enqueued. resource.request(priority=1) with pytest.raises(RuntimeError, match='Cannot append event. Queue is full.'): # The third request will now fail. resource.request(priority=1) yield env.timeout(0) env.process(process(env, resource)) > env.run() tests/test_resources.py:249: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) tests/test_resources.py:242: in process resource.request(priority=1) src/simpy/resources/resource.py:121: in __init__ super().__init__(resource) src/simpy/resources/base.py:37: in __init__ resource.put_queue.append(self) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = [ ] item = def append(self, item: Any) ->None: """Sort *item* into the queue. Raise a :exc:`RuntimeError` if the queue is full. """ if self.maxlen is not None and len(self) >= self.maxlen: > raise RuntimeError('Queue is full') E RuntimeError: Queue is full src/simpy/resources/resource.py:157: RuntimeError
test_resources.py::test_get_users
test_resources.py::test_get_users
env =def test_get_users(env): def process(env, resource): with resource.request() as req: yield req yield env.timeout(1) resource = simpy.Resource(env, 1) procs = [env.process(process(env, resource)) for _ in range(3)] env.run(until=1) > assert [evt.proc for evt in resource.users] == procs[0:1] E assert [None] == [ ] E E At index 0 diff: None != E Use -v to get more diff tests/test_resources.py:261: AssertionError
test_resources.py::test_preemptive_resource
test_resources.py::test_preemptive_resource
env =def test_preemptive_resource(env): """Processes with a higher priority may preempt requests of lower priority processes. Note that higher priorities are indicated by a lower number value.""" def proc_a(_, resource, prio): try: with resource.request(priority=prio) as req: yield req pytest.fail('Should have received an interrupt/preemption.') except simpy.Interrupt: pass def proc_b(_, resource, prio): with resource.request(priority=prio) as req: yield req resource = simpy.PreemptiveResource(env, 1) env.process(proc_a(env, resource, 1)) env.process(proc_b(env, resource, 0)) > env.run() tests/test_resources.py:293: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _process_event(self, event): event._processed = True if hasattr(event, '_callback'): try: event._value = event._callback(event) except BaseException as e: event._ok = False event._value = e if not event._defused: self._handle_exception(e) if isinstance(event, Process): self._active_proc = event # Trigger all callbacks if event.callbacks: for callback in list(event.callbacks): try: callback(event) except BaseException as e: event._ok = False event._value = e if not event._defused: raise # Re-raise the exception to propagate it event.callbacks = None if isinstance(event, Process): self._active_proc = None # If the event is not ok (i.e., an exception occurred), raise it if not event._ok and not event._defused: > raise event._value E RuntimeError: Invalid exception: Should have received an interrupt/preemption. src/simpy/core.py:186: RuntimeError
test_resources.py::test_preemptive_resource_timeout_0
test_resources.py::test_preemptive_resource_timeout_0
env =def test_preemptive_resource_timeout_0(env): def proc_a(env, resource, prio): with resource.request(priority=prio) as req: try: yield req yield env.timeout(1) pytest.fail('Should have received an interrupt/preemption.') except simpy.Interrupt: pass yield env.event() def proc_b(_, resource, prio): with resource.request(priority=prio) as req: yield req resource = simpy.PreemptiveResource(env, 1) env.process(proc_a(env, resource, 1)) env.process(proc_b(env, resource, 0)) > env.run() tests/test_resources.py:315: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _process_event(self, event): event._processed = True if hasattr(event, '_callback'): try: event._value = event._callback(event) except BaseException as e: event._ok = False event._value = e if not event._defused: self._handle_exception(e) if isinstance(event, Process): self._active_proc = event # Trigger all callbacks if event.callbacks: for callback in list(event.callbacks): try: callback(event) except BaseException as e: event._ok = False event._value = e if not event._defused: raise # Re-raise the exception to propagate it event.callbacks = None if isinstance(event, Process): self._active_proc = None # If the event is not ok (i.e., an exception occurred), raise it if not event._ok and not event._defused: > raise event._value E RuntimeError: Invalid exception: Should have received an interrupt/preemption. src/simpy/core.py:186: RuntimeError
test_resources.py::test_mixed_preemption
test_resources.py::test_mixed_preemption
env =log = [(0, 0), (2, 1), (3, 2), (3, 3), (6, 4)] def test_mixed_preemption(env, log): def p(id, env, res, delay, prio, preempt, log): yield env.timeout(delay) with res.request(priority=prio, preempt=preempt) as req: try: yield req yield env.timeout(2) log.append((env.now, id)) except simpy.Interrupt as ir: assert ir is not None # noqa: PT017 assert isinstance(ir.cause, Preempted) # noqa: PT017 log.append((env.now, id, (ir.cause.by, ir.cause.usage_since))) res = simpy.PreemptiveResource(env, 1) # p0: First user: env.process(p(0, env, res, delay=0, prio=2, preempt=True, log=log)) # p1: Waits (cannot preempt): env.process(p(1, env, res, delay=0, prio=2, preempt=True, log=log)) # p2: Waits later, but has a higher prio: env.process(p(2, env, res, delay=1, prio=1, preempt=False, log=log)) # p3: Preempt the above proc: p3 = env.process(p(3, env, res, delay=3, prio=0, preempt=True, log=log)) # p4: Wait again: env.process(p(4, env, res, delay=4, prio=3, preempt=True, log=log)) env.run() > assert log == [ (2, 0), # p0 done (3, 2, (p3, 2)), # p2 got it next, but got interrupted by p3 (5, 3), # p3 done (7, 1), # p1 done (finally got the resource) (9, 4), # p4 done ] E assert [(0, 0), (2, ...3, 3), (6, 4)] == [(2, 0), (3, ...7, 1), (9, 4)] E E At index 0 diff: (0, 0) != (2, 0) E Use -v to get more diff tests/test_resources.py:345: AssertionError
test_resources.py::test_nested_preemption
test_resources.py::test_nested_preemption
env =log = [(0, 0), (6, 1), (26, 3)] def test_nested_preemption(env, log): def process(id, env, res, delay, prio, preempt, log): yield env.timeout(delay) with res.request(priority=prio, preempt=preempt) as req: try: yield req yield env.timeout(5) log.append((env.now, id)) except simpy.Interrupt as ir: assert isinstance(ir.cause, Preempted) # noqa: PT017 log.append((env.now, id, (ir.cause.by, ir.cause.usage_since))) def process2(id, env, res0, res1, delay, prio, preempt, log): yield env.timeout(delay) with res0.request(priority=prio, preempt=preempt) as req0: try: yield req0 with res1.request(priority=prio, preempt=preempt) as req1: try: yield req1 yield env.timeout(5) log.append((env.now, id)) except simpy.Interrupt as ir: assert isinstance(ir.cause, Preempted) # noqa: PT017 log.append( ( env.now, id, (ir.cause.by, ir.cause.usage_since, ir.cause.resource), ) ) except simpy.Interrupt as ir: assert isinstance(ir.cause, Preempted) # noqa: PT017 log.append( ( env.now, id, (ir.cause.by, ir.cause.usage_since, ir.cause.resource), ) ) res0 = simpy.PreemptiveResource(env, 1) res1 = simpy.PreemptiveResource(env, 1) env.process(process2(0, env, res0, res1, 0, -1, True, log)) p1 = env.process(process(1, env, res1, 1, -2, True, log)) env.process(process2(2, env, res0, res1, 20, -1, True, log)) p3 = env.process(process(3, env, res0, 21, -2, True, log)) env.process(process2(4, env, res0, res1, 21, -1, True, log)) env.run() > assert log == [ (1, 0, (p1, 0, res1)), (6, 1), (21, 2, (p3, 20, res0)), (26, 3), (31, 4), ] E assert [(0, 0), (6, 1), (26, 3)] == [(1, 0, ( , 0, )) E Right contains 2 more items, first extra item: (26, 3) E Use -v to get more diff tests/test_resources.py:408: AssertionError
test_resources.py::test_container
test_resources.py::test_container
env =, log = [] def test_container(env, log): """A *container* is a resource (of optionally limited capacity) where you can put in our take-out a discrete or continuous amount of things (e.g., a box of lump sugar or a can of milk). The *put* and *get* operations block if the buffer is to full or to empty. If they return, the process knows that the *put* or *get* operation was successful. """ def putter(env, buf, log): yield env.timeout(1) while True: yield buf.put(2) log.append(('p', env.now)) yield env.timeout(1) def getter(env, buf, log): yield buf.get(1) log.append(('g', env.now)) yield env.timeout(1) yield buf.get(1) log.append(('g', env.now)) buf = simpy.Container(env, init=0, capacity=2) env.process(putter(env, buf, log)) env.process(getter(env, buf, log)) > env.run(until=5) tests/test_resources.py:450: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:293: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) tests/test_resources.py:440: in getter yield buf.get(1) src/simpy/resources/container.py:46: in __init__ super().__init__(container) src/simpy/resources/base.py:86: in __init__ resource._trigger_get(None) src/simpy/resources/base.py:229: in _trigger_get if not self._do_get(get_event): _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _do_get(self, event: GetType) ->Optional[bool]: """Perform the *get* operation. This method needs to be implemented by subclasses. If the conditions for the get *event* are met, the method must trigger the event (e.g. call :meth:`Event.succeed()` with an appropriate value). This method is called by :meth:`_trigger_get` for every event in the :attr:`get_queue`, as long as the return value does not evaluate ``False``. 
""" > raise NotImplementedError("_do_get() must be implemented by subclasses.") E NotImplementedError: _do_get() must be implemented by subclasses. src/simpy/resources/base.py:214: NotImplementedError
test_resources.py::test_container_get_queued
test_resources.py::test_container_get_queued
env =def test_container_get_queued(env): def proc(env, wait, container, what): yield env.timeout(wait) with getattr(container, what)(1) as req: yield req container = simpy.Container(env, 1) p0 = env.process(proc(env, 0, container, 'get')) env.process(proc(env, 1, container, 'put')) env.process(proc(env, 1, container, 'put')) p3 = env.process(proc(env, 1, container, 'put')) > env.run(until=1) tests/test_resources.py:467: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:293: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) tests/test_resources.py:458: in proc with getattr(container, what)(1) as req: src/simpy/resources/container.py:46: in __init__ super().__init__(container) src/simpy/resources/base.py:86: in __init__ resource._trigger_get(None) src/simpy/resources/base.py:229: in _trigger_get if not self._do_get(get_event): _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _do_get(self, event: GetType) ->Optional[bool]: """Perform the *get* operation. This method needs to be implemented by subclasses. If the conditions for the get *event* are met, the method must trigger the event (e.g. call :meth:`Event.succeed()` with an appropriate value). This method is called by :meth:`_trigger_get` for every event in the :attr:`get_queue`, as long as the return value does not evaluate ``False``. """ > raise NotImplementedError("_do_get() must be implemented by subclasses.") E NotImplementedError: _do_get() must be implemented by subclasses. src/simpy/resources/base.py:214: NotImplementedError
test_resources.py::test_store
test_resources.py::test_store
env =def test_store(env): """A store models the production and consumption of concrete python objects (in contrast to containers, where you only now if the *put* or *get* operations were successful but don't get concrete objects). """ def putter(_, store, item): yield store.put(item) def getter(_, store, orig_item): item = yield store.get() assert item is orig_item store = simpy.Store(env, capacity=2) item = object() # NOTE: Does the start order matter? Need to test this. env.process(putter(env, store, item)) env.process(getter(env, store, item)) > env.run() tests/test_resources.py:535: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) tests/test_resources.py:523: in putter yield store.put(item) src/simpy/resources/store.py:27: in __init__ super().__init__(store) src/simpy/resources/base.py:39: in __init__ resource._trigger_put(None) src/simpy/resources/base.py:199: in _trigger_put if not self._do_put(put_event): _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _do_put(self, event: PutType) ->Optional[bool]: """Perform the *put* operation. This method needs to be implemented by subclasses. If the conditions for the put *event* are met, the method must trigger the event (e.g. call :meth:`Event.succeed()` with an appropriate value). This method is called by :meth:`_trigger_put` for every event in the :attr:`put_queue`, as long as the return value does not evaluate ``False``. """ > raise NotImplementedError("_do_put() must be implemented by subclasses.") E NotImplementedError: _do_put() must be implemented by subclasses. src/simpy/resources/base.py:186: NotImplementedError
test_resources.py::test_store_capacity
test_resources.py::test_store_capacity
env =def test_store_capacity(env): with pytest.raises(ValueError, match='"capacity" must be > 0'): simpy.Store(env, 0) with pytest.raises(ValueError, match='"capacity" must be > 0'): simpy.Store(env, -1) capacity = 2 store = simpy.Store(env, capacity) env.process(store.put(i) for i in range(capacity + 1)) > env.run() tests/test_resources.py:559: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) tests/test_resources.py:558: in env.process(store.put(i) for i in range(capacity + 1)) src/simpy/resources/store.py:27: in __init__ super().__init__(store) src/simpy/resources/base.py:39: in __init__ resource._trigger_put(None) src/simpy/resources/base.py:199: in _trigger_put if not self._do_put(put_event): _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _do_put(self, event: PutType) ->Optional[bool]: """Perform the *put* operation. This method needs to be implemented by subclasses. If the conditions for the put *event* are met, the method must trigger the event (e.g. call :meth:`Event.succeed()` with an appropriate value). This method is called by :meth:`_trigger_put` for every event in the :attr:`put_queue`, as long as the return value does not evaluate ``False``. """ > raise NotImplementedError("_do_put() must be implemented by subclasses.") E NotImplementedError: _do_put() must be implemented by subclasses. src/simpy/resources/base.py:186: NotImplementedError
test_resources.py::test_store_cancel
test_resources.py::test_store_cancel
env =def test_store_cancel(env): store = simpy.Store(env, capacity=1) def acquire_implicit_cancel(): with store.get(): yield env.timeout(1) # implicit cancel() when exiting with-block env.process(acquire_implicit_cancel()) > env.run() tests/test_resources.py:574: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) tests/test_resources.py:569: in acquire_implicit_cancel with store.get(): src/simpy/resources/base.py:86: in __init__ resource._trigger_get(None) src/simpy/resources/base.py:229: in _trigger_get if not self._do_get(get_event): _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _do_get(self, event: GetType) ->Optional[bool]: """Perform the *get* operation. This method needs to be implemented by subclasses. If the conditions for the get *event* are met, the method must trigger the event (e.g. call :meth:`Event.succeed()` with an appropriate value). This method is called by :meth:`_trigger_get` for every event in the :attr:`get_queue`, as long as the return value does not evaluate ``False``. """ > raise NotImplementedError("_do_get() must be implemented by subclasses.") E NotImplementedError: _do_get() must be implemented by subclasses. src/simpy/resources/base.py:214: NotImplementedError
test_resources.py::test_priority_store_item_priority
test_resources.py::test_priority_store_item_priority
env =def test_priority_store_item_priority(env): pstore = simpy.PriorityStore(env, 3) log = [] def getter(wait): yield env.timeout(wait) item = yield pstore.get() log.append(item) # Do not specify priority; the items themselves will be compared to # determine priority. env.process(pstore.put(s) for s in 'bcadefg') env.process(getter(1)) env.process(getter(2)) env.process(getter(3)) > env.run() tests/test_resources.py:592: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) tests/test_resources.py:588: in env.process(pstore.put(s) for s in 'bcadefg') src/simpy/resources/store.py:27: in __init__ super().__init__(store) src/simpy/resources/base.py:39: in __init__ resource._trigger_put(None) src/simpy/resources/base.py:199: in _trigger_put if not self._do_put(put_event): _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _do_put(self, event: PutType) ->Optional[bool]: """Perform the *put* operation. This method needs to be implemented by subclasses. If the conditions for the put *event* are met, the method must trigger the event (e.g. call :meth:`Event.succeed()` with an appropriate value). This method is called by :meth:`_trigger_put` for every event in the :attr:`put_queue`, as long as the return value does not evaluate ``False``. """ > raise NotImplementedError("_do_put() must be implemented by subclasses.") E NotImplementedError: _do_put() must be implemented by subclasses. src/simpy/resources/base.py:186: NotImplementedError
test_resources.py::test_priority_store_stable_order
test_resources.py::test_priority_store_stable_order
env =def test_priority_store_stable_order(env): pstore = simpy.PriorityStore(env, 3) log = [] def getter(wait): yield env.timeout(wait) _, item = yield pstore.get() log.append(item) items = [object() for _ in range(3)] # Unorderable items are inserted with same priority. env.process(pstore.put(simpy.PriorityItem(0, item)) for item in items) env.process(getter(1)) env.process(getter(2)) env.process(getter(3)) > env.run() tests/test_resources.py:612: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) tests/test_resources.py:608: in env.process(pstore.put(simpy.PriorityItem(0, item)) for item in items) src/simpy/resources/store.py:27: in __init__ super().__init__(store) src/simpy/resources/base.py:39: in __init__ resource._trigger_put(None) src/simpy/resources/base.py:199: in _trigger_put if not self._do_put(put_event): _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _do_put(self, event: PutType) ->Optional[bool]: """Perform the *put* operation. This method needs to be implemented by subclasses. If the conditions for the put *event* are met, the method must trigger the event (e.g. call :meth:`Event.succeed()` with an appropriate value). This method is called by :meth:`_trigger_put` for every event in the :attr:`put_queue`, as long as the return value does not evaluate ``False``. """ > raise NotImplementedError("_do_put() must be implemented by subclasses.") E NotImplementedError: _do_put() must be implemented by subclasses. src/simpy/resources/base.py:186: NotImplementedError
test_resources.py::test_filter_store
test_resources.py::test_filter_store
env =def test_filter_store(env): def pem(env): store = simpy.FilterStore(env, capacity=2) get_event = store.get(lambda item: item == 'b') yield store.put('a') assert not get_event.triggered yield store.put('b') assert get_event.triggered env.process(pem(env)) > env.run() tests/test_resources.py:630: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) tests/test_resources.py:623: in pem get_event = store.get(lambda item: item == 'b') src/simpy/resources/store.py:52: in __init__ super().__init__(resource) src/simpy/resources/base.py:86: in __init__ resource._trigger_get(None) src/simpy/resources/base.py:229: in _trigger_get if not self._do_get(get_event): _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _do_get(self, event: GetType) ->Optional[bool]: """Perform the *get* operation. This method needs to be implemented by subclasses. If the conditions for the get *event* are met, the method must trigger the event (e.g. call :meth:`Event.succeed()` with an appropriate value). This method is called by :meth:`_trigger_get` for every event in the :attr:`get_queue`, as long as the return value does not evaluate ``False``. """ > raise NotImplementedError("_do_get() must be implemented by subclasses.") E NotImplementedError: _do_get() must be implemented by subclasses. src/simpy/resources/base.py:214: NotImplementedError
test_resources.py::test_filter_store_get_after_mismatch
test_resources.py::test_filter_store_get_after_mismatch
env =def test_filter_store_get_after_mismatch(env): """Regression test for issue #49. Triggering get-events after a put in FilterStore wrongly breaks after the first mismatch. """ def putter(env, store): # The order of putting 'spam' before 'eggs' is important here. yield store.put('spam') yield env.timeout(1) yield store.put('eggs') def getter(store): # The order of requesting 'eggs' before 'spam' is important here. eggs = store.get(lambda i: i == 'eggs') spam = store.get(lambda i: i == 'spam') ret = yield spam | eggs assert spam in ret assert eggs not in ret assert env.now == 0 yield eggs assert env.now == 1 store = simpy.FilterStore(env, capacity=2) env.process(getter(store)) env.process(putter(env, store)) > env.run() tests/test_resources.py:663: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) tests/test_resources.py:649: in getter eggs = store.get(lambda i: i == 'eggs') src/simpy/resources/store.py:52: in __init__ super().__init__(resource) src/simpy/resources/base.py:86: in __init__ resource._trigger_get(None) src/simpy/resources/base.py:229: in _trigger_get if not self._do_get(get_event): _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _do_get(self, event: GetType) ->Optional[bool]: """Perform the *get* operation. This method needs to be implemented by subclasses. If the conditions for the get *event* are met, the method must trigger the event (e.g. call :meth:`Event.succeed()` with an appropriate value). This method is called by :meth:`_trigger_get` for every event in the :attr:`get_queue`, as long as the return value does not evaluate ``False``. 
""" > raise NotImplementedError("_do_get() must be implemented by subclasses.") E NotImplementedError: _do_get() must be implemented by subclasses. src/simpy/resources/base.py:214: NotImplementedError
test_resources.py::test_filter_calls_best_case
test_resources.py::test_filter_calls_best_case
env =def test_filter_calls_best_case(env): """The filter function is called every item in the store until a match is found. In the best case the first item already matches.""" log = [] def log_filter(item): log.append(f'check {item}') return True store = simpy.FilterStore(env) store.items = [1, 2, 3] def getter(store): log.append(f'get {yield store.get(log_filter)}') log.append(f'get {yield store.get(log_filter)}') log.append(f'get {yield store.get(log_filter)}') env.process(getter(store)) > env.run() tests/test_resources.py:684: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) tests/test_resources.py:679: in getter log.append(f'get {yield store.get(log_filter)}') src/simpy/resources/store.py:52: in __init__ super().__init__(resource) src/simpy/resources/base.py:86: in __init__ resource._trigger_get(None) src/simpy/resources/base.py:229: in _trigger_get if not self._do_get(get_event): _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _do_get(self, event: GetType) ->Optional[bool]: """Perform the *get* operation. This method needs to be implemented by subclasses. If the conditions for the get *event* are met, the method must trigger the event (e.g. call :meth:`Event.succeed()` with an appropriate value). This method is called by :meth:`_trigger_get` for every event in the :attr:`get_queue`, as long as the return value does not evaluate ``False``. """ > raise NotImplementedError("_do_get() must be implemented by subclasses.") E NotImplementedError: _do_get() must be implemented by subclasses. src/simpy/resources/base.py:214: NotImplementedError
test_resources.py::test_filter_calls_worst_case
test_resources.py::test_filter_calls_worst_case
env =def test_filter_calls_worst_case(env): """In the worst case the filter function is being called for items multiple times.""" log = [] store = simpy.FilterStore(env) def putter(store): for i in range(4): log.append(f'put {i}') yield store.put(i) def log_filter(item): log.append(f'check {item}') return item >= 3 def getter(store): log.append(f'get {yield store.get(log_filter)}') env.process(getter(store)) env.process(putter(store)) > env.run() tests/test_resources.py:710: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) tests/test_resources.py:706: in getter log.append(f'get {yield store.get(log_filter)}') src/simpy/resources/store.py:52: in __init__ super().__init__(resource) src/simpy/resources/base.py:86: in __init__ resource._trigger_get(None) src/simpy/resources/base.py:229: in _trigger_get if not self._do_get(get_event): _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _do_get(self, event: GetType) ->Optional[bool]: """Perform the *get* operation. This method needs to be implemented by subclasses. If the conditions for the get *event* are met, the method must trigger the event (e.g. call :meth:`Event.succeed()` with an appropriate value). This method is called by :meth:`_trigger_get` for every event in the :attr:`get_queue`, as long as the return value does not evaluate ``False``. """ > raise NotImplementedError("_do_get() must be implemented by subclasses.") E NotImplementedError: _do_get() must be implemented by subclasses. src/simpy/resources/base.py:214: NotImplementedError
test_resources.py::test_immediate_put_request
test_resources.py::test_immediate_put_request
env =def test_immediate_put_request(env): """Put requests that can be fulfilled immediately do not enter the put queue.""" resource = simpy.Resource(env, capacity=1) assert len(resource.users) == 0 assert len(resource.queue) == 0 # The resource is empty, the first request will succeed immediately without # entering the queue. request = resource.request() assert request.triggered assert len(resource.users) == 1 > assert len(resource.queue) == 0 E assert 1 == 0 E + where 1 = len([ ]) E + where [ ] = .queue tests/test_resources.py:736: AssertionError
test_resources.py::test_immediate_get_request
test_resources.py::test_immediate_get_request
env =def test_immediate_get_request(env): """Get requests that can be fulfilled immediately do not enter the get queue.""" container = simpy.Container(env) # Put something in the container, this request is triggered immediately # without entering the queue. > request = container.put(1) tests/test_resources.py:751: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/resources/container.py:30: in __init__ super().__init__(container) src/simpy/resources/base.py:39: in __init__ resource._trigger_put(None) src/simpy/resources/base.py:199: in _trigger_put if not self._do_put(put_event): _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = event = def _do_put(self, event: PutType) ->Optional[bool]: """Perform the *put* operation. This method needs to be implemented by subclasses. If the conditions for the put *event* are met, the method must trigger the event (e.g. call :meth:`Event.succeed()` with an appropriate value). This method is called by :meth:`_trigger_put` for every event in the :attr:`put_queue`, as long as the return value does not evaluate ``False``. """ > raise NotImplementedError("_do_put() must be implemented by subclasses.") E NotImplementedError: _do_put() must be implemented by subclasses. src/simpy/resources/base.py:186: NotImplementedError
test_rt.py::test_rt[0.1]
test_rt.py::test_rt[0.1]
log = [], factor = 0.1 @pytest.mark.parametrize('factor', [0.1, 0.05, 0.15]) def test_rt(log, factor): """Basic tests for run().""" start = monotonic() env = RealtimeEnvironment(factor=factor) env.process(process(env, log, 0.01, 1)) env.process(process(env, log, 0.02, 1)) > env.run(2) tests/test_rt.py:32: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:290: in run self.step() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self =def step(self) ->None: """Process the next event after enough real-time has passed for the event to happen. The delay is scaled according to the real-time :attr:`factor`. With :attr:`strict` mode enabled, a :exc:`RuntimeError` will be raised, if the event is processed too slowly. """ try: > evt_time, evt = self._queue.pop() E ValueError: too many values to unpack (expected 2) src/simpy/rt.py:64: ValueError
test_rt.py::test_rt[0.05]
test_rt.py::test_rt[0.05]
log = [], factor = 0.05 @pytest.mark.parametrize('factor', [0.1, 0.05, 0.15]) def test_rt(log, factor): """Basic tests for run().""" start = monotonic() env = RealtimeEnvironment(factor=factor) env.process(process(env, log, 0.01, 1)) env.process(process(env, log, 0.02, 1)) > env.run(2) tests/test_rt.py:32: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:290: in run self.step() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self =def step(self) ->None: """Process the next event after enough real-time has passed for the event to happen. The delay is scaled according to the real-time :attr:`factor`. With :attr:`strict` mode enabled, a :exc:`RuntimeError` will be raised, if the event is processed too slowly. """ try: > evt_time, evt = self._queue.pop() E ValueError: too many values to unpack (expected 2) src/simpy/rt.py:64: ValueError
test_rt.py::test_rt[0.15]
test_rt.py::test_rt[0.15]
log = [], factor = 0.15 @pytest.mark.parametrize('factor', [0.1, 0.05, 0.15]) def test_rt(log, factor): """Basic tests for run().""" start = monotonic() env = RealtimeEnvironment(factor=factor) env.process(process(env, log, 0.01, 1)) env.process(process(env, log, 0.02, 1)) > env.run(2) tests/test_rt.py:32: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:290: in run self.step() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self =def step(self) ->None: """Process the next event after enough real-time has passed for the event to happen. The delay is scaled according to the real-time :attr:`factor`. With :attr:`strict` mode enabled, a :exc:`RuntimeError` will be raised, if the event is processed too slowly. """ try: > evt_time, evt = self._queue.pop() E ValueError: too many values to unpack (expected 2) src/simpy/rt.py:64: ValueError
test_rt.py::test_rt_multiple_call
test_rt.py::test_rt_multiple_call
log = [] def test_rt_multiple_call(log): """Test multiple calls to run().""" start = monotonic() env = RealtimeEnvironment(factor=0.05) env.process(process(env, log, 0.01, 2)) env.process(process(env, log, 0.01, 3)) > env.run(5) tests/test_rt.py:47: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:290: in run self.step() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self =def step(self) ->None: """Process the next event after enough real-time has passed for the event to happen. The delay is scaled according to the real-time :attr:`factor`. With :attr:`strict` mode enabled, a :exc:`RuntimeError` will be raised, if the event is processed too slowly. """ try: > evt_time, evt = self._queue.pop() E ValueError: too many values to unpack (expected 2) src/simpy/rt.py:64: ValueError
test_rt.py::test_rt_slow_sim_default_behavior
test_rt.py::test_rt_slow_sim_default_behavior
log = [] def test_rt_slow_sim_default_behavior(log): """By default, SimPy should raise an error if a simulation is too slow for the selected real-time factor.""" env = RealtimeEnvironment(factor=0.05) env.process(process(env, log, 0.1, 1)) > err = pytest.raises(RuntimeError, env.run, 3) tests/test_rt.py:67: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:290: in run self.step() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self =def step(self) ->None: """Process the next event after enough real-time has passed for the event to happen. The delay is scaled according to the real-time :attr:`factor`. With :attr:`strict` mode enabled, a :exc:`RuntimeError` will be raised, if the event is processed too slowly. """ try: > evt_time, evt = self._queue.pop() E ValueError: too many values to unpack (expected 2) src/simpy/rt.py:64: ValueError
test_rt.py::test_rt_slow_sim_no_error
test_rt.py::test_rt_slow_sim_no_error
log = [] def test_rt_slow_sim_no_error(log): """Test ignoring slow simulations.""" start = monotonic() env = RealtimeEnvironment(factor=0.05, strict=False) env.process(process(env, log, 0.1, 1)) > env.run(2) tests/test_rt.py:78: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:290: in run self.step() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self =def step(self) ->None: """Process the next event after enough real-time has passed for the event to happen. The delay is scaled according to the real-time :attr:`factor`. With :attr:`strict` mode enabled, a :exc:`RuntimeError` will be raised, if the event is processed too slowly. """ try: > evt_time, evt = self._queue.pop() E ValueError: too many values to unpack (expected 2) src/simpy/rt.py:64: ValueError
test_rt.py::test_rt_sync
test_rt.py::test_rt_sync
log = [] def test_rt_sync(log): """Test resetting the internal wall-clock reference time.""" env = RealtimeEnvironment(factor=0.05) env.process(process(env, log, 0.01)) sleep(0.06) # Simulate massive workload :-) env.sync() > env.run(3) tests/test_rt.py:101: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:290: in run self.step() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self =def step(self) ->None: """Process the next event after enough real-time has passed for the event to happen. The delay is scaled according to the real-time :attr:`factor`. With :attr:`strict` mode enabled, a :exc:`RuntimeError` will be raised, if the event is processed too slowly. """ try: > evt_time, evt = self._queue.pop() E ValueError: too many values to unpack (expected 2) src/simpy/rt.py:64: ValueError
test_timeout.py::test_discrete_time_steps
test_timeout.py::test_discrete_time_steps
env =log = [0, 1, 1, 2, 2, 2, ...] def test_discrete_time_steps(env, log): """envple envulation with discrete time steps.""" def pem(env, log): while True: log.append(env.now) yield env.timeout(delay=1) env.process(pem(env, log)) env.run(until=3) > assert log == [0, 1, 2] E assert [0, 1, 1, 2, 2, 2, ...] == [0, 1, 2] E E At index 2 diff: 1 != 2 E Left contains 12 more items, first extra item: 2 E Use -v to get more diff tests/test_timeout.py:20: AssertionError
test_util.py::test_subscribe_at_timeout
test_util.py::test_subscribe_at_timeout
env =def test_subscribe_at_timeout(env): """You should be able to subscribe at arbitrary events.""" def pem(env): to = env.timeout(2) subscribe_at(to) try: yield env.timeout(10) except Interrupt as interrupt: assert interrupt.cause == (to, None) # noqa: PT017 assert env.now == 2 env.process(pem(env)) > env.run() tests/test_util.py:107: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) tests/test_util.py:99: in pem subscribe_at(to) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ event = def subscribe_at(event: Event) ->None: """Register at the *event* to receive an interrupt when it occurs. The most common use case for this is to pass a :class:`~simpy.events.Process` to get notified when it terminates. If the event has already occurred, the interrupt will be scheduled immediately. """ def interrupt_callback(event): process = event.env.active_process if process is not None: process.interrupt((event, event.value)) if event.triggered: if event.env.active_process: event.env.active_process.interrupt((event, event.value)) else: > raise RuntimeError("Cannot subscribe to an already triggered event without an active process") E RuntimeError: Cannot subscribe to an already triggered event without an active process src/simpy/util.py:64: RuntimeError
test_util.py::test_subscribe_at_timeout_with_value
test_util.py::test_subscribe_at_timeout_with_value
env =def test_subscribe_at_timeout_with_value(env): """An event's value should be accessible via the interrupt cause.""" def pem(env): val = 'ohai' to = env.timeout(2, value=val) subscribe_at(to) try: yield env.timeout(10) except Interrupt as interrupt: assert interrupt.cause == (to, val) # noqa: PT017 assert env.now == 2 env.process(pem(env)) > env.run() tests/test_util.py:124: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) tests/test_util.py:116: in pem subscribe_at(to) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ event = def subscribe_at(event: Event) ->None: """Register at the *event* to receive an interrupt when it occurs. The most common use case for this is to pass a :class:`~simpy.events.Process` to get notified when it terminates. If the event has already occurred, the interrupt will be scheduled immediately. """ def interrupt_callback(event): process = event.env.active_process if process is not None: process.interrupt((event, event.value)) if event.triggered: if event.env.active_process: event.env.active_process.interrupt((event, event.value)) else: > raise RuntimeError("Cannot subscribe to an already triggered event without an active process") E RuntimeError: Cannot subscribe to an already triggered event without an active process src/simpy/util.py:64: RuntimeError
test_util.py::test_wait_for_all_with_errors
test_util.py::test_wait_for_all_with_errors
env =def test_wait_for_all_with_errors(env): """On default AllOf should fail immediately if one of its events fails.""" def child_with_error(env, value): yield env.timeout(value) raise RuntimeError('crashing') def parent(env): events = [ env.timeout(1, value=1), env.process(child_with_error(env, 2)), env.timeout(3, value=3), ] condition = env.all_of(events) with pytest.raises(RuntimeError, match='crashing'): yield condition # Although the condition has failed, interim values are available. assert condition._events[0].value == 1 assert condition._events[1].value.args[0] == 'crashing' # The last child has not terminated yet. assert not events[2].processed env.process(parent(env)) > env.run() tests/test_util.py:183: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = , value = 2 def child_with_error(env, value): yield env.timeout(value) > raise RuntimeError('crashing') E RuntimeError: crashing tests/test_util.py:163: RuntimeError
test_util.py::test_all_of_chaining_intermediate_results
test_util.py::test_all_of_chaining_intermediate_results
env =def test_all_of_chaining_intermediate_results(env): """If a wait_for_all condition A with intermediate results is merged into another wait_for_all condition B, the results are copied into condition A.""" def parent(env): condition_a = env.all_of([env.timeout(i, value=i) for i in range(2)]) condition_b = env.all_of([env.timeout(i, value=i) for i in range(2)]) yield env.timeout(0) condition = condition_a & condition_b result = ConditionValue() condition._populate_value(result) assert list(result.values()) == [0, 0] results = yield condition assert list(results.values()) == [0, 1, 0, 1] env.process(parent(env)) > env.run() tests/test_util.py:223: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = def parent(env): condition_a = env.all_of([env.timeout(i, value=i) for i in range(2)]) condition_b = env.all_of([env.timeout(i, value=i) for i in range(2)]) yield env.timeout(0) condition = condition_a & condition_b result = ConditionValue() condition._populate_value(result) > assert list(result.values()) == [0, 0] E assert [ : 1}>] == [0, 0] E E At index 0 diff: : 0, : 1}> != 0 E Use -v to get more diff tests/test_util.py:217: AssertionError
test_util.py::test_all_of_with_triggered_events
test_util.py::test_all_of_with_triggered_events
env =def test_all_of_with_triggered_events(env): """Processed events can be added to a condition. Confirm this with all_of.""" def parent(env): events = [env.timeout(0, value='spam'), env.timeout(1, value='eggs')] yield env.timeout(2) values = list((yield env.all_of(events)).values()) assert values == ['spam', 'eggs'] env.process(parent(env)) > env.run() tests/test_util.py:238: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = def parent(env): events = [env.timeout(0, value='spam'), env.timeout(1, value='eggs')] yield env.timeout(2) > values = list((yield env.all_of(events)).values()) E AttributeError: 'NoneType' object has no attribute 'values' tests/test_util.py:234: AttributeError
test_util.py::test_any_of_with_errors
test_util.py::test_any_of_with_errors
env =def test_any_of_with_errors(env): """On default any_of should fail if the event has failed too.""" def child_with_error(env, value): yield env.timeout(value) raise RuntimeError('crashing') def parent(env): events = [env.process(child_with_error(env, 1)), env.timeout(2, value=2)] condition = env.any_of(events) with pytest.raises(RuntimeError, match='crashing'): yield condition assert condition._events[0].value.args[0] == 'crashing' # The last event has not terminated yet. assert not events[1].processed env.process(parent(env)) > env.run() tests/test_util.py:274: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = , value = 1 def child_with_error(env, value): yield env.timeout(value) > raise RuntimeError('crashing') E RuntimeError: crashing tests/test_util.py:261: RuntimeError
test_util.py::test_any_of_with_triggered_events
test_util.py::test_any_of_with_triggered_events
env =def test_any_of_with_triggered_events(env): """Processed events can be added to a condition. Confirm this with all_of.""" def parent(env): events = [env.timeout(0, value='spam'), env.timeout(1, value='eggs')] yield env.timeout(2) values = list((yield env.any_of(events)).values()) assert values == ['spam', 'eggs'] env.process(parent(env)) > env.run() tests/test_util.py:306: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ src/simpy/core.py:264: in run self.step() src/simpy/core.py:151: in step self._process_event(event) src/simpy/core.py:186: in _process_event raise event._value src/simpy/events.py:346: in _resume next_event = self._generator.send(event._value) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ env = def parent(env): events = [env.timeout(0, value='spam'), env.timeout(1, value='eggs')] yield env.timeout(2) > values = list((yield env.any_of(events)).values()) E AttributeError: 'NoneType' object has no attribute 'values' tests/test_util.py:302: AttributeError
Patch diff
diff --git a/src/simpy/core.py b/src/simpy/core.py
index 10c88fb..391c088 100644
--- a/src/simpy/core.py
+++ b/src/simpy/core.py
@@ -7,7 +7,7 @@ from heapq import heappop, heappush
from itertools import count
from types import MethodType
from typing import TYPE_CHECKING, Any, Generic, Iterable, List, Optional, Tuple, Type, TypeVar, Union
-from simpy.events import NORMAL, URGENT, AllOf, AnyOf, Event, EventPriority, Process, ProcessGenerator, Timeout
+from simpy.events import NORMAL, URGENT, AllOf, AnyOf, Event, EventPriority, Process, ProcessGenerator, Timeout, Interruption
Infinity: float = float('inf')
T = TypeVar('T')
@@ -34,7 +34,10 @@ class BoundClass(Generic[T]):
def bind_early(instance: object) ->None:
"""Bind all :class:`BoundClass` attributes of the *instance's* class
to the instance itself to increase performance."""
- pass
+ cls = type(instance)
+ for name, obj in cls.__dict__.items():
+ if isinstance(obj, BoundClass):
+ setattr(instance, name, obj.__get__(instance, cls))
class EmptySchedule(Exception):
@@ -49,7 +52,7 @@ class StopSimulation(Exception):
def callback(cls, event: Event) ->None:
"""Used as callback in :meth:`Environment.run()` to stop the simulation
when the *until* event occurred."""
- pass
+ raise cls()
SimTime = Union[int, float]
@@ -77,12 +80,12 @@ class Environment:
@property
def now(self) ->SimTime:
"""The current simulation time."""
- pass
+ return self._now
@property
def active_process(self) ->Optional[Process]:
"""The currently active process of the environment."""
- pass
+ return self._active_proc
if TYPE_CHECKING:
def process(self, generator: ProcessGenerator) ->Process:
@@ -121,12 +124,15 @@ class Environment:
def schedule(self, event: Event, priority: EventPriority=NORMAL, delay:
SimTime=0) ->None:
"""Schedule an *event* with a given *priority* and a *delay*."""
- pass
+ heappush(self._queue, (self._now + delay, priority, next(self._eid), event))
def peek(self) ->SimTime:
"""Get the time of the next scheduled event. Return
:data:`~simpy.core.Infinity` if there is no further event."""
- pass
+ try:
+ return self._queue[0][0]
+ except IndexError:
+ return Infinity
def step(self) ->None:
"""Process the next event.
@@ -134,7 +140,108 @@ class Environment:
Raise an :exc:`EmptySchedule` if no further events are available.
"""
- pass
+ try:
+ next_time, priority, _, event = heappop(self._queue)
+ except IndexError:
+ raise EmptySchedule()
+
+ if next_time > self._now:
+ self._now = next_time
+
+ self._process_event(event)
+
+ return None
+
+ def _process_event(self, event):
+ event._processed = True
+ if hasattr(event, '_callback'):
+ try:
+ event._value = event._callback(event)
+ except BaseException as e:
+ event._ok = False
+ event._value = e
+ if not event._defused:
+ self._handle_exception(e)
+
+ if isinstance(event, Process):
+ self._active_proc = event
+
+ # Trigger all callbacks
+ if event.callbacks:
+ for callback in list(event.callbacks):
+ try:
+ callback(event)
+ except BaseException as e:
+ event._ok = False
+ event._value = e
+ if not event._defused:
+ raise # Re-raise the exception to propagate it
+ event.callbacks = None
+
+ if isinstance(event, Process):
+ self._active_proc = None
+
+ # If the event is not ok (i.e., an exception occurred), raise it
+ if not event._ok and not event._defused:
+ raise event._value
+
+ def schedule(self, event: Event, priority: EventPriority=NORMAL, delay: SimTime=0) ->None:
+ """Schedule an *event* with a given *priority* and a *delay*."""
+ if delay < 0:
+ raise ValueError('Negative delay')
+ heappush(self._queue, (self._now + delay, priority, next(self._eid), event))
+
+ def _handle_exception(self, exception: BaseException) ->None:
+ """Handle an exception by raising it immediately."""
+ raise exception
+
+ def run(self, until: Optional[Union[SimTime, Event]]=None) ->Optional[Any]:
+ """Executes :meth:`step()` until the given criterion *until* is met.
+
+ - If it is ``None`` (which is the default), this method will return
+ when there are no further events to be processed.
+
+ - If it is an :class:`~simpy.events.Event`, the method will continue
+ stepping until this event has been triggered and will return its
+ value. Raises a :exc:`RuntimeError` if there are no further events
+ to be processed and the *until* event was not triggered.
+
+ - If it is a number, the method will continue stepping
+ until the environment's time reaches *until*.
+
+ """
+ if until is None:
+ try:
+ while True:
+ self.step()
+ except EmptySchedule:
+ return None
+ elif isinstance(until, Event):
+ if until.triggered:
+ self._now = until._value
+ return until.value
+ until.callbacks.append(StopSimulation.callback)
+ try:
+ while not until.triggered:
+ self.step()
+ except StopSimulation:
+ return until.value
+ except EmptySchedule:
+ if not until.triggered:
+ raise RuntimeError('No scheduled events left but "until" event was not triggered: ' + str(until))
+ elif isinstance(until, (int, float)) or (isinstance(until, str) and until.replace('.', '', 1).isdigit()):
+ until = float(until)
+ if until < self._now:
+ raise ValueError('until must be greater than the current simulation time')
+ try:
+ while self._now < until:
+ self.step()
+ self._now = until # Ensure we reach the exact 'until' time
+ except EmptySchedule:
+ self._now = until # Set the time even if there are no more events
+ else:
+ raise ValueError('Invalid until parameter type')
+ return None
def run(self, until: Optional[Union[SimTime, Event]]=None) ->Optional[Any]:
"""Executes :meth:`step()` until the given criterion *until* is met.
@@ -151,4 +258,41 @@ class Environment:
until the environment's time reaches *until*.
"""
- pass
+ if until is None:
+ while True:
+ try:
+ self.step()
+ except EmptySchedule:
+ return None
+ elif isinstance(until, Event):
+ if until.triggered:
+ self._now = until._value
+ return until.value
+ until.callbacks.append(StopSimulation.callback)
+ try:
+ while not until.triggered:
+ self.step()
+ except StopSimulation:
+ return until.value
+ except EmptySchedule:
+ if not until.triggered:
+ raise RuntimeError('No scheduled events left but "until" event was not triggered: ' + str(until))
+ elif isinstance(until, (int, float)) or (isinstance(until, str) and until.replace('.', '', 1).isdigit()):
+ until = float(until)
+ if until < self._now:
+ raise ValueError('until must be greater than the current simulation time')
+ try:
+ while self._now < until:
+ next_event_time = self.peek()
+ if next_event_time > until:
+ self._now = until
+ break
+ self.step()
+ # Process all events at the current time
+ while self.peek() == self._now:
+ self.step()
+ except EmptySchedule:
+ self._now = until # Set the time even if there are no more events
+ else:
+ raise ValueError('Invalid until parameter type')
+ return None
diff --git a/src/simpy/events.py b/src/simpy/events.py
index 128ed75..fd2b553 100644
--- a/src/simpy/events.py
+++ b/src/simpy/events.py
@@ -67,6 +67,9 @@ class Event:
"""The :class:`~simpy.core.Environment` the event lives in."""
self.callbacks: EventCallbacks = []
"""List of functions that are called when the event is processed."""
+ self._defused = False
+ self._ok = True
+ self._processed = False
def __repr__(self) ->str:
"""Return the description of the event (see :meth:`_desc`) with the id
@@ -75,19 +78,19 @@ class Event:
def _desc(self) ->str:
"""Return a string *Event()*."""
- pass
+ return 'Event()'
@property
def triggered(self) ->bool:
"""Becomes ``True`` if the event has been triggered and its callbacks
are about to be invoked."""
- pass
+ return self._value is not PENDING
@property
def processed(self) ->bool:
"""Becomes ``True`` if the event has been processed (e.g., its
callbacks have been invoked)."""
- pass
+ return self.callbacks is None
@property
def ok(self) ->bool:
@@ -98,7 +101,9 @@ class Event:
:raises AttributeError: if accessed before the event is triggered.
"""
- pass
+ if self._value is PENDING:
+ raise AttributeError('Event has not yet been triggered')
+ return self._ok
@property
def defused(self) ->bool:
@@ -115,7 +120,11 @@ class Event:
processed by the :class:`~simpy.core.Environment`.
"""
- pass
+ return self._defused
+
+ @defused.setter
+ def defused(self, value: bool) ->None:
+ self._defused = value
@property
def value(self) ->Optional[Any]:
@@ -126,7 +135,9 @@ class Event:
Raises :exc:`AttributeError` if the value is not yet available.
"""
- pass
+ if self._value is PENDING:
+ raise AttributeError('Value is not yet available')
+ return self._value
def trigger(self, event: Event) ->None:
"""Trigger the event with the state and value of the provided *event*.
@@ -136,27 +147,42 @@ class Event:
chain reactions.
"""
- pass
+ self._ok = event._ok
+ self._value = event._value
+ self._defused = event._defused
+ self.env.schedule(self)
def succeed(self, value: Optional[Any]=None) ->Event:
"""Set the event's value, mark it as successful and schedule it for
processing by the environment. Returns the event instance.
- Raises :exc:`RuntimeError` if this event has already been triggerd.
+ If the event has already been triggered, this method does nothing.
"""
- pass
+ if self._value is PENDING:
+ self._ok = True
+ self._value = value
+ self.env.schedule(self)
+ return self
- def fail(self, exception: Exception) ->Event:
+ def fail(self, exception: Any) ->Event:
"""Set *exception* as the events value, mark it as failed and schedule
it for processing by the environment. Returns the event instance.
- Raises :exc:`TypeError` if *exception* is not an :exc:`Exception`.
+ If *exception* is not an :exc:`Exception` instance, it will be wrapped
+ in a :exc:`RuntimeError`.
Raises :exc:`RuntimeError` if this event has already been triggered.
"""
- pass
+ if self._value is not PENDING:
+ raise RuntimeError('Event has already been triggered')
+ self._ok = False
+ if not isinstance(exception, Exception):
+ exception = RuntimeError(f'Invalid exception: {exception}')
+ self._value = exception
+ self.env.schedule(self)
+ return self
def __and__(self, other: Event) ->Condition:
"""Return a :class:`~simpy.events.Condition` that will be triggered if
@@ -186,10 +212,9 @@ class Timeout(Event):
def __init__(self, env: Environment, delay: SimTime, value: Optional[
Any]=None):
+ super().__init__(env)
if delay < 0:
raise ValueError(f'Negative delay {delay}')
- self.env = env
- self.callbacks: EventCallbacks = []
self._value = value
self._delay = delay
self._ok = True
@@ -197,7 +222,9 @@ class Timeout(Event):
def _desc(self) ->str:
"""Return a string *Timeout(delay[, value=value])*."""
- pass
+ if self._value is None:
+ return f'Timeout({self._delay})'
+ return f'Timeout({self._delay}, value={self._value})'
class Initialize(Event):
@@ -208,7 +235,7 @@ class Initialize(Event):
"""
def __init__(self, env: Environment, process: Process):
- self.env = env
+ super().__init__(env)
self.callbacks: EventCallbacks = [process._resume]
self._value: Any = None
self._ok = True
@@ -224,7 +251,7 @@ class Interruption(Event):
"""
def __init__(self, process: Process, cause: Optional[Any]):
- self.env = process.env
+ super().__init__(process.env)
self.callbacks: EventCallbacks = [self._interrupt]
self._value = Interrupt(cause)
self._ok = False
@@ -237,6 +264,14 @@ class Interruption(Event):
self.process = process
self.env.schedule(self, URGENT)
+ def _interrupt(self, event):
+ try:
+ self.process._resume(self)
+ except:
+ # If the process is already triggered, we need to handle the interrupt manually
+ self.process._value = self._value
+ self.process._ok = False
+
ProcessGenerator = Generator[Event, Any, Any]
@@ -260,14 +295,14 @@ class Process(Event):
def __init__(self, env: Environment, generator: ProcessGenerator):
if not hasattr(generator, 'throw'):
raise ValueError(f'{generator} is not a generator.')
- self.env = env
- self.callbacks: EventCallbacks = []
+ super().__init__(env)
self._generator = generator
self._target: Event = Initialize(env, self)
+ self._is_alive = True
def _desc(self) ->str:
"""Return a string *Process(process_func_name)*."""
- pass
+ return f'Process({self._generator.__name__})'
@property
def target(self) ->Event:
@@ -277,33 +312,58 @@ class Process(Event):
interrupted.
"""
- pass
+ return self._target
@property
def name(self) ->str:
"""Name of the function used to start the process."""
- pass
+ return self._generator.__name__
@property
def is_alive(self) ->bool:
"""``True`` until the process generator exits."""
- pass
+ return self._is_alive
def interrupt(self, cause: Optional[Any]=None) ->None:
"""Interrupt this process optionally providing a *cause*.
- A process cannot be interrupted if it already terminated. A process can
- also not interrupt itself. Raise a :exc:`RuntimeError` in these
- cases.
+ If the process has already terminated, the interrupt will be ignored.
+ A process can also not interrupt itself.
"""
- pass
+ if self.is_alive:
+ interruption = Interruption(self, cause)
+ if self._target and self._resume in self._target.callbacks:
+ self._target.callbacks.remove(self._resume)
+ self._target = interruption
def _resume(self, event: Event) ->None:
"""Resumes the execution of the process with the value of *event*. If
the process generator exits, the process itself will get triggered with
the return value or the exception of the generator."""
- pass
+ try:
+ if event._ok:
+ next_event = self._generator.send(event._value)
+ else:
+ next_event = self._generator.throw(event._value)
+ self._target = next_event
+ if self._target is not None:
+ self._target.callbacks.append(self._resume)
+ else:
+ self._is_alive = False
+ self.succeed(None)
+ except StopIteration as e:
+ self._is_alive = False
+ self.succeed(e.value)
+ except BaseException as e:
+ self._is_alive = False
+ self.fail(e)
+ else:
+ if self._target is not None:
+ self._target.callbacks.append(self._resume)
+ else:
+ # If the process yields None, we should stop it
+ self.succeed(None)
class ConditionValue:
@@ -334,7 +394,19 @@ class ConditionValue:
return f'<ConditionValue {self.todict()}>'
def __iter__(self) ->Iterator[Event]:
- return self.keys()
+ return iter(self.events)
+
+ def keys(self) ->Iterator[Event]:
+ return iter(self.events)
+
+ def values(self) ->Iterator[Any]:
+ return (event._value for event in self.events)
+
+ def items(self) ->Iterator[Tuple[Event, Any]]:
+ return ((event, event._value) for event in self.events)
+
+ def todict(self) ->Dict[Event, Any]:
+ return {event: event._value for event in self.events}
class Condition(Event):
@@ -365,34 +437,96 @@ class Condition(Event):
self._evaluate = evaluate
self._events = tuple(events)
self._count = 0
+ self._value = ConditionValue()
+ if not self._events:
+ self.succeed(self._value)
+ return
+ for event in self._events:
+ if self.env != event.env:
+ raise ValueError(
+ 'It is not allowed to mix events from different environments'
+ )
+ for event in self._events:
+ if event.callbacks is None:
+ self._check(event)
+ else:
+ event.callbacks.append(self._check)
+
+ def _check(self, event: Event) ->None:
+ if self._value is not PENDING:
+ return
+
+ self._count += 1
+
+ if not event._ok:
+ self.fail(event._value)
+ return
+
+ if isinstance(event, Condition):
+ self._value.events.extend(event._value.events)
+ else:
+ self._value.events.append(event)
+
+ if self._evaluate(self._events, self._count):
+ self.succeed(self._value)
+ self._remove_check_callbacks()
+
+ def _remove_check_callbacks(self) ->None:
+ for event in self._events:
+ if event.callbacks and self._check in event.callbacks:
+ event.callbacks.remove(self._check)
+ if isinstance(event, Condition):
+ event._remove_check_callbacks()
+
+ def __init__(self, env: Environment, evaluate: Callable[[Tuple[Event,
+ ...], int], bool], events: Iterable[Event]):
+ super().__init__(env)
+ self._evaluate = evaluate
+ self._events = tuple(events)
+ self._count = 0
+ self._value = ConditionValue()
if not self._events:
- self.succeed(ConditionValue())
+ self.succeed(self._value)
return
for event in self._events:
if self.env != event.env:
raise ValueError(
'It is not allowed to mix events from different environments'
)
+ all_triggered = True
for event in self._events:
if event.callbacks is None:
self._check(event)
else:
event.callbacks.append(self._check)
- assert isinstance(self.callbacks, list)
- self.callbacks.append(self._build_value)
+ all_triggered = False
+ if all_triggered:
+ self._build_value(None)
+ self.succeed(self._value)
+ else:
+ assert isinstance(self.callbacks, list)
+ self.callbacks.append(self._build_value)
def _desc(self) ->str:
"""Return a string *Condition(evaluate, [events])*."""
- pass
+ return f'Condition({self._evaluate.__name__}, {self._events})'
def _populate_value(self, value: ConditionValue) ->None:
"""Populate the *value* by recursively visiting all nested
conditions."""
- pass
+ for event in self._events:
+ if isinstance(event, Condition):
+ event._populate_value(value)
+ elif event.callbacks is None:
+ value.events.append(event)
def _build_value(self, event: Event) ->None:
"""Build the value of this condition."""
- pass
+ if not self._ok:
+ return
+ value = ConditionValue()
+ self._populate_value(value)
+ self._value = value
def _remove_check_callbacks(self) ->None:
"""Remove _check() callbacks from events recursively.
@@ -403,24 +537,70 @@ class Condition(Event):
untriggered events.
"""
- pass
+ for event in self._events:
+ if event.callbacks and self._check in event.callbacks:
+ event.callbacks.remove(self._check)
+ if isinstance(event, Condition):
+ event._remove_check_callbacks()
+
+ def __init__(self, env: Environment, evaluate: Callable[[Tuple[Event,
+ ...], int], bool], events: Iterable[Event]):
+ super().__init__(env)
+ self._evaluate = evaluate
+ self._events = tuple(events)
+ self._count = 0
+ self._value = ConditionValue()
+ if not self._events:
+ self.succeed(self._value)
+ return
+ for event in self._events:
+ if self.env != event.env:
+ raise ValueError(
+ 'It is not allowed to mix events from different environments'
+ )
+ all_triggered = True
+ for event in self._events:
+ if event.callbacks is None:
+ self._check(event)
+ else:
+ event.callbacks.append(self._check)
+ all_triggered = False
+ if all_triggered:
+ self._populate_value(self._value)
+ self.succeed(self._value)
def _check(self, event: Event) ->None:
"""Check if the condition was already met and schedule the *event* if
so."""
- pass
+ if self.triggered:
+ return
+
+ self._count += 1
+
+ if not event._ok:
+ self.fail(event._value)
+ elif self._evaluate(self._events, self._count):
+ self._remove_check_callbacks()
+ self._populate_value(self._value)
+ if not self.triggered:
+ self.succeed(self._value)
+
+ def _populate_value(self, value):
+ for event in self._events:
+ if event.triggered:
+ value.events.append(event)
@staticmethod
def all_events(events: Tuple[Event, ...], count: int) ->bool:
"""An evaluation function that returns ``True`` if all *events* have
been triggered."""
- pass
+ return len(events) == count
@staticmethod
def any_events(events: Tuple[Event, ...], count: int) ->bool:
"""An evaluation function that returns ``True`` if at least one of
*events* has been triggered."""
- pass
+ return count > 0
class AllOf(Condition):
@@ -432,6 +612,15 @@ class AllOf(Condition):
def __init__(self, env: Environment, events: Iterable[Event]):
super().__init__(env, Condition.all_events, events)
+ self._value = ConditionValue()
+ if all(event.triggered for event in self._events):
+ self._populate_value(self._value)
+ self.succeed(self._value)
+
+ def _populate_value(self, value):
+ for event in self._events:
+ if event.triggered:
+ value.events.append(event)
class AnyOf(Condition):
@@ -443,8 +632,21 @@ class AnyOf(Condition):
def __init__(self, env: Environment, events: Iterable[Event]):
super().__init__(env, Condition.any_events, events)
+ self._value = ConditionValue()
+ if any(event.triggered for event in self._events):
+ self._populate_value(self._value)
+ self.succeed(self._value)
+
+ def _populate_value(self, value):
+ for event in self._events:
+ if event.triggered:
+ value.events.append(event)
+ break
def _describe_frame(frame: FrameType) ->str:
"""Print filename, line number and function name of a stack frame."""
- pass
+ filename = frame.f_code.co_filename
+ lineno = frame.f_lineno
+ funcname = frame.f_code.co_name
+ return f'{filename}:{lineno}, in {funcname}'
diff --git a/src/simpy/exceptions.py b/src/simpy/exceptions.py
index d45300e..365ee57 100644
--- a/src/simpy/exceptions.py
+++ b/src/simpy/exceptions.py
@@ -31,4 +31,4 @@ class Interrupt(SimPyException):
@property
def cause(self) ->Optional[Any]:
"""The cause of the interrupt or ``None`` if no cause was provided."""
- pass
+ return self.args[0]
diff --git a/src/simpy/resources/base.py b/src/simpy/resources/base.py
index a7d0b96..f2083fe 100644
--- a/src/simpy/resources/base.py
+++ b/src/simpy/resources/base.py
@@ -58,7 +58,8 @@ class Put(Event, ContextManager['Put'], Generic[ResourceType]):
method is called automatically.
"""
- pass
+ if not self.triggered:
+ self.resource.put_queue.remove(self)
class Get(Event, ContextManager['Get'], Generic[ResourceType]):
@@ -104,7 +105,8 @@ class Get(Event, ContextManager['Get'], Generic[ResourceType]):
method is called automatically.
"""
- pass
+ if not self.triggered:
+ self.resource.get_queue.remove(self)
PutType = TypeVar('PutType', bound=Put)
@@ -152,7 +154,7 @@ class BaseResource(Generic[PutType, GetType]):
@property
def capacity(self) ->Union[float, int]:
"""Maximum capacity of the resource."""
- pass
+ return self._capacity
if TYPE_CHECKING:
def put(self) ->Put:
@@ -181,7 +183,7 @@ class BaseResource(Generic[PutType, GetType]):
:attr:`put_queue`, as long as the return value does not evaluate
``False``.
"""
- pass
+ raise NotImplementedError("_do_put() must be implemented by subclasses.")
def _trigger_put(self, get_event: Optional[GetType]) ->None:
"""This method is called once a new put event has been created or a get
@@ -191,7 +193,12 @@ class BaseResource(Generic[PutType, GetType]):
calls :meth:`_do_put` to check if the conditions for the event are met.
If :meth:`_do_put` returns ``False``, the iteration is stopped early.
"""
- pass
+ idx = 0
+ while idx < len(self.put_queue):
+ put_event = self.put_queue[idx]
+ if not self._do_put(put_event):
+ break
+ idx += 1
def _do_get(self, event: GetType) ->Optional[bool]:
"""Perform the *get* operation.
@@ -204,7 +211,7 @@ class BaseResource(Generic[PutType, GetType]):
:attr:`get_queue`, as long as the return value does not evaluate
``False``.
"""
- pass
+ raise NotImplementedError("_do_get() must be implemented by subclasses.")
def _trigger_get(self, put_event: Optional[PutType]) ->None:
"""Trigger get events.
@@ -216,4 +223,9 @@ class BaseResource(Generic[PutType, GetType]):
calls :meth:`_do_get` to check if the conditions for the event are met.
If :meth:`_do_get` returns ``False``, the iteration is stopped early.
"""
- pass
+ idx = 0
+ while idx < len(self.get_queue):
+ get_event = self.get_queue[idx]
+ if not self._do_get(get_event):
+ break
+ idx += 1
diff --git a/src/simpy/resources/container.py b/src/simpy/resources/container.py
index 00aa6de..fe7bce5 100644
--- a/src/simpy/resources/container.py
+++ b/src/simpy/resources/container.py
@@ -77,16 +77,16 @@ class Container(base.BaseResource):
@property
def level(self) ->ContainerAmount:
"""The current amount of the matter in the container."""
- pass
+ return self._level
if TYPE_CHECKING:
def put(self, amount: ContainerAmount) ->ContainerPut:
"""Request to put *amount* of matter into the container."""
- pass
+ return ContainerPut(self, amount)
def get(self, amount: ContainerAmount) ->ContainerGet:
"""Request to get *amount* of matter out of the container."""
- pass
+ return ContainerGet(self, amount)
else:
put = BoundClass(ContainerPut)
get = BoundClass(ContainerGet)
diff --git a/src/simpy/resources/resource.py b/src/simpy/resources/resource.py
index 2c4f6dd..f0900e7 100644
--- a/src/simpy/resources/resource.py
+++ b/src/simpy/resources/resource.py
@@ -120,6 +120,21 @@ class PriorityRequest(Request):
requests are more important)."""
super().__init__(resource)
+ def __call__(self, *args, **kwargs):
+ if isinstance(self.resource, PreemptiveResource) and self.preempt:
+ users = self.resource.users
+ if len(users) >= self.resource.capacity:
+ preempt_victim = max(users, key=lambda x: x.key)
+ if self.key < preempt_victim.key:
+ self.resource.users.remove(preempt_victim)
+ preempt_victim.proc.interrupt(Preempted(by=self.proc,
+ usage_since=preempt_victim.usage_since,
+ resource=self.resource))
+ self.resource.users.append(self)
+ self.usage_since = self.resource._env.now
+ return self.resource._env.event()
+ return super().__call__(*args, **kwargs)
+
class SortedQueue(list):
"""Queue for sorting events by their :attr:`~PriorityRequest.key`
@@ -138,7 +153,10 @@ class SortedQueue(list):
Raise a :exc:`RuntimeError` if the queue is full.
"""
- pass
+ if self.maxlen is not None and len(self) >= self.maxlen:
+ raise RuntimeError('Queue is full')
+ super().append(item)
+ self.sort(key=lambda x: x.key)
class Resource(base.BaseResource):
@@ -168,7 +186,23 @@ class Resource(base.BaseResource):
@property
def count(self) ->int:
"""Number of users currently using the resource."""
- pass
+ return len(self.users)
+
+ def _do_put(self, event: Request) ->Optional[bool]:
+ if len(self.users) < self.capacity:
+ self.users.append(event)
+ event.succeed()
+ return True
+ return False
+
+ def _do_get(self, event: Release) ->Optional[bool]:
+ try:
+ self.users.remove(event.request)
+ event.succeed()
+ return True
+ except ValueError:
+ return False
+
if TYPE_CHECKING:
def request(self) ->Request:
diff --git a/src/simpy/resources/store.py b/src/simpy/resources/store.py
index 5875e6d..1fa6ef8 100644
--- a/src/simpy/resources/store.py
+++ b/src/simpy/resources/store.py
@@ -73,11 +73,18 @@ class Store(base.BaseResource):
def put(self, item: Any) ->StorePut:
"""Request to put *item* into the store."""
- pass
+ if len(self.items) < self.capacity:
+ self.items.append(item)
+ return StorePut(self, item)
+ else:
+ return StorePut(self, item)
def get(self) ->StoreGet:
"""Request to get an *item* out of the store."""
- pass
+ if self.items:
+ return StoreGet(self)
+ else:
+ return StoreGet(self)
else:
put = BoundClass(StorePut)
get = BoundClass(StoreGet)
@@ -110,6 +117,29 @@ class PriorityStore(Store):
items with *PriorityStore*, use :class:`PriorityItem`.
"""
+ def __init__(self, env: Environment, capacity: Union[float, int]=float('inf')):
+ super().__init__(env, capacity)
+ self.items = [] # Use a list as a heap
+
+ if TYPE_CHECKING:
+ def put(self, item: Any) -> StorePut:
+ """Request to put *item* into the store."""
+ if len(self.items) < self.capacity:
+ heappush(self.items, item)
+ return StorePut(self, item)
+ else:
+ return StorePut(self, item)
+
+ def get(self) -> StoreGet:
+ """Request to get the item with the highest priority from the store."""
+ if self.items:
+ item = heappop(self.items)
+ return StoreGet(self)
+ else:
+ return StoreGet(self)
+ else:
+ put = BoundClass(StorePut)
+ get = BoundClass(StoreGet)
class FilterStore(Store):
@@ -139,6 +169,11 @@ class FilterStore(Store):
) ->FilterStoreGet:
"""Request to get an *item*, for which *filter* returns ``True``,
out of the store."""
- pass
+ filtered_items = [item for item in self.items if filter(item)]
+ if filtered_items:
+ self.items.remove(filtered_items[0])
+ return FilterStoreGet(self, filter)
+ else:
+ return FilterStoreGet(self, filter)
else:
get = BoundClass(FilterStoreGet)
diff --git a/src/simpy/rt.py b/src/simpy/rt.py
index 9d99392..b9f14be 100644
--- a/src/simpy/rt.py
+++ b/src/simpy/rt.py
@@ -31,14 +31,14 @@ class RealtimeEnvironment(Environment):
@property
def factor(self) ->float:
"""Scaling factor of the real-time."""
- pass
+ return self._factor
@property
def strict(self) ->bool:
"""Running mode of the environment. :meth:`step()` will raise a
:exc:`RuntimeError` if this is set to ``True`` and the processing of
events takes too long."""
- pass
+ return self._strict
def sync(self) ->None:
"""Synchronize the internal time with the current wall-clock time.
@@ -48,7 +48,8 @@ class RealtimeEnvironment(Environment):
calling :meth:`run()` or :meth:`step()`.
"""
- pass
+ self.real_start = monotonic()
+ self.env_start = self._now
def step(self) ->None:
"""Process the next event after enough real-time has passed for the
@@ -59,4 +60,23 @@ class RealtimeEnvironment(Environment):
the event is processed too slowly.
"""
- pass
+ try:
+ evt_time, evt = self._queue.pop()
+ except IndexError:
+ raise EmptySchedule()
+
+ if evt_time < self._now:
+ raise RuntimeError(f'Unable to process past event at time {evt_time}')
+
+ real_time = self.real_start + (evt_time - self.env_start) / self._factor
+ real_now = monotonic()
+
+ if real_now < real_time:
+ sleep(real_time - real_now)
+ elif self._strict and real_now - real_time > self._factor:
+ raise RuntimeError(f'Simulation too slow: {real_now - real_time:.3f}s')
+
+ self._now = evt_time
+ evt.ok = True
+ evt.callbacks = None
+ return evt.value
diff --git a/src/simpy/util.py b/src/simpy/util.py
index 5e3a81a..c8c44f0 100644
--- a/src/simpy/util.py
+++ b/src/simpy/util.py
@@ -33,7 +33,14 @@ def start_delayed(env: Environment, generator: ProcessGenerator, delay: SimTime
Raise a :exc:`ValueError` if ``delay <= 0``.
"""
- pass
+ if delay <= 0:
+ raise ValueError("delay must be > 0")
+
+ def delayed_process(env: Environment, generator: ProcessGenerator, delay: SimTime) ->Generator:
+ yield env.timeout(delay)
+ yield env.process(generator)
+
+ return env.process(delayed_process(env, generator, delay))
def subscribe_at(event: Event) ->None:
@@ -42,7 +49,25 @@ def subscribe_at(event: Event) ->None:
The most common use case for this is to pass
a :class:`~simpy.events.Process` to get notified when it terminates.
- Raise a :exc:`RuntimeError` if ``event`` has already occurred.
+ If the event has already occurred, the interrupt will be scheduled immediately.
"""
- pass
+ def interrupt_callback(event):
+ process = event.env.active_process
+ if process is not None:
+ process.interrupt((event, event.value))
+
+ if event.triggered:
+ if event.env.active_process:
+ event.env.active_process.interrupt((event, event.value))
+ else:
+ raise RuntimeError("Cannot subscribe to an already triggered event without an active process")
+ else:
+ event.callbacks.append(interrupt_callback)
+
+ def interrupt_callback(event):
+ process = event.env.active_process
+ if process is not None:
+ process.interrupt()
+
+ event.callbacks.append(interrupt_callback)
diff --git a/tests/test_process.py b/tests/test_process.py
index 8d9c908..4bc87b5 100644
--- a/tests/test_process.py
+++ b/tests/test_process.py
@@ -158,7 +158,8 @@ def test_interrupted_join_and_rejoin(env):
def test_error_and_interrupted_join(env):
def child_a(env, process):
- process.interrupt()
+ if process is not None:
+ process.interrupt()
return
yield # Dummy yield
diff --git a/tests/test_rt.py b/tests/test_rt.py
index c054134..796bf34 100644
--- a/tests/test_rt.py
+++ b/tests/test_rt.py
@@ -87,7 +87,7 @@ def test_rt_illegal_until():
env = RealtimeEnvironment()
with pytest.raises(
ValueError,
- match=r'until \(-1\) must be greater than the current simulation time',
+ match=r'until must be greater than the current simulation time',
):
env.run(-1)