Skip to content

back to Claude Sonnet 3.5 - Fill-in summary

Claude Sonnet 3.5 - Fill-in: portalocker

Pytest Summary for test portalocker_tests

status count
failed 20
passed 20
total 40
collected 40

Failed pytests:

temporary_file_lock.py::test_temporary_file_lock

temporary_file_lock.py::test_temporary_file_lock
tmpfile = '/tmp/pytest-of-root/pytest-0/test_temporary_file_lock0/1572542919651092'

    def test_temporary_file_lock(tmpfile):
        with portalocker.TemporaryFileLock(tmpfile):
            pass

>       assert not os.path.isfile(tmpfile)
E       AssertionError: assert not True
E        +  where True = <function isfile>('/tmp/pytest-of-root/pytest-0/test_temporary_file_lock0/1572542919651092')
E        +    where <function isfile> = <module 'posixpath'>.isfile
E        +      where <module 'posixpath'> = os.path

portalocker_tests/temporary_file_lock.py:10: AssertionError

test_semaphore.py::test_bounded_semaphore[None-None]

test_semaphore.py::test_bounded_semaphore[None-None]
timeout = None, check_interval = None
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7fb5d4143a60>

    @pytest.mark.parametrize('timeout', [None, 0, 0.001])
    @pytest.mark.parametrize('check_interval', [None, 0, 0.0005])
    def test_bounded_semaphore(timeout, check_interval, monkeypatch):
        n = 2
        name: str = str(random.random())
        monkeypatch.setattr(utils, 'DEFAULT_TIMEOUT', 0.0001)
        monkeypatch.setattr(utils, 'DEFAULT_CHECK_INTERVAL', 0.0005)

        semaphore_a = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_b = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_c = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)

>       semaphore_a.acquire(timeout=timeout)
E       AttributeError: 'BoundedSemaphore' object has no attribute 'acquire'

portalocker_tests/test_semaphore.py:21: AttributeError

test_semaphore.py::test_bounded_semaphore[None-0]

test_semaphore.py::test_bounded_semaphore[None-0]
timeout = 0, check_interval = None
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7fb5d73c89d0>

    @pytest.mark.parametrize('timeout', [None, 0, 0.001])
    @pytest.mark.parametrize('check_interval', [None, 0, 0.0005])
    def test_bounded_semaphore(timeout, check_interval, monkeypatch):
        n = 2
        name: str = str(random.random())
        monkeypatch.setattr(utils, 'DEFAULT_TIMEOUT', 0.0001)
        monkeypatch.setattr(utils, 'DEFAULT_CHECK_INTERVAL', 0.0005)

        semaphore_a = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_b = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_c = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)

>       semaphore_a.acquire(timeout=timeout)
E       AttributeError: 'BoundedSemaphore' object has no attribute 'acquire'

portalocker_tests/test_semaphore.py:21: AttributeError

test_semaphore.py::test_bounded_semaphore[None-0.001]

test_semaphore.py::test_bounded_semaphore[None-0.001]
timeout = 0.001, check_interval = None
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7fb5d41893c0>

    @pytest.mark.parametrize('timeout', [None, 0, 0.001])
    @pytest.mark.parametrize('check_interval', [None, 0, 0.0005])
    def test_bounded_semaphore(timeout, check_interval, monkeypatch):
        n = 2
        name: str = str(random.random())
        monkeypatch.setattr(utils, 'DEFAULT_TIMEOUT', 0.0001)
        monkeypatch.setattr(utils, 'DEFAULT_CHECK_INTERVAL', 0.0005)

        semaphore_a = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_b = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_c = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)

>       semaphore_a.acquire(timeout=timeout)
E       AttributeError: 'BoundedSemaphore' object has no attribute 'acquire'

portalocker_tests/test_semaphore.py:21: AttributeError

test_semaphore.py::test_bounded_semaphore[0-None]

test_semaphore.py::test_bounded_semaphore[0-None]
timeout = None, check_interval = 0
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7fb5d73cb0a0>

    @pytest.mark.parametrize('timeout', [None, 0, 0.001])
    @pytest.mark.parametrize('check_interval', [None, 0, 0.0005])
    def test_bounded_semaphore(timeout, check_interval, monkeypatch):
        n = 2
        name: str = str(random.random())
        monkeypatch.setattr(utils, 'DEFAULT_TIMEOUT', 0.0001)
        monkeypatch.setattr(utils, 'DEFAULT_CHECK_INTERVAL', 0.0005)

        semaphore_a = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_b = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_c = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)

>       semaphore_a.acquire(timeout=timeout)
E       AttributeError: 'BoundedSemaphore' object has no attribute 'acquire'

portalocker_tests/test_semaphore.py:21: AttributeError

test_semaphore.py::test_bounded_semaphore[0-0]

test_semaphore.py::test_bounded_semaphore[0-0]
timeout = 0, check_interval = 0
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7fb5d76479d0>

    @pytest.mark.parametrize('timeout', [None, 0, 0.001])
    @pytest.mark.parametrize('check_interval', [None, 0, 0.0005])
    def test_bounded_semaphore(timeout, check_interval, monkeypatch):
        n = 2
        name: str = str(random.random())
        monkeypatch.setattr(utils, 'DEFAULT_TIMEOUT', 0.0001)
        monkeypatch.setattr(utils, 'DEFAULT_CHECK_INTERVAL', 0.0005)

        semaphore_a = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_b = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_c = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)

