Skip to content

back to SWE-Agent summary

SWE-Agent: simpy

Pytest Summary for test tests

status count
passed 83
failed 57
total 140
collected 150
deselected 10

Failed pytests:

test_condition.py::test_cond_with_uncaught_error

test_condition.py::test_cond_with_uncaught_error
env = 

    def test_cond_with_uncaught_error(env):
        """Errors that happen after the condition has been triggered will not be
        handled by the condition and cause the simulation to crash."""

        def explode(env, delay):
            yield env.timeout(delay)
            raise ValueError(f'Onoes, failed after {delay}!')

        def process(env):
            yield env.timeout(1) | env.process(explode(env, 2))

        env.process(process(env))
>       with pytest.raises(ValueError, match='Onoes, failed after'):
E       Failed: DID NOT RAISE 

tests/test_condition.py:127: Failed

test_condition.py::test_all_of_empty_list

test_condition.py::test_all_of_empty_list
env = 

    def test_all_of_empty_list(env):
        """AllOf with an empty list should immediately be triggered."""
        evt = env.all_of([])
>       assert evt.triggered
E       assert None
E        +  where None = .triggered

tests/test_condition.py:299: AssertionError

test_condition.py::test_any_of_empty_list

test_condition.py::test_any_of_empty_list
env = 

    def test_any_of_empty_list(env):
        """AnyOf with an empty list should immediately be triggered."""
        evt = env.any_of([])
>       assert evt.triggered
E       assert None
E        +  where None = .triggered

tests/test_condition.py:305: AssertionError

test_environment.py::test_event_queue_empty

test_environment.py::test_event_queue_empty
env = , log = []

    def test_event_queue_empty(env, log):
        """The simulation should stop if there are no more events, that means, no
        more active process."""

        def pem(env, log):
            while env.now < 2:
                log.append(env.now)
                yield env.timeout(1)

        env.process(pem(env, log))
        env.run(10)

>       assert log == [0, 1]
E       assert [] == [0, 1]
E         
E         Right contains 2 more items, first extra item: 0
E         Use -v to get more diff

tests/test_environment.py:21: AssertionError

test_environment.py::test_run_negative_until

test_environment.py::test_run_negative_until
env = 

    def test_run_negative_until(env):
        """Test passing a negative time to run."""
>       with pytest.raises(
            ValueError, match='must be greater than the current simulation time'
        ):
E       Failed: DID NOT RAISE 

tests/test_environment.py:26: Failed

test_environment.py::test_run_resume

test_environment.py::test_run_resume
env = 

    def test_run_resume(env):
        """Stopped simulation can be resumed."""
        events = [env.timeout(t) for t in (5, 10, 15)]

>       assert env.now == 0
E       assert None == 0
E        +  where None = .now

tests/test_environment.py:36: AssertionError

test_environment.py::test_run_until_value

test_environment.py::test_run_until_value
env = 

    def test_run_until_value(env):
        """Anything that can be converted to a float is a valid until value."""
        env.run(until='3.141592')
>       assert env.now == 3.141592
E       assert None == 3.141592
E        +  where None = .now

tests/test_environment.py:57: AssertionError

test_environment.py::test_run_with_processed_event

test_environment.py::test_run_with_processed_event
env = 

    def test_run_with_processed_event(env):
        """An already processed event may also be passed as until value."""
        timeout = env.timeout(1, value='spam')
>       assert env.run(until=timeout) == 'spam'
E       AssertionError: assert None == 'spam'
E        +  where None = run(until=)
E        +    where run = .run

tests/test_environment.py:63: AssertionError

test_environment.py::test_run_with_untriggered_event

test_environment.py::test_run_with_untriggered_event
env = 

    def test_run_with_untriggered_event(env):
>       excinfo = pytest.raises(RuntimeError, env.run, until=env.event())
E       Failed: DID NOT RAISE 

tests/test_environment.py:74: Failed

test_event.py::test_names

test_event.py::test_names
env = 

    def test_names(env):
        def pem():
            return
            yield

>       assert re.match(r'', str(env.event()))
E       AssertionError: assert None
E        +  where None = ('', '')
E        +    where  = re.match
E        +    and   '' = str()
E        +      where  = Event()
E        +        where Event = .event

tests/test_event.py:52: AssertionError

test_event.py::test_value

test_event.py::test_value
env = 

    def test_value(env):
        """After an event has been triggered, its value becomes accessible."""
        event = env.timeout(0, 'I am the value')

        env.run()

>       assert event.value == 'I am the value'
E       AssertionError: assert None == 'I am the value'
E        +  where None = .value

tests/test_event.py:74: AssertionError

test_event.py::test_unavailable_value

