Reference (Gold): portalocker

Pytest Summary for test portalocker_tests

status     count
passed     40
total      40
collected  40

Failed pytests: none

Patch diff

diff --git a/portalocker/__about__.py b/portalocker/__about__.py
index fc965bf..a0b817a 100644
--- a/portalocker/__about__.py
+++ b/portalocker/__about__.py
@@ -2,5 +2,5 @@ __package_name__ = 'portalocker'
 __author__ = 'Rick van Hattem'
 __email__ = 'wolph@wol.ph'
 __version__ = '2.10.1'
-__description__ = 'Wraps the portalocker recipe for easy usage'
+__description__ = '''Wraps the portalocker recipe for easy usage'''
 __url__ = 'https://github.com/WoLpH/portalocker'
diff --git a/portalocker/constants.py b/portalocker/constants.py
index 12b8ef6..2099f1f 100644
--- a/portalocker/constants.py
+++ b/portalocker/constants.py
@@ -1,4 +1,4 @@
-"""
+'''
 Locking constants

 Lock types:
@@ -13,27 +13,47 @@ Lock flags:
 Manually unlock, only needed internally

 - `UNBLOCK` unlock
-"""
+'''
+
 import enum
 import os
-if os.name == 'nt':
+
+# The actual tests will execute the code anyhow so the following code can
+# safely be ignored from the coverage tests
+if os.name == 'nt':  # pragma: no cover
     import msvcrt
-    LOCK_EX = 1
-    LOCK_SH = 2
-    LOCK_NB = 4
-    LOCK_UN = msvcrt.LK_UNLCK
-elif os.name == 'posix':
+
+    #: exclusive lock
+    LOCK_EX = 0x1
+    #: shared lock
+    LOCK_SH = 0x2
+    #: non-blocking
+    LOCK_NB = 0x4
+    #: unlock
+    LOCK_UN = msvcrt.LK_UNLCK  # type: ignore
+
+elif os.name == 'posix':  # pragma: no cover
     import fcntl
+
+    #: exclusive lock
     LOCK_EX = fcntl.LOCK_EX
+    #: shared lock
     LOCK_SH = fcntl.LOCK_SH
+    #: non-blocking
     LOCK_NB = fcntl.LOCK_NB
+    #: unlock
     LOCK_UN = fcntl.LOCK_UN
-else:
+
+else:  # pragma: no cover
     raise RuntimeError('PortaLocker only defined for nt and posix platforms')


 class LockFlags(enum.IntFlag):
+    #: exclusive lock
     EXCLUSIVE = LOCK_EX
+    #: shared lock
     SHARED = LOCK_SH
+    #: non-blocking
     NON_BLOCKING = LOCK_NB
+    #: unlock
     UNBLOCK = LOCK_UN
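
For orientation, a minimal usage sketch (not part of the patch) of the LockFlags enum defined above; EXCLUSIVE | NON_BLOCKING is the same combination that utils.py later uses as its default LOCK_METHOD:

    from portalocker import constants

    # Request an exclusive lock, but fail immediately instead of blocking
    # if another process already holds it.
    flags = constants.LockFlags.EXCLUSIVE | constants.LockFlags.NON_BLOCKING
    assert flags & constants.LockFlags.NON_BLOCKING
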
diff --git a/portalocker/exceptions.py b/portalocker/exceptions.py
index 9c73924..e871d13 100644
--- a/portalocker/exceptions.py
+++ b/portalocker/exceptions.py
@@ -1,11 +1,16 @@
 import typing


-class BaseLockException(Exception):
+class BaseLockException(Exception):  # noqa: N818
+    # Error codes:
     LOCK_FAILED = 1

-    def __init__(self, *args: typing.Any, fh: typing.Union[typing.IO, None,
-        int]=None, **kwargs: typing.Any) ->None:
+    def __init__(
+        self,
+        *args: typing.Any,
+        fh: typing.Union[typing.IO, None, int] = None,
+        **kwargs: typing.Any,
+    ) -> None:
         self.fh = fh
         Exception.__init__(self, *args)

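A small sketch (not part of the patch; the filename is illustrative) of how these exception classes are typically caught around a failing non-blocking lock attempt. In portalocker, AlreadyLocked derives from LockException, so it is caught first:

    import portalocker

    try:
        with portalocker.Lock('example.txt', timeout=0, fail_when_locked=True):
            pass  # do work while holding the lock
    except portalocker.AlreadyLocked:
        print('another process holds the lock')
    except portalocker.LockException as exc:
        # Any other locking failure; the offending handle may be on exc.fh.
        print('locking failed:', exc)
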
diff --git a/portalocker/portalocker.py b/portalocker/portalocker.py
index e718322..ceceeaa 100644
--- a/portalocker/portalocker.py
+++ b/portalocker/portalocker.py
@@ -1,25 +1,154 @@
 import os
 import typing
+
 from . import constants, exceptions
+
+# Alias for readability. Due to import recursion issues we cannot do:
+# from .constants import LockFlags
 LockFlags = constants.LockFlags


 class HasFileno(typing.Protocol):
-    pass
+    def fileno(self) -> int: ...
+

+LOCKER: typing.Optional[typing.Callable[
+    [typing.Union[int, HasFileno], int], typing.Any]] = None

-LOCKER: typing.Optional[typing.Callable[[typing.Union[int, HasFileno], int],
-    typing.Any]] = None
-if os.name == 'nt':
+
+if os.name == 'nt':  # pragma: no cover
     import msvcrt
+
     import pywintypes
     import win32con
     import win32file
     import winerror
+
     __overlapped = pywintypes.OVERLAPPED()
-elif os.name == 'posix':
+
+    def lock(file_: typing.Union[typing.IO, int], flags: LockFlags):
+        # Windows locking does not support locking through `fh.fileno()` so
+        # we cast it to make mypy and pyright happy
+        file_ = typing.cast(typing.IO, file_)
+
+        mode = 0
+        if flags & LockFlags.NON_BLOCKING:
+            mode |= win32con.LOCKFILE_FAIL_IMMEDIATELY
+
+        if flags & LockFlags.EXCLUSIVE:
+            mode |= win32con.LOCKFILE_EXCLUSIVE_LOCK
+
+        # Save the old position so we can go back to that position but
+        # still lock from the beginning of the file
+        savepos = file_.tell()
+        if savepos:
+            file_.seek(0)
+
+        os_fh = msvcrt.get_osfhandle(file_.fileno())  # type: ignore
+        try:
+            win32file.LockFileEx(os_fh, mode, 0, -0x10000, __overlapped)
+        except pywintypes.error as exc_value:
+            # error: (33, 'LockFileEx', 'The process cannot access the file
+            # because another process has locked a portion of the file.')
+            if exc_value.winerror == winerror.ERROR_LOCK_VIOLATION:
+                raise exceptions.AlreadyLocked(
+                    exceptions.LockException.LOCK_FAILED,
+                    exc_value.strerror,
+                    fh=file_,
+                ) from exc_value
+            else:
+                # Q:  Are there exceptions/codes we should be dealing with
+                # here?
+                raise
+        finally:
+            if savepos:
+                file_.seek(savepos)
+
+    def unlock(file_: typing.IO):
+        try:
+            savepos = file_.tell()
+            if savepos:
+                file_.seek(0)
+
+            os_fh = msvcrt.get_osfhandle(file_.fileno())  # type: ignore
+            try:
+                win32file.UnlockFileEx(
+                    os_fh,
+                    0,
+                    -0x10000,
+                    __overlapped,
+                )
+            except pywintypes.error as exc:
+                if exc.winerror != winerror.ERROR_NOT_LOCKED:
+                    # Q:  Are there exceptions/codes we should be
+                    # dealing with here?
+                    raise
+            finally:
+                if savepos:
+                    file_.seek(savepos)
+        except OSError as exc:
+            raise exceptions.LockException(
+                exceptions.LockException.LOCK_FAILED,
+                exc.strerror,
+                fh=file_,
+            ) from exc
+
+elif os.name == 'posix':  # pragma: no cover
     import errno
     import fcntl
+
+    # The locking implementation.
+    # Expected values are either fcntl.flock() or fcntl.lockf(),
+    # but any callable that matches the syntax will be accepted.
     LOCKER = fcntl.flock
-else:
+
+    def lock(file_: typing.Union[typing.IO, int], flags: LockFlags):
+        assert LOCKER is not None, 'We need a locking function in `LOCKER` '
+        # Locking with NON_BLOCKING without EXCLUSIVE or SHARED enabled
+        # results in an error
+        if (flags & LockFlags.NON_BLOCKING) and not flags & (
+            LockFlags.SHARED | LockFlags.EXCLUSIVE
+        ):
+            raise RuntimeError(
+                'When locking in non-blocking mode the SHARED '
+                'or EXCLUSIVE flag must be specified as well',
+            )
+
+        try:
+            LOCKER(file_, flags)
+        except OSError as exc_value:
+            # Python can use one of several different exception classes to
+            # represent timeout (most likely is BlockingIOError and IOError),
+            # but these errors may also represent other failures. On some
+            # systems, `IOError is OSError` which means checking for either
+            # IOError or OSError can mask other errors.
+            # The safest check is to catch OSError (from which the others
+            # inherit) and check the errno (which should be EACCESS or EAGAIN
+            # according to the spec).
+            if exc_value.errno in (errno.EACCES, errno.EAGAIN):
+                # A timeout exception, wrap this so the outer code knows to try
+                # again (if it wants to).
+                raise exceptions.AlreadyLocked(
+                    exc_value,
+                    fh=file_,
+                ) from exc_value
+            else:
+                # Something else went wrong; don't wrap this so we stop
+                # immediately.
+                raise exceptions.LockException(
+                    exc_value,
+                    fh=file_,
+                ) from exc_value
+        except EOFError as exc_value:
+            # On NFS filesystems, flock can raise an EOFError
+            raise exceptions.LockException(
+                exc_value,
+                fh=file_,
+            ) from exc_value
+
+    def unlock(file_: typing.IO):
+        assert LOCKER is not None, 'We need a locking function in `LOCKER` '
+        LOCKER(file_.fileno(), LockFlags.UNBLOCK)
+
+else:  # pragma: no cover
     raise RuntimeError('PortaLocker only defined for nt and posix platforms')
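
A minimal sketch (not part of the patch; the filename is illustrative) of the low-level lock()/unlock() pair implemented above, using the LOCK_* aliases the package re-exports:

    import portalocker

    with open('example.txt', 'a') as fh:
        # Exclusive, non-blocking lock; raises AlreadyLocked if another
        # process already holds it.
        portalocker.lock(fh, portalocker.LOCK_EX | portalocker.LOCK_NB)
        fh.write('exclusive write\n')
        fh.flush()
        portalocker.unlock(fh)
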
diff --git a/portalocker/redis.py b/portalocker/redis.py
index f02a9d9..11ee876 100644
--- a/portalocker/redis.py
+++ b/portalocker/redis.py
@@ -4,19 +4,28 @@ import logging
 import random
 import time
 import typing
+
 from redis import client
+
 from . import exceptions, utils
+
 logger = logging.getLogger(__name__)
+
 DEFAULT_UNAVAILABLE_TIMEOUT = 1
 DEFAULT_THREAD_SLEEP_TIME = 0.1


-class PubSubWorkerThread(client.PubSubWorkerThread):
-    pass
+class PubSubWorkerThread(client.PubSubWorkerThread):  # type: ignore
+    def run(self):
+        try:
+            super().run()
+        except Exception:  # pragma: no cover
+            _thread.interrupt_main()
+            raise


 class RedisLock(utils.LockBase):
-    """
+    '''
     An extremely reliable Redis lock based on pubsub with a keep-alive thread

     As opposed to most Redis locking systems based on key/value pairs,
@@ -50,7 +59,8 @@ class RedisLock(utils.LockBase):
             to override these you need to explicitly specify a value (e.g.
             `health_check_interval=0`)

-    """
+    '''
+
     redis_kwargs: typing.Dict[str, typing.Any]
     thread: typing.Optional[PubSubWorkerThread]
     channel: str
@@ -58,26 +68,169 @@ class RedisLock(utils.LockBase):
     connection: typing.Optional[client.Redis]
     pubsub: typing.Optional[client.PubSub] = None
     close_connection: bool
+
     DEFAULT_REDIS_KWARGS: typing.ClassVar[typing.Dict[str, typing.Any]] = dict(
-        health_check_interval=10)
-
-    def __init__(self, channel: str, connection: typing.Optional[client.
-        Redis]=None, timeout: typing.Optional[float]=None, check_interval:
-        typing.Optional[float]=None, fail_when_locked: typing.Optional[bool
-        ]=False, thread_sleep_time: float=DEFAULT_THREAD_SLEEP_TIME,
-        unavailable_timeout: float=DEFAULT_UNAVAILABLE_TIMEOUT,
-        redis_kwargs: typing.Optional[typing.Dict]=None):
+        health_check_interval=10,
+    )
+
+    def __init__(
+        self,
+        channel: str,
+        connection: typing.Optional[client.Redis] = None,
+        timeout: typing.Optional[float] = None,
+        check_interval: typing.Optional[float] = None,
+        fail_when_locked: typing.Optional[bool] = False,
+        thread_sleep_time: float = DEFAULT_THREAD_SLEEP_TIME,
+        unavailable_timeout: float = DEFAULT_UNAVAILABLE_TIMEOUT,
+        redis_kwargs: typing.Optional[typing.Dict] = None,
+    ):
+        # We don't want to close connections given as an argument
         self.close_connection = not connection
+
         self.thread = None
         self.channel = channel
         self.connection = connection
         self.thread_sleep_time = thread_sleep_time
         self.unavailable_timeout = unavailable_timeout
         self.redis_kwargs = redis_kwargs or dict()
+
         for key, value in self.DEFAULT_REDIS_KWARGS.items():
             self.redis_kwargs.setdefault(key, value)
-        super().__init__(timeout=timeout, check_interval=check_interval,
-            fail_when_locked=fail_when_locked)
+
+        super().__init__(
+            timeout=timeout,
+            check_interval=check_interval,
+            fail_when_locked=fail_when_locked,
+        )
+
+    def get_connection(self) -> client.Redis:
+        if not self.connection:
+            self.connection = client.Redis(**self.redis_kwargs)
+
+        return self.connection
+
+    def channel_handler(self, message):
+        if message.get('type') != 'message':  # pragma: no cover
+            return
+
+        try:
+            data = json.loads(message.get('data'))
+        except TypeError:  # pragma: no cover
+            logger.debug('TypeError while parsing: %r', message)
+            return
+
+        assert self.connection is not None
+        self.connection.publish(data['response_channel'], str(time.time()))
+
+    @property
+    def client_name(self):
+        return f'{self.channel}-lock'
+
+    def acquire(  # type: ignore[override]
+        self,
+        timeout: typing.Optional[float] = None,
+        check_interval: typing.Optional[float] = None,
+        fail_when_locked: typing.Optional[bool] = None,
+    ) -> 'RedisLock':
+        timeout = utils.coalesce(timeout, self.timeout, 0.0)
+        check_interval = utils.coalesce(
+            check_interval,
+            self.check_interval,
+            0.0,
+        )
+        fail_when_locked = utils.coalesce(
+            fail_when_locked,
+            self.fail_when_locked,
+        )
+
+        assert not self.pubsub, 'This lock is already active'
+        connection = self.get_connection()
+
+        timeout_generator = self._timeout_generator(timeout, check_interval)
+        for _ in timeout_generator:  # pragma: no branch
+            subscribers = connection.pubsub_numsub(self.channel)[0][1]
+
+            if subscribers:
+                logger.debug(
+                    'Found %d lock subscribers for %s',
+                    subscribers,
+                    self.channel,
+                )
+
+                if self.check_or_kill_lock(
+                    connection,
+                    self.unavailable_timeout,
+                ):  # pragma: no branch
+                    continue
+                else:  # pragma: no cover
+                    subscribers = 0
+
+            # Note: this should not be changed to an elif because the if
+            # above can still end up here
+            if not subscribers:
+                connection.client_setname(self.client_name)
+                self.pubsub = connection.pubsub()
+                self.pubsub.subscribe(**{self.channel: self.channel_handler})
+                self.thread = PubSubWorkerThread(
+                    self.pubsub,
+                    sleep_time=self.thread_sleep_time,
+                )
+                self.thread.start()
+
+                subscribers = connection.pubsub_numsub(self.channel)[0][1]
+                if subscribers == 1:  # pragma: no branch
+                    return self
+                else:  # pragma: no cover
+                    # Race condition, let's try again
+                    self.release()
+
+            if fail_when_locked:  # pragma: no cover
+                raise exceptions.AlreadyLocked(exceptions)
+
+        raise exceptions.AlreadyLocked(exceptions)
+
+    def check_or_kill_lock(self, connection, timeout):
+        # Random channel name to get messages back from the lock
+        response_channel = f'{self.channel}-{random.random()}'
+
+        pubsub = connection.pubsub()
+        pubsub.subscribe(response_channel)
+        connection.publish(
+            self.channel,
+            json.dumps(
+                dict(
+                    response_channel=response_channel,
+                    message='ping',
+                ),
+            ),
+        )
+
+        check_interval = min(self.thread_sleep_time, timeout / 10)
+        for _ in self._timeout_generator(
+            timeout,
+            check_interval,
+        ):  # pragma: no branch
+            if pubsub.get_message(timeout=check_interval):
+                pubsub.close()
+                return True
+
+        for client_ in connection.client_list('pubsub'):  # pragma: no cover
+            if client_.get('name') == self.client_name:
+                logger.warning('Killing unavailable redis client: %r', client_)
+                connection.client_kill_filter(client_.get('id'))
+        return None
+
+    def release(self):
+        if self.thread:  # pragma: no branch
+            self.thread.stop()
+            self.thread.join()
+            self.thread = None
+            time.sleep(0.01)
+
+        if self.pubsub:  # pragma: no branch
+            self.pubsub.unsubscribe(self.channel)
+            self.pubsub.close()
+            self.pubsub = None

     def __del__(self):
         self.release()
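
A brief usage sketch for the RedisLock completed above (not part of the patch; it assumes a Redis server is reachable with the default client settings, and the channel name is illustrative):

    import time

    from portalocker.redis import RedisLock

    # The lock is held for the duration of the `with` block; the pubsub
    # worker thread started in acquire() answers pings from competing
    # lockers until release() stops it.
    with RedisLock('example-channel'):
        time.sleep(1)
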
diff --git a/portalocker/utils.py b/portalocker/utils.py
index a482b38..5115b0e 100644
--- a/portalocker/utils.py
+++ b/portalocker/utils.py
@@ -9,18 +9,26 @@ import tempfile
 import time
 import typing
 import warnings
+
 from . import constants, exceptions, portalocker
+
 logger = logging.getLogger(__name__)
+
 DEFAULT_TIMEOUT = 5
 DEFAULT_CHECK_INTERVAL = 0.25
 DEFAULT_FAIL_WHEN_LOCKED = False
 LOCK_METHOD = constants.LockFlags.EXCLUSIVE | constants.LockFlags.NON_BLOCKING
-__all__ = ['Lock', 'open_atomic']
+
+__all__ = [
+    'Lock',
+    'open_atomic',
+]
+
 Filename = typing.Union[str, pathlib.Path]


-def coalesce(*args: typing.Any, test_value: typing.Any=None) ->typing.Any:
-    """Simple coalescing function that returns the first value that is not
+def coalesce(*args: typing.Any, test_value: typing.Any = None) -> typing.Any:
+    '''Simple coalescing function that returns the first value that is not
     equal to the `test_value`. Or `None` if no value is valid. Usually this
     means that the last given value is the default value.

@@ -40,14 +48,16 @@ def coalesce(*args: typing.Any, test_value: typing.Any=None) ->typing.Any:
     # This won't work because of the `is not test_value` type testing:
     >>> coalesce([], dict(spam='eggs'), test_value=[])
     []
-    """
-    pass
+    '''
+    return next((arg for arg in args if arg is not test_value), None)


 @contextlib.contextmanager
-def open_atomic(filename: Filename, binary: bool=True) ->typing.Iterator[typing
-    .IO]:
-    """Open a file for atomic writing. Instead of locking this method allows
+def open_atomic(
+    filename: Filename,
+    binary: bool = True,
+) -> typing.Iterator[typing.IO]:
+    '''Open a file for atomic writing. Instead of locking this method allows
     you to write the entire file and move it to the actual location. Note that
     this makes the assumption that a rename is atomic on your platform which
     is generally the case but not a guarantee.
@@ -70,29 +80,92 @@ def open_atomic(filename: Filename, binary: bool=True) ->typing.Iterator[typing
     ...     written = fh.write(b'test')
     >>> assert path_filename.exists()
     >>> path_filename.unlink()
-    """
-    pass
-
-
-class LockBase(abc.ABC):
+    '''
+    # `pathlib.Path` cast in case `path` is a `str`
+    path: pathlib.Path = pathlib.Path(filename)
+
+    assert not path.exists(), f'{path!r} exists'
+
+    # Create the parent directory if it doesn't exist
+    path.parent.mkdir(parents=True, exist_ok=True)
+
+    temp_fh = tempfile.NamedTemporaryFile(
+        mode=binary and 'wb' or 'w',
+        dir=str(path.parent),
+        delete=False,
+    )
+    yield temp_fh
+    temp_fh.flush()
+    os.fsync(temp_fh.fileno())
+    temp_fh.close()
+    try:
+        os.rename(temp_fh.name, path)
+    finally:
+        with contextlib.suppress(Exception):
+            os.remove(temp_fh.name)
+
+
+class LockBase(abc.ABC):  # pragma: no cover
+    #: timeout when trying to acquire a lock
     timeout: float
+    #: check interval while waiting for `timeout`
     check_interval: float
+    #: skip the timeout and immediately fail if the initial lock fails
     fail_when_locked: bool

-    def __init__(self, timeout: typing.Optional[float]=None, check_interval:
-        typing.Optional[float]=None, fail_when_locked: typing.Optional[bool
-        ]=None):
+    def __init__(
+        self,
+        timeout: typing.Optional[float] = None,
+        check_interval: typing.Optional[float] = None,
+        fail_when_locked: typing.Optional[bool] = None,
+    ):
         self.timeout = coalesce(timeout, DEFAULT_TIMEOUT)
         self.check_interval = coalesce(check_interval, DEFAULT_CHECK_INTERVAL)
-        self.fail_when_locked = coalesce(fail_when_locked,
-            DEFAULT_FAIL_WHEN_LOCKED)
-
-    def __enter__(self) ->typing.IO[typing.AnyStr]:
+        self.fail_when_locked = coalesce(
+            fail_when_locked,
+            DEFAULT_FAIL_WHEN_LOCKED,
+        )
+
+    @abc.abstractmethod
+    def acquire(
+        self,
+        timeout: typing.Optional[float] = None,
+        check_interval: typing.Optional[float] = None,
+        fail_when_locked: typing.Optional[bool] = None,
+    ) -> typing.IO[typing.AnyStr]: ...
+
+    def _timeout_generator(
+        self,
+        timeout: typing.Optional[float],
+        check_interval: typing.Optional[float],
+    ) -> typing.Iterator[int]:
+        f_timeout = coalesce(timeout, self.timeout, 0.0)
+        f_check_interval = coalesce(check_interval, self.check_interval, 0.0)
+
+        yield 0
+        i = 0
+
+        start_time = time.perf_counter()
+        while start_time + f_timeout > time.perf_counter():
+            i += 1
+            yield i
+
+            # Take low lock checks into account to stay within the interval
+            since_start_time = time.perf_counter() - start_time
+            time.sleep(max(0.001, (i * f_check_interval) - since_start_time))
+
+    @abc.abstractmethod
+    def release(self): ...
+
+    def __enter__(self) -> typing.IO[typing.AnyStr]:
         return self.acquire()

-    def __exit__(self, exc_type: typing.Optional[typing.Type[BaseException]
-        ], exc_value: typing.Optional[BaseException], traceback: typing.Any
-        ) ->typing.Optional[bool]:
+    def __exit__(
+        self,
+        exc_type: typing.Optional[typing.Type[BaseException]],
+        exc_value: typing.Optional[BaseException],
+        traceback: typing.Any,  # Should be typing.TracebackType
+    ) -> typing.Optional[bool]:
         self.release()
         return None

@@ -101,7 +174,7 @@ class LockBase(abc.ABC):


 class Lock(LockBase):
-    """Lock manager with built-in timeout
+    '''Lock manager with built-in timeout

     Args:
         filename: filename
@@ -119,22 +192,32 @@ class Lock(LockBase):

     Note that the file is opened first and locked later. So using 'w' as
     mode will result in truncate _BEFORE_ the lock is checked.
-    """
-
-    def __init__(self, filename: Filename, mode: str='a', timeout: typing.
-        Optional[float]=None, check_interval: float=DEFAULT_CHECK_INTERVAL,
-        fail_when_locked: bool=DEFAULT_FAIL_WHEN_LOCKED, flags: constants.
-        LockFlags=LOCK_METHOD, **file_open_kwargs):
+    '''
+
+    def __init__(
+        self,
+        filename: Filename,
+        mode: str = 'a',
+        timeout: typing.Optional[float] = None,
+        check_interval: float = DEFAULT_CHECK_INTERVAL,
+        fail_when_locked: bool = DEFAULT_FAIL_WHEN_LOCKED,
+        flags: constants.LockFlags = LOCK_METHOD,
+        **file_open_kwargs,
+    ):
         if 'w' in mode:
             truncate = True
             mode = mode.replace('w', 'a')
         else:
             truncate = False
+
         if timeout is None:
             timeout = DEFAULT_TIMEOUT
-        elif not flags & constants.LockFlags.NON_BLOCKING:
-            warnings.warn('timeout has no effect in blocking mode',
-                stacklevel=1)
+        elif not (flags & constants.LockFlags.NON_BLOCKING):
+            warnings.warn(
+                'timeout has no effect in blocking mode',
+                stacklevel=1,
+            )
+
         self.fh: typing.Optional[typing.IO] = None
         self.filename: str = str(filename)
         self.mode: str = mode
@@ -145,68 +228,195 @@ class Lock(LockBase):
         self.flags: constants.LockFlags = flags
         self.file_open_kwargs = file_open_kwargs

-    def acquire(self, timeout: typing.Optional[float]=None, check_interval:
-        typing.Optional[float]=None, fail_when_locked: typing.Optional[bool
-        ]=None) ->typing.IO[typing.AnyStr]:
-        """Acquire the locked filehandle"""
-        pass
+    def acquire(
+        self,
+        timeout: typing.Optional[float] = None,
+        check_interval: typing.Optional[float] = None,
+        fail_when_locked: typing.Optional[bool] = None,
+    ) -> typing.IO[typing.AnyStr]:
+        '''Acquire the locked filehandle'''
+
+        fail_when_locked = coalesce(fail_when_locked, self.fail_when_locked)

-    def __enter__(self) ->typing.IO[typing.AnyStr]:
+        if (
+            not (self.flags & constants.LockFlags.NON_BLOCKING)
+            and timeout is not None
+        ):
+            warnings.warn(
+                'timeout has no effect in blocking mode',
+                stacklevel=1,
+            )
+
+        # If we already have a filehandle, return it
+        fh: typing.Optional[typing.IO] = self.fh
+        if fh:
+            return fh
+
+        # Get a new filehandler
+        fh = self._get_fh()
+
+        def try_close():  # pragma: no cover
+            # Silently try to close the handle if possible, ignore all issues
+            if fh is not None:
+                with contextlib.suppress(Exception):
+                    fh.close()
+
+        exception = None
+        # Try till the timeout has passed
+        for _ in self._timeout_generator(timeout, check_interval):
+            exception = None
+            try:
+                # Try to lock
+                fh = self._get_lock(fh)
+                break
+            except exceptions.LockException as exc:
+                # Python will automatically remove the variable from memory
+                # unless you save it in a different location
+                exception = exc
+
+                # We already tried to the get the lock
+                # If fail_when_locked is True, stop trying
+                if fail_when_locked:
+                    try_close()
+                    raise exceptions.AlreadyLocked(exception) from exc
+            except Exception as exc:
+                # Something went wrong with the locking mechanism.
+                # Wrap in a LockException and re-raise:
+                try_close()
+                raise exceptions.LockException(exc) from exc
+
+            # Wait a bit
+
+        if exception:
+            try_close()
+            # We got a timeout... reraising
+            raise exception
+
+        # Prepare the filehandle (truncate if needed)
+        fh = self._prepare_fh(fh)
+
+        self.fh = fh
+        return fh
+
+    def __enter__(self) -> typing.IO[typing.AnyStr]:
         return self.acquire()

     def release(self):
-        """Releases the currently locked file handle"""
-        pass
-
-    def _get_fh(self) ->typing.IO:
-        """Get a new filehandle"""
-        pass
-
-    def _get_lock(self, fh: typing.IO) ->typing.IO:
-        """
+        '''Releases the currently locked file handle'''
+        if self.fh:
+            portalocker.unlock(self.fh)
+            self.fh.close()
+            self.fh = None
+
+    def _get_fh(self) -> typing.IO:
+        '''Get a new filehandle'''
+        return open(  # noqa: SIM115
+            self.filename,
+            self.mode,
+            **self.file_open_kwargs,
+        )
+
+    def _get_lock(self, fh: typing.IO) -> typing.IO:
+        '''
         Try to lock the given filehandle

-        returns LockException if it fails"""
-        pass
+        returns LockException if it fails'''
+        portalocker.lock(fh, self.flags)
+        return fh

-    def _prepare_fh(self, fh: typing.IO) ->typing.IO:
-        """
+    def _prepare_fh(self, fh: typing.IO) -> typing.IO:
+        '''
         Prepare the filehandle for usage

         If truncate is a number, the file will be truncated to that amount of
         bytes
-        """
-        pass
+        '''
+        if self.truncate:
+            fh.seek(0)
+            fh.truncate(0)
+
+        return fh


 class RLock(Lock):
-    """
+    '''
     A reentrant lock, functions in a similar way to threading.RLock in that it
     can be acquired multiple times.  When the corresponding number of release()
     calls are made the lock will finally release the underlying file lock.
-    """
-
-    def __init__(self, filename, mode='a', timeout=DEFAULT_TIMEOUT,
-        check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=False,
-        flags=LOCK_METHOD):
-        super().__init__(filename, mode, timeout, check_interval,
-            fail_when_locked, flags)
+    '''
+
+    def __init__(
+        self,
+        filename,
+        mode='a',
+        timeout=DEFAULT_TIMEOUT,
+        check_interval=DEFAULT_CHECK_INTERVAL,
+        fail_when_locked=False,
+        flags=LOCK_METHOD,
+    ):
+        super().__init__(
+            filename,
+            mode,
+            timeout,
+            check_interval,
+            fail_when_locked,
+            flags,
+        )
         self._acquire_count = 0

+    def acquire(
+        self,
+        timeout: typing.Optional[float] = None,
+        check_interval: typing.Optional[float] = None,
+        fail_when_locked: typing.Optional[bool] = None,
+    ) -> typing.IO:
+        if self._acquire_count >= 1:
+            fh = self.fh
+        else:
+            fh = super().acquire(timeout, check_interval, fail_when_locked)
+        self._acquire_count += 1
+        assert fh
+        return fh

-class TemporaryFileLock(Lock):
+    def release(self):
+        if self._acquire_count == 0:
+            raise exceptions.LockException(
+                'Cannot release more times than acquired',
+            )
+
+        if self._acquire_count == 1:
+            super().release()
+        self._acquire_count -= 1

-    def __init__(self, filename='.lock', timeout=DEFAULT_TIMEOUT,
-        check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=True, flags
-        =LOCK_METHOD):
-        Lock.__init__(self, filename=filename, mode='w', timeout=timeout,
-            check_interval=check_interval, fail_when_locked=
-            fail_when_locked, flags=flags)
+
+class TemporaryFileLock(Lock):
+    def __init__(
+        self,
+        filename='.lock',
+        timeout=DEFAULT_TIMEOUT,
+        check_interval=DEFAULT_CHECK_INTERVAL,
+        fail_when_locked=True,
+        flags=LOCK_METHOD,
+    ):
+        Lock.__init__(
+            self,
+            filename=filename,
+            mode='w',
+            timeout=timeout,
+            check_interval=check_interval,
+            fail_when_locked=fail_when_locked,
+            flags=flags,
+        )
         atexit.register(self.release)

+    def release(self):
+        Lock.release(self)
+        if os.path.isfile(self.filename):  # pragma: no branch
+            os.unlink(self.filename)
+

 class BoundedSemaphore(LockBase):
-    """
+    '''
     Bounded semaphore to prevent too many parallel processes from running

     This method is deprecated because multiple processes that are completely
@@ -219,29 +429,100 @@ class BoundedSemaphore(LockBase):
     'bounded_semaphore.00.lock'
     >>> str(sorted(semaphore.get_random_filenames())[1])
     'bounded_semaphore.01.lock'
-    """
+    '''
+
     lock: typing.Optional[Lock]

-    def __init__(self, maximum: int, name: str='bounded_semaphore',
-        filename_pattern: str='{name}.{number:02d}.lock', directory: str=
-        tempfile.gettempdir(), timeout: typing.Optional[float]=
-        DEFAULT_TIMEOUT, check_interval: typing.Optional[float]=
-        DEFAULT_CHECK_INTERVAL, fail_when_locked: typing.Optional[bool]=True):
+    def __init__(
+        self,
+        maximum: int,
+        name: str = 'bounded_semaphore',
+        filename_pattern: str = '{name}.{number:02d}.lock',
+        directory: str = tempfile.gettempdir(),
+        timeout: typing.Optional[float] = DEFAULT_TIMEOUT,
+        check_interval: typing.Optional[float] = DEFAULT_CHECK_INTERVAL,
+        fail_when_locked: typing.Optional[bool] = True,
+    ):
         self.maximum = maximum
         self.name = name
         self.filename_pattern = filename_pattern
         self.directory = directory
         self.lock: typing.Optional[Lock] = None
-        super().__init__(timeout=timeout, check_interval=check_interval,
-            fail_when_locked=fail_when_locked)
+        super().__init__(
+            timeout=timeout,
+            check_interval=check_interval,
+            fail_when_locked=fail_when_locked,
+        )
+
         if not name or name == 'bounded_semaphore':
             warnings.warn(
-                '`BoundedSemaphore` without an explicit `name` argument is deprecated, use NamedBoundedSemaphore'
-                , DeprecationWarning, stacklevel=1)
+                '`BoundedSemaphore` without an explicit `name` '
+                'argument is deprecated, use NamedBoundedSemaphore',
+                DeprecationWarning,
+                stacklevel=1,
+            )
+
+    def get_filenames(self) -> typing.Sequence[pathlib.Path]:
+        return [self.get_filename(n) for n in range(self.maximum)]
+
+    def get_random_filenames(self) -> typing.Sequence[pathlib.Path]:
+        filenames = list(self.get_filenames())
+        random.shuffle(filenames)
+        return filenames
+
+    def get_filename(self, number) -> pathlib.Path:
+        return pathlib.Path(self.directory) / self.filename_pattern.format(
+            name=self.name,
+            number=number,
+        )
+
+    def acquire(  # type: ignore[override]
+        self,
+        timeout: typing.Optional[float] = None,
+        check_interval: typing.Optional[float] = None,
+        fail_when_locked: typing.Optional[bool] = None,
+    ) -> typing.Optional[Lock]:
+        assert not self.lock, 'Already locked'
+
+        filenames = self.get_filenames()
+
+        for n in self._timeout_generator(timeout, check_interval):  # pragma:
+            logger.debug('trying lock (attempt %d) %r', n, filenames)
+            # no branch
+            if self.try_lock(filenames):  # pragma: no branch
+                return self.lock  # pragma: no cover
+
+        if fail_when_locked := coalesce(
+            fail_when_locked,
+            self.fail_when_locked,
+        ):
+            raise exceptions.AlreadyLocked()
+
+        return None
+
+    def try_lock(self, filenames: typing.Sequence[Filename]) -> bool:
+        filename: Filename
+        for filename in filenames:
+            logger.debug('trying lock for %r', filename)
+            self.lock = Lock(filename, fail_when_locked=True)
+            try:
+                self.lock.acquire()
+            except exceptions.AlreadyLocked:
+                self.lock = None
+            else:
+                logger.debug('locked %r', filename)
+                return True
+
+        return False
+
+    def release(self):  # pragma: no cover
+        if self.lock is not None:
+            self.lock.release()
+            self.lock = None


 class NamedBoundedSemaphore(BoundedSemaphore):
-    """
+    '''
     Bounded semaphore to prevent too many parallel processes from running

     It's also possible to specify a timeout when acquiring the lock to wait
@@ -263,14 +544,26 @@ class NamedBoundedSemaphore(BoundedSemaphore):
     >>> 'bounded_semaphore' in str(semaphore.get_filenames()[0])
     True

-    """
-
-    def __init__(self, maximum: int, name: typing.Optional[str]=None,
-        filename_pattern: str='{name}.{number:02d}.lock', directory: str=
-        tempfile.gettempdir(), timeout: typing.Optional[float]=
-        DEFAULT_TIMEOUT, check_interval: typing.Optional[float]=
-        DEFAULT_CHECK_INTERVAL, fail_when_locked: typing.Optional[bool]=True):
+    '''
+
+    def __init__(
+        self,
+        maximum: int,
+        name: typing.Optional[str] = None,
+        filename_pattern: str = '{name}.{number:02d}.lock',
+        directory: str = tempfile.gettempdir(),
+        timeout: typing.Optional[float] = DEFAULT_TIMEOUT,
+        check_interval: typing.Optional[float] = DEFAULT_CHECK_INTERVAL,
+        fail_when_locked: typing.Optional[bool] = True,
+    ):
         if name is None:
             name = 'bounded_semaphore.%d' % random.randint(0, 1000000)
-        super().__init__(maximum, name, filename_pattern, directory,
-            timeout, check_interval, fail_when_locked)
+        super().__init__(
+            maximum,
+            name,
+            filename_pattern,
+            directory,
+            timeout,
+            check_interval,
+            fail_when_locked,
+        )
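
Finally, a short sketch (not part of the patch; paths are illustrative) of the high-level helpers completed in utils.py, the Lock context manager and open_atomic:

    import portalocker
    from portalocker import utils

    # Exclusive, non-blocking lock with the default 5 second timeout;
    # __enter__ returns the locked file handle.
    with portalocker.Lock('example.lock', timeout=5) as fh:
        fh.write('guarded by the lock\n')

    # Atomic write: data goes to a temporary file that is renamed into
    # place, so readers never see a partial file. The target must not
    # exist yet (open_atomic asserts this).
    with utils.open_atomic('output.bin') as fh:
        fh.write(b'complete payload')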