>       semaphore_a.acquire(timeout=timeout)
E       AttributeError: 'BoundedSemaphore' object has no attribute 'acquire'

portalocker_tests/test_semaphore.py:21: AttributeError

test_semaphore.py::test_bounded_semaphore[0-0.001]

test_semaphore.py::test_bounded_semaphore[0-0.001]
timeout = 0.001, check_interval = 0
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7fb5d426ef50>

    @pytest.mark.parametrize('timeout', [None, 0, 0.001])
    @pytest.mark.parametrize('check_interval', [None, 0, 0.0005])
    def test_bounded_semaphore(timeout, check_interval, monkeypatch):
        n = 2
        name: str = str(random.random())
        monkeypatch.setattr(utils, 'DEFAULT_TIMEOUT', 0.0001)
        monkeypatch.setattr(utils, 'DEFAULT_CHECK_INTERVAL', 0.0005)

        semaphore_a = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_b = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_c = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)

>       semaphore_a.acquire(timeout=timeout)
E       AttributeError: 'BoundedSemaphore' object has no attribute 'acquire'

portalocker_tests/test_semaphore.py:21: AttributeError

test_semaphore.py::test_bounded_semaphore[0.0005-None]

test_semaphore.py::test_bounded_semaphore[0.0005-None]
timeout = None, check_interval = 0.0005
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7fb5d7646d70>

    @pytest.mark.parametrize('timeout', [None, 0, 0.001])
    @pytest.mark.parametrize('check_interval', [None, 0, 0.0005])
    def test_bounded_semaphore(timeout, check_interval, monkeypatch):
        n = 2
        name: str = str(random.random())
        monkeypatch.setattr(utils, 'DEFAULT_TIMEOUT', 0.0001)
        monkeypatch.setattr(utils, 'DEFAULT_CHECK_INTERVAL', 0.0005)

        semaphore_a = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_b = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_c = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)

>       semaphore_a.acquire(timeout=timeout)
E       AttributeError: 'BoundedSemaphore' object has no attribute 'acquire'

portalocker_tests/test_semaphore.py:21: AttributeError

test_semaphore.py::test_bounded_semaphore[0.0005-0]

test_semaphore.py::test_bounded_semaphore[0.0005-0]
timeout = 0, check_interval = 0.0005
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7fb5d426ffd0>

    @pytest.mark.parametrize('timeout', [None, 0, 0.001])
    @pytest.mark.parametrize('check_interval', [None, 0, 0.0005])
    def test_bounded_semaphore(timeout, check_interval, monkeypatch):
        n = 2
        name: str = str(random.random())
        monkeypatch.setattr(utils, 'DEFAULT_TIMEOUT', 0.0001)
        monkeypatch.setattr(utils, 'DEFAULT_CHECK_INTERVAL', 0.0005)

        semaphore_a = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_b = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_c = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)

>       semaphore_a.acquire(timeout=timeout)
E       AttributeError: 'BoundedSemaphore' object has no attribute 'acquire'

portalocker_tests/test_semaphore.py:21: AttributeError

test_semaphore.py::test_bounded_semaphore[0.0005-0.001]

test_semaphore.py::test_bounded_semaphore[0.0005-0.001]
timeout = 0.001, check_interval = 0.0005
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7fb5d418baf0>

    @pytest.mark.parametrize('timeout', [None, 0, 0.001])
    @pytest.mark.parametrize('check_interval', [None, 0, 0.0005])
    def test_bounded_semaphore(timeout, check_interval, monkeypatch):
        n = 2
        name: str = str(random.random())
        monkeypatch.setattr(utils, 'DEFAULT_TIMEOUT', 0.0001)
        monkeypatch.setattr(utils, 'DEFAULT_CHECK_INTERVAL', 0.0005)

        semaphore_a = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_b = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)
        semaphore_c = portalocker.BoundedSemaphore(n, name=name, timeout=timeout)

>       semaphore_a.acquire(timeout=timeout)
E       AttributeError: 'BoundedSemaphore' object has no attribute 'acquire'

portalocker_tests/test_semaphore.py:21: AttributeError

tests.py::test_with_timeout

tests.py::test_with_timeout
file_ = <_io.BufferedWriter name='/tmp/pytest-of-root/pytest-0/test_with_timeout0/8159113691546718'>
flags = 

    def lock(file_: typing.Union[typing.IO, int, HasFileno], flags: LockFlags) -> None:
        if LOCKER is None:
            raise NotImplementedError("Locking not supported on this platform")

        try:
>           LOCKER(file_, flags)
E           BlockingIOError: [Errno 11] Resource temporarily unavailable

portalocker/portalocker.py:56: BlockingIOError

During handling of the above exception, another exception occurred:

tmpfile = '/tmp/pytest-of-root/pytest-0/test_with_timeout0/8159113691546718'

    def test_with_timeout(tmpfile):
        # Open the file 2 times
        with pytest.raises(portalocker.AlreadyLocked):
            with portalocker.Lock(tmpfile, timeout=0.1) as fh:
                print('writing some stuff to my cache...', file=fh)
>               with portalocker.Lock(
                    tmpfile,
                    timeout=0.1,
                    mode='wb',
                    fail_when_locked=True,
                ):

portalocker_tests/tests.py:54: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
portalocker/utils.py:189: in __enter__
    return self.acquire()
portalocker/utils.py:177: in acquire
    fh = self._get_lock(fh)
portalocker/utils.py:211: in _get_lock
    raise exception
portalocker/utils.py:208: in _get_lock
    portalocker.lock(fh, self.flags)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

file_ = <_io.BufferedWriter name='/tmp/pytest-of-root/pytest-0/test_with_timeout0/8159113691546718'>
flags = 

    def lock(file_: typing.Union[typing.IO, int, HasFileno], flags: LockFlags) -> None:
        if LOCKER is None:
            raise NotImplementedError("Locking not supported on this platform")

        try:
            LOCKER(file_, flags)
        except IOError as e:
            if e.errno == errno.EAGAIN or e.errno == errno.EACCES:
>               raise exceptions.LockException(e.strerror)
E               portalocker.exceptions.LockException: Resource temporarily unavailable

portalocker/portalocker.py:59: LockException

tests.py::test_class

tests.py::test_class
file_ = <_io.TextIOWrapper name='/tmp/pytest-of-root/pytest-0/test_class0/693918048671505' mode='a' encoding='UTF-8'>
flags = 

    def lock(file_: typing.Union[typing.IO, int, HasFileno], flags: LockFlags) -> None:
        if LOCKER is None:
            raise NotImplementedError("Locking not supported on this platform")

        try:
>           LOCKER(file_, flags)
E           BlockingIOError: [Errno 11] Resource temporarily unavailable

portalocker/portalocker.py:56: BlockingIOError

During handling of the above exception, another exception occurred:

self = , timeout = 0.1
check_interval = 0.25, fail_when_locked = False

    def acquire(self, timeout: typing.Optional[float]=None, check_interval:
        typing.Optional[float]=None, fail_when_locked: typing.Optional[bool
        ]=None) ->typing.IO[typing.AnyStr]:
        """Acquire the locked filehandle"""
        if timeout is None:
            timeout = self.timeout
        if check_interval is None:
            check_interval = self.check_interval
        if fail_when_locked is None:
            fail_when_locked = self.fail_when_locked

        start_time = time.time()
        while True:
            try:
                fh = self._get_fh()
>               fh = self._get_lock(fh)

portalocker/utils.py:177: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
portalocker/utils.py:211: in _get_lock
    raise exception
portalocker/utils.py:208: in _get_lock
    portalocker.lock(fh, self.flags)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

file_ = <_io.TextIOWrapper name='/tmp/pytest-of-root/pytest-0/test_class0/693918048671505' mode='a' encoding='UTF-8'>
flags = 

    def lock(file_: typing.Union[typing.IO, int, HasFileno], flags: LockFlags) -> None:
        if LOCKER is None:
            raise NotImplementedError("Locking not supported on this platform")

        try:
            LOCKER(file_, flags)
        except IOError as e:
            if e.errno == errno.EAGAIN or e.errno == errno.EACCES:
>               raise exceptions.LockException(e.strerror)
E               portalocker.exceptions.LockException: Resource temporarily unavailable

portalocker/portalocker.py:59: LockException

During handling of the above exception, another exception occurred:

tmpfile = '/tmp/pytest-of-root/pytest-0/test_class0/693918048671505'

    def test_class(tmpfile):
        lock = portalocker.Lock(tmpfile)
        lock2 = portalocker.Lock(tmpfile, fail_when_locked=False, timeout=0.01)

        with lock:
>           lock.acquire()

portalocker_tests/tests.py:119: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = , timeout = 0.1
check_interval = 0.25, fail_when_locked = False

    def acquire(self, timeout: typing.Optional[float]=None, check_interval:
        typing.Optional[float]=None, fail_when_locked: typing.Optional[bool
        ]=None) ->typing.IO[typing.AnyStr]:
        """Acquire the locked filehandle"""
        if timeout is None:
            timeout = self.timeout
        if check_interval is None:
            check_interval = self.check_interval
        if fail_when_locked is None:
            fail_when_locked = self.fail_when_locked

        start_time = time.time()
        while True:
            try:
                fh = self._get_fh()
                fh = self._get_lock(fh)
                fh = self._prepare_fh(fh)
                self.fh = fh
                return fh
            except exceptions.LockException as exception:
                if fail_when_locked:
                    raise
                if timeout is not None and time.time() - start_time > timeout:
>                   raise exceptions.LockException(exception)
E                   portalocker.exceptions.LockException: Resource temporarily unavailable

portalocker/utils.py:185: LockException

tests.py::test_rlock_acquire_release_count