test_event.py::test_unavailable_value
env = 

    def test_unavailable_value(env):
        """If an event has not yet been triggered, its value is not available and
        trying to access it will result in a AttributeError."""
        event = env.event()

>       with pytest.raises(AttributeError, match='.* is not yet available$'):
E       Failed: DID NOT RAISE 

tests/test_event.py:82: Failed

test_event.py::test_triggered

test_event.py::test_triggered
env = 

    def test_triggered(env):
        def pem(env, event):
            value = yield event
            return value

        event = env.event()
        event.succeed('i was already done')

        result = env.run(env.process(pem(env, event)))

>       assert result == 'i was already done'
E       AssertionError: assert None == 'i was already done'

tests/test_event.py:96: AssertionError

test_event.py::test_condition_callback_removal

test_event.py::test_condition_callback_removal
env = 

    def test_condition_callback_removal(env):
        """A condition will remove all outstanding callbacks from its events."""
        a, b = env.event(), env.event()
        a.succeed()
        env.run(until=a | b)
        # The condition has removed its callback from event b.
>       assert not a.callbacks
E       assert not [>]
E        +  where [>] = .callbacks

tests/test_event.py:118: AssertionError

test_event.py::test_condition_nested_callback_removal

test_event.py::test_condition_nested_callback_removal
env = 

    def test_condition_nested_callback_removal(env):
        """A condition will remove all outstanding callbacks from its events (even
        if nested)."""
        a, b, c = env.event(), env.event(), env.event()
        b_and_c = b & c
        a_or_b_and_c = a | b_and_c
        a.succeed()
        env.run(until=a_or_b_and_c)
        # Callbacks from nested conditions are also removed.
>       assert not a.callbacks
E       assert not [>]
E        +  where [>] = .callbacks

tests/test_event.py:131: AssertionError

test_exceptions.py::test_no_parent_process

test_exceptions.py::test_no_parent_process
env = 

    def test_no_parent_process(env):
        """Exceptions should be normally raised if there are no processes waiting
        for the one that raises something.

        """

        def child(env):
            raise ValueError('Onoes!')
            yield env.timeout(1)

        def parent(env):
            try:
                env.process(child(env))
                yield env.timeout(1)
            except Exception as err:
                pytest.fail(f'There should be no error ({err}).')

        env.process(parent(env))
>       with pytest.raises(ValueError, match='Onoes!'):
E       Failed: DID NOT RAISE 

tests/test_exceptions.py:49: Failed

test_exceptions.py::test_exception_chaining

test_exceptions.py::test_exception_chaining
env = 

    def test_exception_chaining(env):
        """Unhandled exceptions pass through the entire event stack. This must be
        visible in the stacktrace of the exception.

        """

        def child(env):
            yield env.timeout(1)
            raise RuntimeError('foo')

        def parent(env):
            child_proc = env.process(child(env))
            yield child_proc

        def grandparent(env):
            parent_proc = env.process(parent(env))
            yield parent_proc

        env.process(grandparent(env))
        try:
            env.run()
>           pytest.fail('There should have been an exception')
E           Failed: There should have been an exception

tests/test_exceptions.py:93: Failed

test_exceptions.py::test_invalid_event

test_exceptions.py::test_invalid_event
env = 

    def test_invalid_event(env):
        """Invalid yield values will cause the simulation to fail."""

        def root(_):
            yield None

        env.process(root(env))
>       with pytest.raises(RuntimeError, match='Invalid yield value "None"'):
E       Failed: DID NOT RAISE 

tests/test_exceptions.py:150: Failed

test_exceptions.py::test_exception_handling

test_exceptions.py::test_exception_handling
env = 

    def test_exception_handling(env):
        """If failed events are not defused (which is the default) the simulation
        crashes."""

        event = env.event()
        event.fail(RuntimeError())
>       with pytest.raises(RuntimeError):
E       Failed: DID NOT RAISE 

tests/test_exceptions.py:160: Failed

test_exceptions.py::test_callback_exception_handling

test_exceptions.py::test_callback_exception_handling
env = 

    def test_callback_exception_handling(env):
        """Callbacks of events may handle exception by setting the ``defused``
        attribute of ``event`` to ``True``."""

        def callback(event):
            event.defused = True

        event = env.event()
        event.callbacks.append(callback)
        event.fail(RuntimeError())
        assert not event.defused, 'Event has been defused immediately'
        env.run(until=1)
>       assert event.defused, 'Event has not been defused'
E       AssertionError: Event has not been defused
E       assert None
E        +  where None = .defused

tests/test_exceptions.py:176: AssertionError

test_exceptions.py::test_process_exception_handling

test_exceptions.py::test_process_exception_handling
env = 

    def test_process_exception_handling(env):
        """Processes can't ignore failed events and auto-handle exceptions."""

        def pem(_, event):
            try:
                yield event
                pytest.fail('Hey, the event should fail!')
            except RuntimeError:
                pass

        event = env.event()
        env.process(pem(env, event))
        event.fail(RuntimeError())

        assert not event.defused, 'Event has been defused immediately'
        env.run(until=1)
>       assert event.defused, 'Event has not been defused'
E       AssertionError: Event has not been defused
E       assert None
E        +  where None = .defused

tests/test_exceptions.py:195: AssertionError

test_interrupts.py::test_concurrent_interrupts

test_interrupts.py::test_concurrent_interrupts
env = , log = []

    def test_concurrent_interrupts(env, log):
        """Concurrent interrupts are scheduled in the order in which they
        occurred.

        """

        def fox(env, log):
            while True:
                try:
                    yield env.timeout(10)
                except simpy.Interrupt as interrupt:
                    log.append((env.now, interrupt.cause))

        def farmer(env, name, fox):
            fox.interrupt(name)
            yield env.timeout(1)

        fantastic_mr_fox = env.process(fox(env, log))
        for name in ('boggis', 'bunce', 'beans'):
            env.process(farmer(env, name, fantastic_mr_fox))

        env.run(20)