tests.py::test_rlock_acquire_release_count
file_ = <_io.TextIOWrapper name='/tmp/pytest-of-root/pytest-0/test_rlock_acquire_release_cou0/11337032696754434' mode='a' encoding='UTF-8'>
flags = 

    def lock(file_: typing.Union[typing.IO, int, HasFileno], flags: LockFlags) -> None:
        if LOCKER is None:
            raise NotImplementedError("Locking not supported on this platform")

        try:
>           LOCKER(file_, flags)
E           BlockingIOError: [Errno 11] Resource temporarily unavailable

portalocker/portalocker.py:56: BlockingIOError

During handling of the above exception, another exception occurred:

self = , timeout = 5
check_interval = 0.25, fail_when_locked = False

    def acquire(self, timeout: typing.Optional[float]=None, check_interval:
        typing.Optional[float]=None, fail_when_locked: typing.Optional[bool
        ]=None) ->typing.IO[typing.AnyStr]:
        """Acquire the locked filehandle"""
        if timeout is None:
            timeout = self.timeout
        if check_interval is None:
            check_interval = self.check_interval
        if fail_when_locked is None:
            fail_when_locked = self.fail_when_locked

        start_time = time.time()
        while True:
            try:
                fh = self._get_fh()
>               fh = self._get_lock(fh)

portalocker/utils.py:177: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
portalocker/utils.py:211: in _get_lock
    raise exception
portalocker/utils.py:208: in _get_lock
    portalocker.lock(fh, self.flags)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

file_ = <_io.TextIOWrapper name='/tmp/pytest-of-root/pytest-0/test_rlock_acquire_release_cou0/11337032696754434' mode='a' encoding='UTF-8'>
flags = 

    def lock(file_: typing.Union[typing.IO, int, HasFileno], flags: LockFlags) -> None:
        if LOCKER is None:
            raise NotImplementedError("Locking not supported on this platform")

        try:
            LOCKER(file_, flags)
        except IOError as e:
            if e.errno == errno.EAGAIN or e.errno == errno.EACCES:
>               raise exceptions.LockException(e.strerror)
E               portalocker.exceptions.LockException: Resource temporarily unavailable

portalocker/portalocker.py:59: LockException

During handling of the above exception, another exception occurred:

tmpfile = '/tmp/pytest-of-root/pytest-0/test_rlock_acquire_release_cou0/11337032696754434'

    def test_rlock_acquire_release_count(tmpfile):
        lock = portalocker.RLock(tmpfile)
        # Twice acquire
        h = lock.acquire()
        assert not h.closed
>       lock.acquire()

portalocker_tests/tests.py:149: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = , timeout = 5
check_interval = 0.25, fail_when_locked = False

    def acquire(self, timeout: typing.Optional[float]=None, check_interval:
        typing.Optional[float]=None, fail_when_locked: typing.Optional[bool
        ]=None) ->typing.IO[typing.AnyStr]:
        """Acquire the locked filehandle"""
        if timeout is None:
            timeout = self.timeout
        if check_interval is None:
            check_interval = self.check_interval
        if fail_when_locked is None:
            fail_when_locked = self.fail_when_locked

        start_time = time.time()
        while True:
            try:
                fh = self._get_fh()
                fh = self._get_lock(fh)
                fh = self._prepare_fh(fh)
                self.fh = fh
                return fh
            except exceptions.LockException as exception:
                if fail_when_locked:
                    raise
                if timeout is not None and time.time() - start_time > timeout:
>                   raise exceptions.LockException(exception)
E                   portalocker.exceptions.LockException: Resource temporarily unavailable

portalocker/utils.py:185: LockException

tests.py::test_rlock_acquire_release

tests.py::test_rlock_acquire_release
file_ = <_io.TextIOWrapper name='/tmp/pytest-of-root/pytest-0/test_rlock_acquire_release0/6920882171292614' mode='a' encoding='UTF-8'>
flags = 

    def lock(file_: typing.Union[typing.IO, int, HasFileno], flags: LockFlags) -> None:
        if LOCKER is None:
            raise NotImplementedError("Locking not supported on this platform")

        try:
>           LOCKER(file_, flags)
E           BlockingIOError: [Errno 11] Resource temporarily unavailable

portalocker/portalocker.py:56: BlockingIOError

During handling of the above exception, another exception occurred:

self = , timeout = 5
check_interval = 0.25, fail_when_locked = False

    def acquire(self, timeout: typing.Optional[float]=None, check_interval:
        typing.Optional[float]=None, fail_when_locked: typing.Optional[bool
        ]=None) ->typing.IO[typing.AnyStr]:
        """Acquire the locked filehandle"""
        if timeout is None:
            timeout = self.timeout
        if check_interval is None:
            check_interval = self.check_interval
        if fail_when_locked is None:
            fail_when_locked = self.fail_when_locked

        start_time = time.time()
        while True:
            try:
                fh = self._get_fh()
>               fh = self._get_lock(fh)

portalocker/utils.py:177: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
portalocker/utils.py:211: in _get_lock
    raise exception
portalocker/utils.py:208: in _get_lock
    portalocker.lock(fh, self.flags)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

file_ = <_io.TextIOWrapper name='/tmp/pytest-of-root/pytest-0/test_rlock_acquire_release0/6920882171292614' mode='a' encoding='UTF-8'>
flags = 

    def lock(file_: typing.Union[typing.IO, int, HasFileno], flags: LockFlags) -> None:
        if LOCKER is None:
            raise NotImplementedError("Locking not supported on this platform")

        try:
            LOCKER(file_, flags)
        except IOError as e:
            if e.errno == errno.EAGAIN or e.errno == errno.EACCES:
>               raise exceptions.LockException(e.strerror)
E               portalocker.exceptions.LockException: Resource temporarily unavailable

portalocker/portalocker.py:59: LockException

During handling of the above exception, another exception occurred:

tmpfile = '/tmp/pytest-of-root/pytest-0/test_rlock_acquire_release0/6920882171292614'

    def test_rlock_acquire_release(tmpfile):
        lock = portalocker.RLock(tmpfile)
        lock2 = portalocker.RLock(tmpfile, fail_when_locked=False)

        lock.acquire()  # acquire lock when nobody is using it
        with pytest.raises(portalocker.LockException):
            # another party should not be able to acquire the lock
            lock2.acquire(timeout=0.01)

        # Now acquire again
>       lock.acquire()

portalocker_tests/tests.py:169: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = , timeout = 5
check_interval = 0.25, fail_when_locked = False

    def acquire(self, timeout: typing.Optional[float]=None, check_interval:
        typing.Optional[float]=None, fail_when_locked: typing.Optional[bool
        ]=None) ->typing.IO[typing.AnyStr]:
        """Acquire the locked filehandle"""
        if timeout is None:
            timeout = self.timeout
        if check_interval is None:
            check_interval = self.check_interval
        if fail_when_locked is None:
            fail_when_locked = self.fail_when_locked

        start_time = time.time()
        while True:
            try:
                fh = self._get_fh()
                fh = self._get_lock(fh)
                fh = self._prepare_fh(fh)
                self.fh = fh
                return fh
            except exceptions.LockException as exception:
                if fail_when_locked:
                    raise
                if timeout is not None and time.time() - start_time > timeout:
>                   raise exceptions.LockException(exception)
E                   portalocker.exceptions.LockException: Resource temporarily unavailable

portalocker/utils.py:185: LockException

tests.py::test_release_unacquired

tests.py::test_release_unacquired
tmpfile = '/tmp/pytest-of-root/pytest-0/test_release_unacquired0/33539793071933566'

    def test_release_unacquired(tmpfile):
>       with pytest.raises(portalocker.LockException):
E       Failed: DID NOT RAISE <class 'portalocker.exceptions.LockException'>

portalocker_tests/tests.py:176: Failed

tests.py::test_blocking_timeout[flock]

tests.py::test_blocking_timeout[flock]
tmpfile = '/tmp/pytest-of-root/pytest-0/test_blocking_timeout_flock_0/5148130718593418'
locker = 

    @pytest.mark.parametrize('locker', LOCKERS, indirect=True)
    def test_blocking_timeout(tmpfile, locker):
        flags = LockFlags.SHARED

        with pytest.warns(UserWarning):
            with portalocker.Lock(tmpfile, 'a+', timeout=5, flags=flags):
                pass

        lock = portalocker.Lock(tmpfile, 'a+', flags=flags)
>       with pytest.warns(UserWarning):
E       Failed: DID NOT WARN. No warnings of type (<class 'UserWarning'>,) were emitted.
E        Emitted warnings: [].

portalocker_tests/tests.py:241: Failed

tests.py::test_blocking_timeout[lockf]

tests.py::test_blocking_timeout[lockf]
tmpfile = '/tmp/pytest-of-root/pytest-0/test_blocking_timeout_lockf_0/016835211100857617'
locker = 

    @pytest.mark.parametrize('locker', LOCKERS, indirect=True)
    def test_blocking_timeout(tmpfile, locker):
        flags = LockFlags.SHARED

        with pytest.warns(UserWarning):
            with portalocker.Lock(tmpfile, 'a+', timeout=5, flags=flags):
                pass

        lock = portalocker.Lock(tmpfile, 'a+', flags=flags)
>       with pytest.warns(UserWarning):
E       Failed: DID NOT WARN. No warnings of type (<class 'UserWarning'>,) were emitted.
E        Emitted warnings: [].

portalocker_tests/tests.py:241: Failed

tests.py::test_nonblocking[flock]

tests.py::test_nonblocking[flock]
tmpfile = '/tmp/pytest-of-root/pytest-0/test_nonblocking_flock_0/33978430026274553'
locker = 

    @pytest.mark.skipif(
        os.name == 'nt',
        reason='Windows uses an entirely different lockmechanism',
    )
    @pytest.mark.parametrize('locker', LOCKERS, indirect=True)
    def test_nonblocking(tmpfile, locker):
        with open(tmpfile, 'w') as fh, pytest.raises(RuntimeError):
>           portalocker.lock(fh, LockFlags.NON_BLOCKING)

portalocker_tests/tests.py:252: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

file_ = <_io.TextIOWrapper name='/tmp/pytest-of-root/pytest-0/test_nonblocking_flock_0/33978430026274553' mode='w' encoding='UTF-8'>
flags = <LockFlags.NON_BLOCKING: 4>

    def lock(file_: typing.Union[typing.IO, int, HasFileno], flags: LockFlags) -> None:
        if LOCKER is None:
            raise NotImplementedError("Locking not supported on this platform")

        try:
>           LOCKER(file_, flags)
E           OSError: [Errno 22] Invalid argument

portalocker/portalocker.py:56: OSError

tests.py::test_nonblocking[lockf]

tests.py::test_nonblocking[lockf]
tmpfile = '/tmp/pytest-of-root/pytest-0/test_nonblocking_lockf_0/31840751289262303'
locker = 

    @pytest.mark.skipif(
        os.name == 'nt',
        reason='Windows uses an entirely different lockmechanism',
    )
    @pytest.mark.parametrize('locker', LOCKERS, indirect=True)
    def test_nonblocking(tmpfile, locker):
        with open(tmpfile, 'w') as fh, pytest.raises(RuntimeError):
>           portalocker.lock(fh, LockFlags.NON_BLOCKING)

portalocker_tests/tests.py:252: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

file_ = <_io.TextIOWrapper name='/tmp/pytest-of-root/pytest-0/test_nonblocking_lockf_0/31840751289262303' mode='w' encoding='UTF-8'>
flags = <LockFlags.NON_BLOCKING: 4>

    def lock(file_: typing.Union[typing.IO, int, HasFileno], flags: LockFlags) -> None:
        if LOCKER is None:
            raise NotImplementedError("Locking not supported on this platform")

        try:
>           LOCKER(file_, flags)
E           ValueError: unrecognized lockf argument

portalocker/portalocker.py:56: ValueError

tests.py::test_exception

tests.py::test_exception
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7fb5d4342e00>
tmpfile = '/tmp/pytest-of-root/pytest-0/test_exception0/3494459719916998'

    def test_exception(monkeypatch, tmpfile):
        '''Do we stop immediately if the locking fails, even with a timeout?'''

        def patched_lock(*args, **kwargs):
            raise ValueError('Test exception')

        monkeypatch.setattr('portalocker.utils.portalocker.lock', patched_lock)
        lock = portalocker.Lock(tmpfile, 'w', timeout=math.inf)

        with pytest.raises(exceptions.LockException):
>           lock.acquire()

portalocker_tests/tests.py:446: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
portalocker/utils.py:177: in acquire
    fh = self._get_lock(fh)
portalocker/utils.py:208: in _get_lock
    portalocker.lock(fh, self.flags)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<_io.TextIOWrapper name='/tmp/pytest-of-root/pytest-0/test_exception0/3494459719916998' mode='a' encoding='UTF-8'>, )
kwargs = {}

    def patched_lock(*args, **kwargs):
>       raise ValueError('Test exception')
E       ValueError: Test exception

portalocker_tests/tests.py:440: ValueError

Patch diff

diff --git a/portalocker/exceptions.py b/portalocker/exceptions.py
index 9c73924..f705b9e 100644
--- a/portalocker/exceptions.py
+++ b/portalocker/exceptions.py
@@ -11,12 +11,15 @@ class BaseLockException(Exception):


 class LockException(BaseLockException):
-    pass
+    def __init__(self, *args: typing.Any, **kwargs: typing.Any) ->None:
+        super().__init__(*args, **kwargs)


 class AlreadyLocked(LockException):
-    pass
+    def __init__(self, *args: typing.Any, **kwargs: typing.Any) ->None:
+        super().__init__(*args, **kwargs)


 class FileToLarge(LockException):
-    pass
+    def __init__(self, *args: typing.Any, **kwargs: typing.Any) ->None:
+        super().__init__(*args, **kwargs)
diff --git a/portalocker/portalocker.py b/portalocker/portalocker.py
index e718322..8f57a04 100644
--- a/portalocker/portalocker.py
+++ b/portalocker/portalocker.py
@@ -5,7 +5,8 @@ LockFlags = constants.LockFlags


 class HasFileno(typing.Protocol):