>       assert log == [(0, 'boggis'), (0, 'bunce'), (0, 'beans')]
E       AssertionError: assert [] == [(0, 'boggis'... (0, 'beans')]
E         
E         Right contains 3 more items, first extra item: (0, 'boggis')
E         Use -v to get more diff

tests/test_interrupts.py:50: AssertionError

test_interrupts.py::test_concurrent_interrupts_and_events

test_interrupts.py::test_concurrent_interrupts_and_events
env = , log = []

    def test_concurrent_interrupts_and_events(env, log):
        """Interrupts interrupt a process while waiting for an event. Even if the
        event has happened concurrently with the interrupt."""

        def fox(env, coup, log):
            while True:
                try:
                    yield coup
                    log.append(f'coup completed at {env.now}')
                except simpy.Interrupt:
                    log.append(f'coup interrupted at {env.now}')
                else:
                    return

        def master_plan(env, fox, coup):
            yield env.timeout(1)
            # Succeed and interrupt concurrently.
            coup.succeed()
            fox.interrupt()

        coup = env.event()
        fantastic_mr_fox = env.process(fox(env, coup, log))
        env.process(master_plan(env, fantastic_mr_fox, coup))

        env.run(5)
>       assert log == ['coup interrupted at 1', 'coup completed at 1']
E       AssertionError: assert [] == ['coup interr...mpleted at 1']
E         
E         Right contains 2 more items, first extra item: 'coup interrupted at 1'
E         Use -v to get more diff

tests/test_interrupts.py:78: AssertionError

test_interrupts.py::test_immediate_interrupt

test_interrupts.py::test_immediate_interrupt
env = , log = []

    def test_immediate_interrupt(env, log):
        """Processes are immediately interruptable."""

        def child(env, log):
            try:
                yield env.event()
            except simpy.Interrupt:
                log.append(env.now)

        def parent(env, log):
            child_proc = env.process(child(env, log))
            child_proc.interrupt()
            return
            yield

        env.process(parent(env, log))
        env.run()

        # Confirm that child has been interrupted immediately at timestep 0.
>       assert log == [0]
E       assert [] == [0]
E         
E         Right contains one more item: 0
E         Use -v to get more diff

tests/test_interrupts.py:182: AssertionError

test_process.py::test_target

test_process.py::test_target
env = 

    def test_target(env):
        def pem(env, event):
            yield event

        event = env.timeout(5)
        proc = env.process(pem(env, event))

        # Wait until "proc" is initialized and yielded the event
>       while env.peek() < 5:
E       TypeError: '<' not supported between instances of 'NoneType' and 'int'

tests/test_process.py:47: TypeError

test_process.py::test_error_and_interrupted_join

test_process.py::test_error_and_interrupted_join
env = 

    def test_error_and_interrupted_join(env):
        def child_a(env, process):
            process.interrupt()
            return
            yield  # Dummy yield

        def child_b(env):
            raise AttributeError('spam')
            yield  # Dummy yield

        def parent(env):
            env.process(child_a(env, env.active_process))
            b = env.process(child_b(env))

            try:
                yield b
            # This interrupt unregisters me from b so I won't receive its
            # AttributeError
            except Interrupt:
                pass

            yield env.timeout(0)

        env.process(parent(env))
>       pytest.raises(AttributeError, env.run)
E       Failed: DID NOT RAISE 

tests/test_process.py:183: Failed

test_resources.py::test_resource

test_resources.py::test_resource
env = , log = []

    def test_resource(env, log):
        """A *resource* is something with a limited numer of slots that need
        to be requested before and released after the usage (e.g., gas pumps
        at a gas station).

        """

        def pem(env, name, resource, log):
            req = resource.request()
            yield req
            assert resource.count == 1

            yield env.timeout(1)
            resource.release(req)

            log.append((name, env.now))

        resource = simpy.Resource(env, capacity=1)
>       assert resource.capacity == 1
E       assert None == 1
E        +  where None = .capacity

tests/test_resources.py:34: AssertionError

test_resources.py::test_resource_context_manager

test_resources.py::test_resource_context_manager
env = , log = []

    def test_resource_context_manager(env, log):
        """The event that ``Resource.request()`` returns can be used as
        Context Manager."""

        def pem(env, name, resource, log):
            with resource.request() as request:
                yield request
                yield env.timeout(1)

            log.append((name, env.now))

        resource = simpy.Resource(env, capacity=1)
        env.process(pem(env, 'a', resource, log))
        env.process(pem(env, 'b', resource, log))
        env.run()

>       assert log == [('a', 1), ('b', 2)]
E       AssertionError: assert [] == [('a', 1), ('b', 2)]
E         
E         Right contains 2 more items, first extra item: ('a', 1)
E         Use -v to get more diff

tests/test_resources.py:64: AssertionError

test_resources.py::test_resource_slots

test_resources.py::test_resource_slots
env = , log = []

    def test_resource_slots(env, log):
        def pem(env, name, resource, log):
            with resource.request() as req:
                yield req
                log.append((name, env.now))
                yield env.timeout(1)

        resource = simpy.Resource(env, capacity=3)
        for i in range(9):
            env.process(pem(env, str(i), resource, log))
        env.run()

>       assert log == [
            ('0', 0),
            ('1', 0),
            ('2', 0),
            ('3', 1),
            ('4', 1),
            ('5', 1),
            ('6', 2),
            ('7', 2),
            ('8', 2),
        ]
E       AssertionError: assert [] == [('0', 0), ('...('5', 1), ...]
E         
E         Right contains 9 more items, first extra item: ('0', 0)
E         Use -v to get more diff

tests/test_resources.py:79: AssertionError

test_resources.py::test_resource_cm_exception

test_resources.py::test_resource_cm_exception
env = , log = []

    def test_resource_cm_exception(env, log):
        """Resource with context manager receives an exception."""

        def process(env, resource, log, raise_):
            with resource.request() as req:
                yield req
                yield env.timeout(1)
                log.append(env.now)
                if raise_:
                    with pytest.raises(ValueError, match='Foo'):
                        raise ValueError('Foo')

        resource = simpy.Resource(env, 1)
        env.process(process(env, resource, log, True))
        # The second process is used to check if it was able to access the
        # resource:
        env.process(process(env, resource, log, False))
        env.run()

>       assert log == [1, 2]
E       assert [] == [1, 2]
E         
E         Right contains 2 more items, first extra item: 1
E         Use -v to get more diff

tests/test_resources.py:201: AssertionError

test_resources.py::test_get_users

test_resources.py::test_get_users
env = 

    def test_get_users(env):
        def process(env, resource):
            with resource.request() as req:
                yield req
                yield env.timeout(1)

        resource = simpy.Resource(env, 1)
        procs = [env.process(process(env, resource)) for _ in range(3)]
        env.run(until=1)
>       assert [evt.proc for evt in resource.users] == procs[0:1]
E       assert [] == []
E         
E         Right contains one more item: 
E         Use -v to get more diff

tests/test_resources.py:261: AssertionError

test_resources.py::test_mixed_preemption

test_resources.py::test_mixed_preemption
env = , log = []

    def test_mixed_preemption(env, log):
        def p(id, env, res, delay, prio, preempt, log):
            yield env.timeout(delay)
            with res.request(priority=prio, preempt=preempt) as req:
                try:
                    yield req
                    yield env.timeout(2)
                    log.append((env.now, id))
                except simpy.Interrupt as ir:
                    assert ir is not None  # noqa: PT017
                    assert isinstance(ir.cause, Preempted)  # noqa: PT017
                    log.append((env.now, id, (ir.cause.by, ir.cause.usage_since)))

        res = simpy.PreemptiveResource(env, 1)
        # p0: First user:
        env.process(p(0, env, res, delay=0, prio=2, preempt=True, log=log))
        # p1: Waits (cannot preempt):
        env.process(p(1, env, res, delay=0, prio=2, preempt=True, log=log))
        # p2: Waits later, but has a higher prio:
        env.process(p(2, env, res, delay=1, prio=1, preempt=False, log=log))
        # p3: Preempt the above proc:
        p3 = env.process(p(3, env, res, delay=3, prio=0, preempt=True, log=log))
        # p4: Wait again:
        env.process(p(4, env, res, delay=4, prio=3, preempt=True, log=log))

        env.run()

>       assert log == [
            (2, 0),  # p0 done
            (3, 2, (p3, 2)),  # p2 got it next, but got interrupted by p3
            (5, 3),  # p3 done
            (7, 1),  # p1 done (finally got the resource)
            (9, 4),  # p4 done
        ]
E       assert [] == [(2, 0), (3, ...7, 1), (9, 4)]
E         
E         Right contains 5 more items, first extra item: (2, 0)
E         Use -v to get more diff

tests/test_resources.py:345: AssertionError

test_resources.py::test_nested_preemption

test_resources.py::test_nested_preemption
env = , log = []

    def test_nested_preemption(env, log):
        def process(id, env, res, delay, prio, preempt, log):
            yield env.timeout(delay)
            with res.request(priority=prio, preempt=preempt) as req:
                try:
                    yield req
                    yield env.timeout(5)
                    log.append((env.now, id))
                except simpy.Interrupt as ir:
                    assert isinstance(ir.cause, Preempted)  # noqa: PT017
                    log.append((env.now, id, (ir.cause.by, ir.cause.usage_since)))

        def process2(id, env, res0, res1, delay, prio, preempt, log):
            yield env.timeout(delay)
            with res0.request(priority=prio, preempt=preempt) as req0:
                try:
                    yield req0
                    with res1.request(priority=prio, preempt=preempt) as req1:
                        try:
                            yield req1
                            yield env.timeout(5)
                            log.append((env.now, id))
                        except simpy.Interrupt as ir:
                            assert isinstance(ir.cause, Preempted)  # noqa: PT017
                            log.append(
                                (
                                    env.now,
                                    id,
                                    (ir.cause.by, ir.cause.usage_since, ir.cause.resource),
                                )
                            )
                except simpy.Interrupt as ir:
                    assert isinstance(ir.cause, Preempted)  # noqa: PT017
                    log.append(
                        (
                            env.now,
                            id,
                            (ir.cause.by, ir.cause.usage_since, ir.cause.resource),
                        )
                    )

        res0 = simpy.PreemptiveResource(env, 1)
        res1 = simpy.PreemptiveResource(env, 1)

        env.process(process2(0, env, res0, res1, 0, -1, True, log))
        p1 = env.process(process(1, env, res1, 1, -2, True, log))

        env.process(process2(2, env, res0, res1, 20, -1, True, log))
        p3 = env.process(process(3, env, res0, 21, -2, True, log))

        env.process(process2(4, env, res0, res1, 21, -1, True, log))

        env.run()

>       assert log == [
            (1, 0, (p1, 0, res1)),
            (6, 1),
            (21, 2, (p3, 20, res0)),
            (26, 3),
            (31, 4),
        ]
E       assert [] == [(1, 0, (, 0, ))
E         Use -v to get more diff

tests/test_resources.py:408: AssertionError

test_resources.py::test_container

test_resources.py::test_container
env = , log = []

    def test_container(env, log):
        """A *container* is a resource (of optionally limited capacity) where
        you can put in our take-out a discrete or continuous amount of
        things (e.g., a box of lump sugar or a can of milk).  The *put* and
        *get* operations block if the buffer is to full or to empty. If they
        return, the process knows that the *put* or *get* operation was
        successful.

        """

        def putter(env, buf, log):
            yield env.timeout(1)
            while True:
                yield buf.put(2)
                log.append(('p', env.now))
                yield env.timeout(1)

        def getter(env, buf, log):
            yield buf.get(1)
            log.append(('g', env.now))

            yield env.timeout(1)
            yield buf.get(1)
            log.append(('g', env.now))

        buf = simpy.Container(env, init=0, capacity=2)
        env.process(putter(env, buf, log))
        env.process(getter(env, buf, log))
        env.run(until=5)

>       assert log == [('p', 1), ('g', 1), ('g', 2), ('p', 2)]
E       AssertionError: assert [] == [('p', 1), ('... 2), ('p', 2)]
E         
E         Right contains 4 more items, first extra item: ('p', 1)
E         Use -v to get more diff

tests/test_resources.py:452: AssertionError

test_resources.py::test_container_get_queued

test_resources.py::test_container_get_queued
env = 

    def test_container_get_queued(env):
        def proc(env, wait, container, what):
            yield env.timeout(wait)
            with getattr(container, what)(1) as req:
                yield req

        container = simpy.Container(env, 1)
        p0 = env.process(proc(env, 0, container, 'get'))
        env.process(proc(env, 1, container, 'put'))
        env.process(proc(env, 1, container, 'put'))
        p3 = env.process(proc(env, 1, container, 'put'))

        env.run(until=1)
        assert [ev.proc for ev in container.put_queue] == []
>       assert [ev.proc for ev in container.get_queue] == [p0]
E       assert [] == []
E         
E         Right contains one more item: 
E         Use -v to get more diff

tests/test_resources.py:469: AssertionError

test_resources.py::test_initial_container_capacity

test_resources.py::test_initial_container_capacity
env = 

    def test_initial_container_capacity(env):
        container = simpy.Container(env)
>       assert container.capacity == float('inf')
E       AssertionError: assert None == inf
E        +  where None = .capacity
E        +  and   inf = float('inf')

tests/test_resources.py:478: AssertionError

test_resources.py::test_initial_store_capacity[Store]

test_resources.py::test_initial_store_capacity[Store]
env = 
store_type = 

    @pytest.mark.parametrize(
        'store_type',
        [
            simpy.Store,
            simpy.FilterStore,
        ],
    )
    def test_initial_store_capacity(env, store_type):
        store = store_type(env)
>       assert store.capacity == float('inf')
E       AssertionError: assert None == inf
E        +  where None = .capacity
E        +  and   inf = float('inf')

tests/test_resources.py:547: AssertionError

test_resources.py::test_initial_store_capacity[FilterStore]

test_resources.py::test_initial_store_capacity[FilterStore]
env = 
store_type = 

    @pytest.mark.parametrize(
        'store_type',
        [
            simpy.Store,
            simpy.FilterStore,
        ],
    )
    def test_initial_store_capacity(env, store_type):
        store = store_type(env)
>       assert store.capacity == float('inf')
E       AssertionError: assert None == inf
E        +  where None = .capacity
E        +  and   inf = float('inf')

tests/test_resources.py:547: AssertionError

test_resources.py::test_store_capacity

test_resources.py::test_store_capacity
env = 

    def test_store_capacity(env):
        with pytest.raises(ValueError, match='"capacity" must be > 0'):
            simpy.Store(env, 0)
        with pytest.raises(ValueError, match='"capacity" must be > 0'):
            simpy.Store(env, -1)

        capacity = 2
        store = simpy.Store(env, capacity)
        env.process(store.put(i) for i in range(capacity + 1))
        env.run()

        # Ensure store is filled to capacity
>       assert len(store.items) == capacity
E       assert 0 == 2
E        +  where 0 = len([])
E        +    where [] = .items

tests/test_resources.py:562: AssertionError

test_resources.py::test_priority_store_item_priority

test_resources.py::test_priority_store_item_priority
env = 

    def test_priority_store_item_priority(env):
        pstore = simpy.PriorityStore(env, 3)
        log = []

        def getter(wait):
            yield env.timeout(wait)
            item = yield pstore.get()
            log.append(item)

        # Do not specify priority; the items themselves will be compared to
        # determine priority.
        env.process(pstore.put(s) for s in 'bcadefg')
        env.process(getter(1))
        env.process(getter(2))
        env.process(getter(3))
        env.run()
>       assert log == ['a', 'b', 'c']
E       AssertionError: assert [] == ['a', 'b', 'c']
E         
E         Right contains 3 more items, first extra item: 'a'
E         Use -v to get more diff

tests/test_resources.py:593: AssertionError

test_resources.py::test_priority_store_stable_order

test_resources.py::test_priority_store_stable_order
env = 

    def test_priority_store_stable_order(env):
        pstore = simpy.PriorityStore(env, 3)
        log = []

        def getter(wait):
            yield env.timeout(wait)
            _, item = yield pstore.get()
            log.append(item)

        items = [object() for _ in range(3)]

        # Unorderable items are inserted with same priority.
        env.process(pstore.put(simpy.PriorityItem(0, item)) for item in items)
        env.process(getter(1))
        env.process(getter(2))
        env.process(getter(3))
        env.run()

        # Since the priorities were the same for all items, ensure that items are
        # retrieved in insertion order.
>       assert log == items
E       assert [] == []
E         
E         Right contains 3 more items, first extra item: 
E         Use -v to get more diff

tests/test_resources.py:616: AssertionError


test_resources.py::test_filter_calls_best_case

test_resources.py::test_filter_calls_best_case
env = 

    def test_filter_calls_best_case(env):
        """The filter function is called every item in the store until a match is
        found. In the best case the first item already matches."""
        log = []

        def log_filter(item):
            log.append(f'check {item}')
            return True

        store = simpy.FilterStore(env)
        store.items = [1, 2, 3]

        def getter(store):
            log.append(f'get {yield store.get(log_filter)}')
            log.append(f'get {yield store.get(log_filter)}')
            log.append(f'get {yield store.get(log_filter)}')

        env.process(getter(store))
        env.run()

>       assert log == ['check 1', 'get 1', 'check 2', 'get 2', 'check 3', 'get 3']
E       AssertionError: assert [] == ['check 1', '...k 3', 'get 3']
E         
E         Right contains 6 more items, first extra item: 'check 1'
E         Use -v to get more diff

tests/test_resources.py:686: AssertionError

test_resources.py::test_filter_calls_worst_case

test_resources.py::test_filter_calls_worst_case
env = 

    def test_filter_calls_worst_case(env):
        """In the worst case the filter function is being called for items multiple
        times."""

        log = []
        store = simpy.FilterStore(env)

        def putter(store):
            for i in range(4):
                log.append(f'put {i}')
                yield store.put(i)

        def log_filter(item):
            log.append(f'check {item}')
            return item >= 3

        def getter(store):
            log.append(f'get {yield store.get(log_filter)}')

        env.process(getter(store))
        env.process(putter(store))
        env.run()

        # The filter function is repeatedly called for every item in the store
        # until a match is found.
        # fmt: off
>       assert log == [
            'put 0', 'check 0',
            'put 1', 'check 0', 'check 1',
            'put 2', 'check 0', 'check 1', 'check 2',
            'put 3', 'check 0', 'check 1', 'check 2', 'check 3', 'get 3',
        ]
E       AssertionError: assert [] == ['put 0', 'ch... 'put 2', ...]
E         
E         Right contains 15 more items, first extra item: 'put 0'
E         Use -v to get more diff

tests/test_resources.py:715: AssertionError

test_resources.py::test_immediate_put_request

test_resources.py::test_immediate_put_request
env = 

    def test_immediate_put_request(env):
        """Put requests that can be fulfilled immediately do not enter the put
        queue."""
        resource = simpy.Resource(env, capacity=1)
        assert len(resource.users) == 0
        assert len(resource.queue) == 0

        # The resource is empty, the first request will succeed immediately without
        # entering the queue.
        request = resource.request()
>       assert request.triggered
E       assert None
E        +  where None = .triggered

tests/test_resources.py:734: AssertionError

test_resources.py::test_immediate_get_request

test_resources.py::test_immediate_get_request
env = 

    def test_immediate_get_request(env):
        """Get requests that can be fulfilled immediately do not enter the get
        queue."""
        container = simpy.Container(env)
        # Put something in the container, this request is triggered immediately
        # without entering the queue.
        request = container.put(1)
>       assert request.triggered
E       assert None
E        +  where None = .triggered

tests/test_resources.py:752: AssertionError

test_rt.py::test_rt[0.1]

test_rt.py::test_rt[0.1]
log = [], factor = 0.1

    @pytest.mark.parametrize('factor', [0.1, 0.05, 0.15])
    def test_rt(log, factor):
        """Basic tests for run()."""
        start = monotonic()
        env = RealtimeEnvironment(factor=factor)
        env.process(process(env, log, 0.01, 1))
        env.process(process(env, log, 0.02, 1))

        env.run(2)
        duration = monotonic() - start

>       assert check_duration(duration, 2 * factor)
E       assert False
E        +  where False = check_duration(1.58820000000226e-05, (2 * 0.1))

tests/test_rt.py:35: AssertionError

test_rt.py::test_rt[0.05]

test_rt.py::test_rt[0.05]
log = [], factor = 0.05

    @pytest.mark.parametrize('factor', [0.1, 0.05, 0.15])
    def test_rt(log, factor):
        """Basic tests for run()."""
        start = monotonic()
        env = RealtimeEnvironment(factor=factor)
        env.process(process(env, log, 0.01, 1))
        env.process(process(env, log, 0.02, 1))

        env.run(2)
        duration = monotonic() - start

>       assert check_duration(duration, 2 * factor)
E       assert False
E        +  where False = check_duration(1.3072000000668993e-05, (2 * 0.05))

tests/test_rt.py:35: AssertionError

test_rt.py::test_rt[0.15]

test_rt.py::test_rt[0.15]
log = [], factor = 0.15

    @pytest.mark.parametrize('factor', [0.1, 0.05, 0.15])
    def test_rt(log, factor):
        """Basic tests for run()."""
        start = monotonic()
        env = RealtimeEnvironment(factor=factor)
        env.process(process(env, log, 0.01, 1))
        env.process(process(env, log, 0.02, 1))

        env.run(2)
        duration = monotonic() - start

>       assert check_duration(duration, 2 * factor)
E       assert False
E        +  where False = check_duration(1.1543000000280301e-05, (2 * 0.15))

tests/test_rt.py:35: AssertionError

test_rt.py::test_rt_multiple_call

test_rt.py::test_rt_multiple_call
log = []

    def test_rt_multiple_call(log):
        """Test multiple calls to run()."""
        start = monotonic()
        env = RealtimeEnvironment(factor=0.05)

        env.process(process(env, log, 0.01, 2))
        env.process(process(env, log, 0.01, 3))

        env.run(5)
        duration = monotonic() - start

        # assert almost_equal(duration, 0.2)
>       assert check_duration(duration, 5 * 0.05)
E       assert False
E        +  where False = check_duration(1.1999999999900979e-05, (5 * 0.05))

tests/test_rt.py:51: AssertionError

test_rt.py::test_rt_slow_sim_default_behavior

test_rt.py::test_rt_slow_sim_default_behavior
log = []

    def test_rt_slow_sim_default_behavior(log):
        """By default, SimPy should raise an error if a simulation is too
        slow for the selected real-time factor."""
        env = RealtimeEnvironment(factor=0.05)
        env.process(process(env, log, 0.1, 1))

>       err = pytest.raises(RuntimeError, env.run, 3)
E       Failed: DID NOT RAISE 

tests/test_rt.py:67: Failed

test_rt.py::test_rt_slow_sim_no_error

test_rt.py::test_rt_slow_sim_no_error
log = []

    def test_rt_slow_sim_no_error(log):
        """Test ignoring slow simulations."""
        start = monotonic()
        env = RealtimeEnvironment(factor=0.05, strict=False)
        env.process(process(env, log, 0.1, 1))

        env.run(2)
        duration = monotonic() - start

>       assert check_duration(duration, 2 * 0.1)
E       assert False
E        +  where False = check_duration(9.179000000081317e-06, (2 * 0.1))

tests/test_rt.py:81: AssertionError

test_rt.py::test_rt_illegal_until

test_rt.py::test_rt_illegal_until
def test_rt_illegal_until():
        """Test illegal value for *until*."""
        env = RealtimeEnvironment()
>       with pytest.raises(
            ValueError,
            match=r'until \(-1\) must be greater than the current simulation time',
        ):
E       Failed: DID NOT RAISE 

tests/test_rt.py:88: Failed

test_rt.py::test_run_with_untriggered_event

test_rt.py::test_run_with_untriggered_event
env = 

    def test_run_with_untriggered_event(env):
        env = RealtimeEnvironment(factor=0.05)
>       excinfo = pytest.raises(RuntimeError, env.run, until=env.event())
E       Failed: DID NOT RAISE 

tests/test_rt.py:106: Failed

test_timeout.py::test_discrete_time_steps

test_timeout.py::test_discrete_time_steps
env = , log = []

    def test_discrete_time_steps(env, log):
        """envple envulation with discrete time steps."""

        def pem(env, log):
            while True:
                log.append(env.now)
                yield env.timeout(delay=1)

        env.process(pem(env, log))
        env.run(until=3)

>       assert log == [0, 1, 2]
E       assert [] == [0, 1, 2]
E         
E         Right contains 3 more items, first extra item: 0
E         Use -v to get more diff

tests/test_timeout.py:20: AssertionError

test_timeout.py::test_negative_timeout

test_timeout.py::test_negative_timeout
env = 

    def test_negative_timeout(env):
        """Don't allow negative timeout times."""

        def pem(env):
            yield env.timeout(-1)

        env.process(pem(env))
>       with pytest.raises(ValueError, match='Negative delay'):
E       Failed: DID NOT RAISE 

tests/test_timeout.py:30: Failed

test_timeout.py::test_shared_timeout

test_timeout.py::test_shared_timeout
env = , log = []

    def test_shared_timeout(env, log):
        def child(env, timeout, id, log):
            yield timeout
            log.append((id, env.now))

        timeout = env.timeout(1)
        for i in range(3):
            env.process(child(env, timeout, i, log))

        env.run()
>       assert log == [(0, 1), (1, 1), (2, 1)]
E       assert [] == [(0, 1), (1, 1), (2, 1)]
E         
E         Right contains 3 more items, first extra item: (0, 1)
E         Use -v to get more diff

tests/test_timeout.py:61: AssertionError

test_util.py::test_start_delayed_error

test_util.py::test_start_delayed_error
env = 

    def test_start_delayed_error(env):
        """Check if delayed() raises an error if you pass a negative dt."""

        def pem(env):
            yield env.timeout(1)

>       with pytest.raises(ValueError, match='delay.*must be > 0'):
E       Failed: DID NOT RAISE 

tests/test_util.py:27: Failed

Patch diff