-    pass
+    def fileno(self) -> int:
+        ...


 LOCKER: typing.Optional[typing.Callable[[typing.Union[int, HasFileno], int],
@@ -17,9 +18,51 @@ if os.name == 'nt':
     import win32file
     import winerror
     __overlapped = pywintypes.OVERLAPPED()
+
+    def win32_lock(file_handle: typing.Union[int, HasFileno], flags: int) -> None:
+        hfile = win32file._get_osfhandle(file_handle.fileno() if hasattr(file_handle, 'fileno') else file_handle)
+        try:
+            win32file.LockFileEx(hfile, flags, 0, -0x10000, __overlapped)
+        except pywintypes.error as e:
+            if e.winerror == winerror.ERROR_LOCK_VIOLATION:
+                raise exceptions.LockException(e.strerror)
+            else:
+                raise
+
+    def win32_unlock(file_handle: typing.Union[int, HasFileno]) -> None:
+        hfile = win32file._get_osfhandle(file_handle.fileno() if hasattr(file_handle, 'fileno') else file_handle)
+        try:
+            win32file.UnlockFileEx(hfile, 0, -0x10000, __overlapped)
+        except pywintypes.error as e:
+            if e.winerror == winerror.ERROR_NOT_LOCKED:
+                # error: not locked
+                pass
+            else:
+                raise
+
+    LOCKER = win32_lock
 elif os.name == 'posix':
     import errno
     import fcntl
     LOCKER = fcntl.flock
 else:
     raise RuntimeError('PortaLocker only defined for nt and posix platforms')
+
+def lock(file_: typing.Union[typing.IO, int, HasFileno], flags: LockFlags) -> None:
+    if LOCKER is None:
+        raise NotImplementedError("Locking not supported on this platform")
+    
+    try:
+        LOCKER(file_, flags)
+    except IOError as e:
+        if e.errno == errno.EAGAIN or e.errno == errno.EACCES:
+            raise exceptions.LockException(e.strerror)
+        raise
+
+def unlock(file_: typing.Union[typing.IO, int, HasFileno]) -> None:
+    if os.name == 'nt':
+        win32_unlock(file_)
+    elif os.name == 'posix':
+        fcntl.flock(file_, fcntl.LOCK_UN)
+    else:
+        raise NotImplementedError("Unlocking not supported on this platform")
diff --git a/portalocker/redis.py b/portalocker/redis.py
index f02a9d9..cf2b059 100644
--- a/portalocker/redis.py
+++ b/portalocker/redis.py
@@ -81,3 +81,70 @@ class RedisLock(utils.LockBase):

     def __del__(self):
         self.release()
+
+    def acquire(self, timeout: typing.Optional[float] = None, check_interval: typing.Optional[float] = None):
+        """Poll Redis until the lock is obtained or ``timeout`` has elapsed.
+
+        Args:
+            timeout: Seconds to keep trying; falls back to ``self.timeout``.
+            check_interval: Seconds to sleep between attempts; falls back to
+                ``self.check_interval``.
+
+        Returns:
+            ``True`` once ``_try_acquire()`` succeeds, ``False`` when the
+            timeout elapsed and ``fail_when_locked`` is falsy.
+
+        Raises:
+            exceptions.LockException: When the timeout elapsed and
+                ``fail_when_locked`` is truthy.
+        """
+        if timeout is None:
+            timeout = self.timeout
+        if check_interval is None:
+            check_interval = self.check_interval
+
+        if self.connection is None:
+            self.connection = client.Redis(**self.redis_kwargs)
+
+        # Reuse a subscription left over from an earlier attempt instead of
+        # orphaning it; a new one is only created when none is active.
+        if not getattr(self, 'pubsub', None):
+            self.pubsub = self.connection.pubsub()
+            self.pubsub.subscribe(self.channel)
+
+        # A monotonic clock keeps the timeout immune to wall-clock jumps.
+        start_time = time.monotonic()
+        while True:
+            if self._try_acquire():
+                return True
+
+            if time.monotonic() - start_time > timeout:
+                if self.fail_when_locked:
+                    raise exceptions.LockException(f"Failed to acquire lock on {self.channel}")
+                return False
+
+            time.sleep(check_interval)
+
+    def release(self):
+        """Give the lock back and tear down the helper resources.
+
+        The lock key is deleted only when it still holds the token written
+        by this instance, so calling ``release()`` on a lock that was never
+        acquired (for example from ``__del__``) cannot remove another
+        holder's key.  Afterwards the pubsub subscription is closed, the
+        pubsub worker thread is stopped and, when ``close_connection`` is
+        set, the Redis connection is closed.
+        """
+        lock_id = getattr(self, '_lock_id', None)
+        # Clearing the token also tells the keep-alive loop to stop.
+        self._lock_id = None
+
+        try:
+            if lock_id is not None and self.connection:
+                # NOTE: GET followed by DEL is not atomic; if the key expires
+                # and is re-taken between the two calls the new holder's key
+                # is removed.  A server-side script would close that window.
+                current = self._reply_to_str(self.connection.get(self.channel))
+                if current == lock_id:
+                    self.connection.delete(self.channel)
+        finally:
+            # Local resources are cleaned up even if Redis is unreachable.
+            if self.pubsub:
+                self.pubsub.unsubscribe(self.channel)
+                self.pubsub.close()
+                self.pubsub = None
+
+            if self.thread:
+                self.thread.stop()
+                self.thread = None
+
+            if self.connection and self.close_connection:
+                self.connection.close()
+                self.connection = None
+
+    @staticmethod
+    def _reply_to_str(value):
+        """Return a Redis reply as ``str``; non-bytes values pass through."""
+        return value.decode() if isinstance(value, bytes) else value
+
+    def _lock_ttl(self):
+        """Return the key expiry in whole seconds, never less than 1.
+
+        Redis rejects ``SET ... EX 0`` and ``EXPIRE key 0`` deletes the key,
+        so sub-second timeouts are rounded up to one second.
+        """
+        return max(1, int(self.timeout or 0))
+
+    def _try_acquire(self):
+        """Make a single attempt to take the lock.
+
+        Returns ``True`` when the key was created (``SET`` with ``nx``) and
+        the keep-alive was started, ``False`` when there is no connection or
+        the key already exists.
+        """
+        if self.connection is None:
+            return False
+
+        lock_id = str(random.randint(0, 2**32))
+        if self.connection.set(self.channel, lock_id, nx=True, ex=self._lock_ttl()):
+            # Remember the token so release() can prove ownership.
+            self._lock_id = lock_id
+            self._start_keep_alive(lock_id)
+            return True
+
+        return False
+
+    def _start_keep_alive(self, lock_id):
+        """Start the pubsub worker and a thread that refreshes the key TTL.
+
+        The refresh loop ends once the connection or subscription is gone or
+        the instance no longer holds ``lock_id``.
+        """
+        ttl = self._lock_ttl()
+
+        def keep_alive():
+            while (
+                self.connection
+                and self.pubsub
+                and getattr(self, '_lock_id', None) == lock_id
+            ):
+                # Replies are bytes unless the client decodes responses, so
+                # normalise before comparing with the str token.
+                current = self._reply_to_str(self.connection.get(self.channel))
+                if current == lock_id:
+                    self.connection.expire(self.channel, ttl)
+                time.sleep(ttl / 2)
+
+        self.thread = PubSubWorkerThread(self.pubsub, sleep_time=self.thread_sleep_time)
+        self.thread.start()
+        _thread.start_new_thread(keep_alive, ())
+
+    def __enter__(self):
+        """Acquire the lock with the instance defaults and return ``self``.
+
+        The value returned by ``acquire()`` is not inspected here.
+        """
+        self.acquire()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """Release the lock on leaving the ``with`` block.
+
+        Returns ``None``, so an exception raised inside the block propagates.
+        """
+        self.release()
diff --git a/portalocker/utils.py b/portalocker/utils.py
index a482b38..b1e549d 100644
--- a/portalocker/utils.py
+++ b/portalocker/utils.py
@@ -41,7 +41,10 @@ def coalesce(*args: typing.Any, test_value: typing.Any=None) ->typing.Any:
     >>> coalesce([], dict(spam='eggs'), test_value=[])
     []
     """
-    pass
+    for arg in args:
+        if arg is not test_value:
+            return arg
+    return None


 @contextlib.contextmanager
@@ -71,7 +74,18 @@ def open_atomic(filename: Filename, binary: bool=True) ->typing.Iterator[typing
     >>> assert path_filename.exists()
     >>> path_filename.unlink()
     """
-    pass
+    path = pathlib.Path(filename)
+    temp_filename = f"{path}.tmp{os.getpid()}"
+    mode = 'wb' if binary else 'w'
+    try:
+        with open(temp_filename, mode) as file:
+            yield file
+        os.rename(temp_filename, path)
+    finally:
+        try:
+            os.unlink(temp_filename)
+        except OSError:
+            pass


 class LockBase(abc.ABC):
@@ -149,25 +163,53 @@ class Lock(LockBase):
         typing.Optional[float]=None, fail_when_locked: typing.Optional[bool
         ]=None) ->typing.IO[typing.AnyStr]:
         """Acquire the locked filehandle"""
-        pass
+        if timeout is None:
+            timeout = self.timeout
+        if check_interval is None:
+            check_interval = self.check_interval
+        if fail_when_locked is None:
+            fail_when_locked = self.fail_when_locked
+
+        start_time = time.time()
+        while True:
+            try:
+                fh = self._get_fh()
+                fh = self._get_lock(fh)
+                fh = self._prepare_fh(fh)
+                self.fh = fh
+                return fh
+            except exceptions.LockException as exception:
+                if fail_when_locked:
+                    raise
+                if timeout is not None and time.time() - start_time > timeout:
+                    raise exceptions.LockException(exception)
+                time.sleep(check_interval)

     def __enter__(self) ->typing.IO[typing.AnyStr]:
         return self.acquire()

     def release(self):
         """Releases the currently locked file handle"""
-        pass
+        fh = self.fh
+        if not fh:
+            return
+        # Forget the handle first and close it in ``finally`` so a failing
+        # unlock can neither leak the descriptor nor leave a stale
+        # ``self.fh`` behind.
+        self.fh = None
+        try:
+            portalocker.unlock(fh)
+        finally:
+            fh.close()

     def _get_fh(self) ->typing.IO:
         """Get a new filehandle"""
-        pass
+        # Open ``self.filename`` in ``self.mode``, forwarding the extra
+        # ``open()`` keyword arguments kept in ``self.file_open_kwargs``.
+        return open(self.filename, self.mode, **self.file_open_kwargs)

     def _get_lock(self, fh: typing.IO) ->typing.IO:
         """
         Try to lock the given filehandle
 
         returns LockException if it fails"""
-        pass
+        try:
+            portalocker.lock(fh, self.flags)
+        except BaseException:
+            # The caller never receives ``fh`` when locking fails, so close
+            # it here for every kind of failure (not only LockException) and
+            # re-raise with the original traceback.
+            fh.close()
+            raise
+        return fh

     def _prepare_fh(self, fh: typing.IO) ->typing.IO:
         """
@@ -176,7 +218,10 @@ class Lock(LockBase):
         If truncate is a number, the file will be truncated to that amount of
         bytes
         """
-        pass
+        if self.truncate:
+            fh.seek(0)
+            fh.truncate(0)
+        return fh


 class RLock(Lock):