back to Reference (Gold) summary
Reference (Gold): portalocker
Pytest Summary for test portalocker_tests
status | count |
---|---|
passed | 40 |
total | 40 |
collected | 40 |
Failed pytests:
Patch diff
diff --git a/portalocker/__about__.py b/portalocker/__about__.py
index fc965bf..a0b817a 100644
--- a/portalocker/__about__.py
+++ b/portalocker/__about__.py
@@ -2,5 +2,5 @@ __package_name__ = 'portalocker'
__author__ = 'Rick van Hattem'
__email__ = 'wolph@wol.ph'
__version__ = '2.10.1'
-__description__ = 'Wraps the portalocker recipe for easy usage'
+__description__ = '''Wraps the portalocker recipe for easy usage'''
__url__ = 'https://github.com/WoLpH/portalocker'
diff --git a/portalocker/constants.py b/portalocker/constants.py
index 12b8ef6..2099f1f 100644
--- a/portalocker/constants.py
+++ b/portalocker/constants.py
@@ -1,4 +1,4 @@
-"""
+'''
Locking constants
Lock types:
@@ -13,27 +13,47 @@ Lock flags:
Manually unlock, only needed internally
- `UNBLOCK` unlock
-"""
+'''
+
import enum
import os
-if os.name == 'nt':
+
+# The actual tests will execute the code anyhow so the following code can
+# safely be ignored from the coverage tests
+if os.name == 'nt': # pragma: no cover
import msvcrt
- LOCK_EX = 1
- LOCK_SH = 2
- LOCK_NB = 4
- LOCK_UN = msvcrt.LK_UNLCK
-elif os.name == 'posix':
+
+ #: exclusive lock
+ LOCK_EX = 0x1
+ #: shared lock
+ LOCK_SH = 0x2
+ #: non-blocking
+ LOCK_NB = 0x4
+ #: unlock
+ LOCK_UN = msvcrt.LK_UNLCK # type: ignore
+
+elif os.name == 'posix': # pragma: no cover
import fcntl
+
+ #: exclusive lock
LOCK_EX = fcntl.LOCK_EX
+ #: shared lock
LOCK_SH = fcntl.LOCK_SH
+ #: non-blocking
LOCK_NB = fcntl.LOCK_NB
+ #: unlock
LOCK_UN = fcntl.LOCK_UN
-else:
+
+else: # pragma: no cover
raise RuntimeError('PortaLocker only defined for nt and posix platforms')
class LockFlags(enum.IntFlag):
+ #: exclusive lock
EXCLUSIVE = LOCK_EX
+ #: shared lock
SHARED = LOCK_SH
+ #: non-blocking
NON_BLOCKING = LOCK_NB
+ #: unlock
UNBLOCK = LOCK_UN
diff --git a/portalocker/exceptions.py b/portalocker/exceptions.py
index 9c73924..e871d13 100644
--- a/portalocker/exceptions.py
+++ b/portalocker/exceptions.py
@@ -1,11 +1,16 @@
import typing
-class BaseLockException(Exception):
+class BaseLockException(Exception): # noqa: N818
+ # Error codes:
LOCK_FAILED = 1
- def __init__(self, *args: typing.Any, fh: typing.Union[typing.IO, None,
- int]=None, **kwargs: typing.Any) ->None:
+ def __init__(
+ self,
+ *args: typing.Any,
+ fh: typing.Union[typing.IO, None, int] = None,
+ **kwargs: typing.Any,
+ ) -> None:
self.fh = fh
Exception.__init__(self, *args)
diff --git a/portalocker/portalocker.py b/portalocker/portalocker.py
index e718322..ceceeaa 100644
--- a/portalocker/portalocker.py
+++ b/portalocker/portalocker.py
@@ -1,25 +1,154 @@
import os
import typing
+
from . import constants, exceptions
+
+# Alias for readability. Due to import recursion issues we cannot do:
+# from .constants import LockFlags
LockFlags = constants.LockFlags
class HasFileno(typing.Protocol):
- pass
+ def fileno(self) -> int: ...
+
+LOCKER: typing.Optional[typing.Callable[
+ [typing.Union[int, HasFileno], int], typing.Any]] = None
-LOCKER: typing.Optional[typing.Callable[[typing.Union[int, HasFileno], int],
- typing.Any]] = None
-if os.name == 'nt':
+
+if os.name == 'nt': # pragma: no cover
import msvcrt
+
import pywintypes
import win32con
import win32file
import winerror
+
__overlapped = pywintypes.OVERLAPPED()
-elif os.name == 'posix':
+
+ def lock(file_: typing.Union[typing.IO, int], flags: LockFlags):
+ # Windows locking does not support locking through `fh.fileno()` so
+ # we cast it to make mypy and pyright happy
+ file_ = typing.cast(typing.IO, file_)
+
+ mode = 0
+ if flags & LockFlags.NON_BLOCKING:
+ mode |= win32con.LOCKFILE_FAIL_IMMEDIATELY
+
+ if flags & LockFlags.EXCLUSIVE:
+ mode |= win32con.LOCKFILE_EXCLUSIVE_LOCK
+
+ # Save the old position so we can go back to that position but
+ # still lock from the beginning of the file
+ savepos = file_.tell()
+ if savepos:
+ file_.seek(0)
+
+ os_fh = msvcrt.get_osfhandle(file_.fileno()) # type: ignore
+ try:
+ win32file.LockFileEx(os_fh, mode, 0, -0x10000, __overlapped)
+ except pywintypes.error as exc_value:
+ # error: (33, 'LockFileEx', 'The process cannot access the file
+ # because another process has locked a portion of the file.')
+ if exc_value.winerror == winerror.ERROR_LOCK_VIOLATION:
+ raise exceptions.AlreadyLocked(
+ exceptions.LockException.LOCK_FAILED,
+ exc_value.strerror,
+ fh=file_,
+ ) from exc_value
+ else:
+ # Q: Are there exceptions/codes we should be dealing with
+ # here?
+ raise
+ finally:
+ if savepos:
+ file_.seek(savepos)
+
+ def unlock(file_: typing.IO):
+ try:
+ savepos = file_.tell()
+ if savepos:
+ file_.seek(0)
+
+ os_fh = msvcrt.get_osfhandle(file_.fileno()) # type: ignore
+ try:
+ win32file.UnlockFileEx(
+ os_fh,
+ 0,
+ -0x10000,
+ __overlapped,
+ )
+ except pywintypes.error as exc:
+ if exc.winerror != winerror.ERROR_NOT_LOCKED:
+ # Q: Are there exceptions/codes we should be
+ # dealing with here?
+ raise
+ finally:
+ if savepos:
+ file_.seek(savepos)
+ except OSError as exc:
+ raise exceptions.LockException(
+ exceptions.LockException.LOCK_FAILED,
+ exc.strerror,
+ fh=file_,
+ ) from exc
+
+elif os.name == 'posix': # pragma: no cover
import errno
import fcntl
+
+ # The locking implementation.
+ # Expected values are either fcntl.flock() or fcntl.lockf(),
+ # but any callable that matches the syntax will be accepted.
LOCKER = fcntl.flock
-else:
+
+ def lock(file_: typing.Union[typing.IO, int], flags: LockFlags):
+ assert LOCKER is not None, 'We need a locking function in `LOCKER` '
+ # Locking with NON_BLOCKING without EXCLUSIVE or SHARED enabled
+ # results in an error
+ if (flags & LockFlags.NON_BLOCKING) and not flags & (
+ LockFlags.SHARED | LockFlags.EXCLUSIVE
+ ):
+ raise RuntimeError(
+ 'When locking in non-blocking mode the SHARED '
+ 'or EXCLUSIVE flag must be specified as well',
+ )
+
+ try:
+ LOCKER(file_, flags)
+ except OSError as exc_value:
+ # Python can use one of several different exception classes to
+ # represent timeout (most likely is BlockingIOError and IOError),
+ # but these errors may also represent other failures. On some
+ # systems, `IOError is OSError` which means checking for either
+ # IOError or OSError can mask other errors.
+ # The safest check is to catch OSError (from which the others
+ # inherit) and check the errno (which should be EACCES or EAGAIN
+ # according to the spec).
+ if exc_value.errno in (errno.EACCES, errno.EAGAIN):
+ # A timeout exception, wrap this so the outer code knows to try
+ # again (if it wants to).
+ raise exceptions.AlreadyLocked(
+ exc_value,
+ fh=file_,
+ ) from exc_value
+ else:
+ # Something else went wrong; don't wrap this so we stop
+ # immediately.
+ raise exceptions.LockException(
+ exc_value,
+ fh=file_,
+ ) from exc_value
+ except EOFError as exc_value:
+ # On NFS filesystems, flock can raise an EOFError
+ raise exceptions.LockException(
+ exc_value,
+ fh=file_,
+ ) from exc_value
+
+ def unlock(file_: typing.IO):
+ assert LOCKER is not None, 'We need a locking function in `LOCKER` '
+ LOCKER(file_.fileno(), LockFlags.UNBLOCK)
+
+else: # pragma: no cover
raise RuntimeError('PortaLocker only defined for nt and posix platforms')
diff --git a/portalocker/redis.py b/portalocker/redis.py
index f02a9d9..11ee876 100644
--- a/portalocker/redis.py
+++ b/portalocker/redis.py
@@ -4,19 +4,28 @@ import logging
import random
import time
import typing
+
from redis import client
+
from . import exceptions, utils
+
logger = logging.getLogger(__name__)
+
DEFAULT_UNAVAILABLE_TIMEOUT = 1
DEFAULT_THREAD_SLEEP_TIME = 0.1
-class PubSubWorkerThread(client.PubSubWorkerThread):
- pass
+class PubSubWorkerThread(client.PubSubWorkerThread): # type: ignore
+ def run(self):
+ try:
+ super().run()
+ except Exception: # pragma: no cover
+ _thread.interrupt_main()
+ raise
class RedisLock(utils.LockBase):
- """
+ '''
An extremely reliable Redis lock based on pubsub with a keep-alive thread
As opposed to most Redis locking systems based on key/value pairs,
@@ -50,7 +59,8 @@ class RedisLock(utils.LockBase):
to override these you need to explicitly specify a value (e.g.
`health_check_interval=0`)
- """
+ '''
+
redis_kwargs: typing.Dict[str, typing.Any]
thread: typing.Optional[PubSubWorkerThread]
channel: str
@@ -58,26 +68,169 @@ class RedisLock(utils.LockBase):
connection: typing.Optional[client.Redis]
pubsub: typing.Optional[client.PubSub] = None
close_connection: bool
+
DEFAULT_REDIS_KWARGS: typing.ClassVar[typing.Dict[str, typing.Any]] = dict(
- health_check_interval=10)
-
- def __init__(self, channel: str, connection: typing.Optional[client.
- Redis]=None, timeout: typing.Optional[float]=None, check_interval:
- typing.Optional[float]=None, fail_when_locked: typing.Optional[bool
- ]=False, thread_sleep_time: float=DEFAULT_THREAD_SLEEP_TIME,
- unavailable_timeout: float=DEFAULT_UNAVAILABLE_TIMEOUT,
- redis_kwargs: typing.Optional[typing.Dict]=None):
+ health_check_interval=10,
+ )
+
+ def __init__(
+ self,
+ channel: str,
+ connection: typing.Optional[client.Redis] = None,
+ timeout: typing.Optional[float] = None,
+ check_interval: typing.Optional[float] = None,
+ fail_when_locked: typing.Optional[bool] = False,
+ thread_sleep_time: float = DEFAULT_THREAD_SLEEP_TIME,
+ unavailable_timeout: float = DEFAULT_UNAVAILABLE_TIMEOUT,
+ redis_kwargs: typing.Optional[typing.Dict] = None,
+ ):
+ # We don't want to close connections given as an argument
self.close_connection = not connection
+
self.thread = None
self.channel = channel
self.connection = connection
self.thread_sleep_time = thread_sleep_time
self.unavailable_timeout = unavailable_timeout
self.redis_kwargs = redis_kwargs or dict()
+
for key, value in self.DEFAULT_REDIS_KWARGS.items():
self.redis_kwargs.setdefault(key, value)
- super().__init__(timeout=timeout, check_interval=check_interval,
- fail_when_locked=fail_when_locked)
+
+ super().__init__(
+ timeout=timeout,
+ check_interval=check_interval,
+ fail_when_locked=fail_when_locked,
+ )
+
+ def get_connection(self) -> client.Redis:
+ if not self.connection:
+ self.connection = client.Redis(**self.redis_kwargs)
+
+ return self.connection
+
+ def channel_handler(self, message):
+ if message.get('type') != 'message': # pragma: no cover
+ return
+
+ try:
+ data = json.loads(message.get('data'))
+ except TypeError: # pragma: no cover
+ logger.debug('TypeError while parsing: %r', message)
+ return
+
+ assert self.connection is not None
+ self.connection.publish(data['response_channel'], str(time.time()))
+
+ @property
+ def client_name(self):
+ return f'{self.channel}-lock'
+
+ def acquire( # type: ignore[override]
+ self,
+ timeout: typing.Optional[float] = None,
+ check_interval: typing.Optional[float] = None,
+ fail_when_locked: typing.Optional[bool] = None,
+ ) -> 'RedisLock':
+ timeout = utils.coalesce(timeout, self.timeout, 0.0)
+ check_interval = utils.coalesce(
+ check_interval,
+ self.check_interval,
+ 0.0,
+ )
+ fail_when_locked = utils.coalesce(
+ fail_when_locked,
+ self.fail_when_locked,
+ )
+
+ assert not self.pubsub, 'This lock is already active'
+ connection = self.get_connection()
+
+ timeout_generator = self._timeout_generator(timeout, check_interval)
+ for _ in timeout_generator: # pragma: no branch
+ subscribers = connection.pubsub_numsub(self.channel)[0][1]
+
+ if subscribers:
+ logger.debug(
+ 'Found %d lock subscribers for %s',
+ subscribers,
+ self.channel,
+ )
+
+ if self.check_or_kill_lock(
+ connection,
+ self.unavailable_timeout,
+ ): # pragma: no branch
+ continue
+ else: # pragma: no cover
+ subscribers = 0
+
+ # Note: this should not be changed to an elif because the if
+ # above can still end up here
+ if not subscribers:
+ connection.client_setname(self.client_name)
+ self.pubsub = connection.pubsub()
+ self.pubsub.subscribe(**{self.channel: self.channel_handler})
+ self.thread = PubSubWorkerThread(
+ self.pubsub,
+ sleep_time=self.thread_sleep_time,
+ )
+ self.thread.start()
+
+ subscribers = connection.pubsub_numsub(self.channel)[0][1]
+ if subscribers == 1: # pragma: no branch
+ return self
+ else: # pragma: no cover
+ # Race condition, let's try again
+ self.release()
+
+ if fail_when_locked: # pragma: no cover
+ raise exceptions.AlreadyLocked(exceptions)
+
+ raise exceptions.AlreadyLocked(exceptions)
+
+ def check_or_kill_lock(self, connection, timeout):
+ # Random channel name to get messages back from the lock
+ response_channel = f'{self.channel}-{random.random()}'
+
+ pubsub = connection.pubsub()
+ pubsub.subscribe(response_channel)
+ connection.publish(
+ self.channel,
+ json.dumps(
+ dict(
+ response_channel=response_channel,
+ message='ping',
+ ),
+ ),
+ )
+
+ check_interval = min(self.thread_sleep_time, timeout / 10)
+ for _ in self._timeout_generator(
+ timeout,
+ check_interval,
+ ): # pragma: no branch
+ if pubsub.get_message(timeout=check_interval):
+ pubsub.close()
+ return True
+
+ for client_ in connection.client_list('pubsub'): # pragma: no cover
+ if client_.get('name') == self.client_name:
+ logger.warning('Killing unavailable redis client: %r', client_)
+ connection.client_kill_filter(client_.get('id'))
+ return None
+
+ def release(self):
+ if self.thread: # pragma: no branch
+ self.thread.stop()
+ self.thread.join()
+ self.thread = None
+ time.sleep(0.01)
+
+ if self.pubsub: # pragma: no branch
+ self.pubsub.unsubscribe(self.channel)
+ self.pubsub.close()
+ self.pubsub = None
def __del__(self):
self.release()
diff --git a/portalocker/utils.py b/portalocker/utils.py
index a482b38..5115b0e 100644
--- a/portalocker/utils.py
+++ b/portalocker/utils.py
@@ -9,18 +9,26 @@ import tempfile
import time
import typing
import warnings
+
from . import constants, exceptions, portalocker
+
logger = logging.getLogger(__name__)
+
DEFAULT_TIMEOUT = 5
DEFAULT_CHECK_INTERVAL = 0.25
DEFAULT_FAIL_WHEN_LOCKED = False
LOCK_METHOD = constants.LockFlags.EXCLUSIVE | constants.LockFlags.NON_BLOCKING
-__all__ = ['Lock', 'open_atomic']
+
+__all__ = [
+ 'Lock',
+ 'open_atomic',
+]
+
Filename = typing.Union[str, pathlib.Path]
-def coalesce(*args: typing.Any, test_value: typing.Any=None) ->typing.Any:
- """Simple coalescing function that returns the first value that is not
+def coalesce(*args: typing.Any, test_value: typing.Any = None) -> typing.Any:
+ '''Simple coalescing function that returns the first value that is not
equal to the `test_value`. Or `None` if no value is valid. Usually this
means that the last given value is the default value.
@@ -40,14 +48,16 @@ def coalesce(*args: typing.Any, test_value: typing.Any=None) ->typing.Any:
# This won't work because of the `is not test_value` type testing:
>>> coalesce([], dict(spam='eggs'), test_value=[])
[]
- """
- pass
+ '''
+ return next((arg for arg in args if arg is not test_value), None)
@contextlib.contextmanager
-def open_atomic(filename: Filename, binary: bool=True) ->typing.Iterator[typing
- .IO]:
- """Open a file for atomic writing. Instead of locking this method allows
+def open_atomic(
+ filename: Filename,
+ binary: bool = True,
+) -> typing.Iterator[typing.IO]:
+ '''Open a file for atomic writing. Instead of locking this method allows
you to write the entire file and move it to the actual location. Note that
this makes the assumption that a rename is atomic on your platform which
is generally the case but not a guarantee.
@@ -70,29 +80,92 @@ def open_atomic(filename: Filename, binary: bool=True) ->typing.Iterator[typing
... written = fh.write(b'test')
>>> assert path_filename.exists()
>>> path_filename.unlink()
- """
- pass
-
-
-class LockBase(abc.ABC):
+ '''
+ # `pathlib.Path` cast in case `path` is a `str`
+ path: pathlib.Path = pathlib.Path(filename)
+
+ assert not path.exists(), f'{path!r} exists'
+
+ # Create the parent directory if it doesn't exist
+ path.parent.mkdir(parents=True, exist_ok=True)
+
+ temp_fh = tempfile.NamedTemporaryFile(
+ mode=binary and 'wb' or 'w',
+ dir=str(path.parent),
+ delete=False,
+ )
+ yield temp_fh
+ temp_fh.flush()
+ os.fsync(temp_fh.fileno())
+ temp_fh.close()
+ try:
+ os.rename(temp_fh.name, path)
+ finally:
+ with contextlib.suppress(Exception):
+ os.remove(temp_fh.name)
+
+
+class LockBase(abc.ABC): # pragma: no cover
+ #: timeout when trying to acquire a lock
timeout: float
+ #: check interval while waiting for `timeout`
check_interval: float
+ #: skip the timeout and immediately fail if the initial lock fails
fail_when_locked: bool
- def __init__(self, timeout: typing.Optional[float]=None, check_interval:
- typing.Optional[float]=None, fail_when_locked: typing.Optional[bool
- ]=None):
+ def __init__(
+ self,
+ timeout: typing.Optional[float] = None,
+ check_interval: typing.Optional[float] = None,
+ fail_when_locked: typing.Optional[bool] = None,
+ ):
self.timeout = coalesce(timeout, DEFAULT_TIMEOUT)
self.check_interval = coalesce(check_interval, DEFAULT_CHECK_INTERVAL)
- self.fail_when_locked = coalesce(fail_when_locked,
- DEFAULT_FAIL_WHEN_LOCKED)
-
- def __enter__(self) ->typing.IO[typing.AnyStr]:
+ self.fail_when_locked = coalesce(
+ fail_when_locked,
+ DEFAULT_FAIL_WHEN_LOCKED,
+ )
+
+ @abc.abstractmethod
+ def acquire(
+ self,
+ timeout: typing.Optional[float] = None,
+ check_interval: typing.Optional[float] = None,
+ fail_when_locked: typing.Optional[bool] = None,
+ ) -> typing.IO[typing.AnyStr]: ...
+
+ def _timeout_generator(
+ self,
+ timeout: typing.Optional[float],
+ check_interval: typing.Optional[float],
+ ) -> typing.Iterator[int]:
+ f_timeout = coalesce(timeout, self.timeout, 0.0)
+ f_check_interval = coalesce(check_interval, self.check_interval, 0.0)
+
+ yield 0
+ i = 0
+
+ start_time = time.perf_counter()
+ while start_time + f_timeout > time.perf_counter():
+ i += 1
+ yield i
+
+ # Take low lock checks into account to stay within the interval
+ since_start_time = time.perf_counter() - start_time
+ time.sleep(max(0.001, (i * f_check_interval) - since_start_time))
+
+ @abc.abstractmethod
+ def release(self): ...
+
+ def __enter__(self) -> typing.IO[typing.AnyStr]:
return self.acquire()
- def __exit__(self, exc_type: typing.Optional[typing.Type[BaseException]
- ], exc_value: typing.Optional[BaseException], traceback: typing.Any
- ) ->typing.Optional[bool]:
+ def __exit__(
+ self,
+ exc_type: typing.Optional[typing.Type[BaseException]],
+ exc_value: typing.Optional[BaseException],
+ traceback: typing.Any, # Should be typing.TracebackType
+ ) -> typing.Optional[bool]:
self.release()
return None
@@ -101,7 +174,7 @@ class LockBase(abc.ABC):
class Lock(LockBase):
- """Lock manager with built-in timeout
+ '''Lock manager with built-in timeout
Args:
filename: filename
@@ -119,22 +192,32 @@ class Lock(LockBase):
Note that the file is opened first and locked later. So using 'w' as
mode will result in truncate _BEFORE_ the lock is checked.
- """
-
- def __init__(self, filename: Filename, mode: str='a', timeout: typing.
- Optional[float]=None, check_interval: float=DEFAULT_CHECK_INTERVAL,
- fail_when_locked: bool=DEFAULT_FAIL_WHEN_LOCKED, flags: constants.
- LockFlags=LOCK_METHOD, **file_open_kwargs):
+ '''
+
+ def __init__(
+ self,
+ filename: Filename,
+ mode: str = 'a',
+ timeout: typing.Optional[float] = None,
+ check_interval: float = DEFAULT_CHECK_INTERVAL,
+ fail_when_locked: bool = DEFAULT_FAIL_WHEN_LOCKED,
+ flags: constants.LockFlags = LOCK_METHOD,
+ **file_open_kwargs,
+ ):
if 'w' in mode:
truncate = True
mode = mode.replace('w', 'a')
else:
truncate = False
+
if timeout is None:
timeout = DEFAULT_TIMEOUT
- elif not flags & constants.LockFlags.NON_BLOCKING:
- warnings.warn('timeout has no effect in blocking mode',
- stacklevel=1)
+ elif not (flags & constants.LockFlags.NON_BLOCKING):
+ warnings.warn(
+ 'timeout has no effect in blocking mode',
+ stacklevel=1,
+ )
+
self.fh: typing.Optional[typing.IO] = None
self.filename: str = str(filename)
self.mode: str = mode
@@ -145,68 +228,195 @@ class Lock(LockBase):
self.flags: constants.LockFlags = flags
self.file_open_kwargs = file_open_kwargs
- def acquire(self, timeout: typing.Optional[float]=None, check_interval:
- typing.Optional[float]=None, fail_when_locked: typing.Optional[bool
- ]=None) ->typing.IO[typing.AnyStr]:
- """Acquire the locked filehandle"""
- pass
+ def acquire(
+ self,
+ timeout: typing.Optional[float] = None,
+ check_interval: typing.Optional[float] = None,
+ fail_when_locked: typing.Optional[bool] = None,
+ ) -> typing.IO[typing.AnyStr]:
+ '''Acquire the locked filehandle'''
+
+ fail_when_locked = coalesce(fail_when_locked, self.fail_when_locked)
- def __enter__(self) ->typing.IO[typing.AnyStr]:
+ if (
+ not (self.flags & constants.LockFlags.NON_BLOCKING)
+ and timeout is not None
+ ):
+ warnings.warn(
+ 'timeout has no effect in blocking mode',
+ stacklevel=1,
+ )
+
+ # If we already have a filehandle, return it
+ fh: typing.Optional[typing.IO] = self.fh
+ if fh:
+ return fh
+
+ # Get a new filehandle
+ fh = self._get_fh()
+
+ def try_close(): # pragma: no cover
+ # Silently try to close the handle if possible, ignore all issues
+ if fh is not None:
+ with contextlib.suppress(Exception):
+ fh.close()
+
+ exception = None
+ # Try till the timeout has passed
+ for _ in self._timeout_generator(timeout, check_interval):
+ exception = None
+ try:
+ # Try to lock
+ fh = self._get_lock(fh)
+ break
+ except exceptions.LockException as exc:
+ # Python will automatically remove the variable from memory
+ # unless you save it in a different location
+ exception = exc
+
+ # We already tried to get the lock
+ # If fail_when_locked is True, stop trying
+ if fail_when_locked:
+ try_close()
+ raise exceptions.AlreadyLocked(exception) from exc
+ except Exception as exc:
+ # Something went wrong with the locking mechanism.
+ # Wrap in a LockException and re-raise:
+ try_close()
+ raise exceptions.LockException(exc) from exc
+
+ # Wait a bit
+
+ if exception:
+ try_close()
+ # We got a timeout... reraising
+ raise exception
+
+ # Prepare the filehandle (truncate if needed)
+ fh = self._prepare_fh(fh)
+
+ self.fh = fh
+ return fh
+
+ def __enter__(self) -> typing.IO[typing.AnyStr]:
return self.acquire()
def release(self):
- """Releases the currently locked file handle"""
- pass
-
- def _get_fh(self) ->typing.IO:
- """Get a new filehandle"""
- pass
-
- def _get_lock(self, fh: typing.IO) ->typing.IO:
- """
+ '''Releases the currently locked file handle'''
+ if self.fh:
+ portalocker.unlock(self.fh)
+ self.fh.close()
+ self.fh = None
+
+ def _get_fh(self) -> typing.IO:
+ '''Get a new filehandle'''
+ return open( # noqa: SIM115
+ self.filename,
+ self.mode,
+ **self.file_open_kwargs,
+ )
+
+ def _get_lock(self, fh: typing.IO) -> typing.IO:
+ '''
Try to lock the given filehandle
- returns LockException if it fails"""
- pass
+ raises LockException if it fails'''
+ portalocker.lock(fh, self.flags)
+ return fh
- def _prepare_fh(self, fh: typing.IO) ->typing.IO:
- """
+ def _prepare_fh(self, fh: typing.IO) -> typing.IO:
+ '''
Prepare the filehandle for usage
If truncate is a number, the file will be truncated to that amount of
bytes
- """
- pass
+ '''
+ if self.truncate:
+ fh.seek(0)
+ fh.truncate(0)
+
+ return fh
class RLock(Lock):
- """
+ '''
A reentrant lock, functions in a similar way to threading.RLock in that it
can be acquired multiple times. When the corresponding number of release()
calls are made the lock will finally release the underlying file lock.
- """
-
- def __init__(self, filename, mode='a', timeout=DEFAULT_TIMEOUT,
- check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=False,
- flags=LOCK_METHOD):
- super().__init__(filename, mode, timeout, check_interval,
- fail_when_locked, flags)
+ '''
+
+ def __init__(
+ self,
+ filename,
+ mode='a',
+ timeout=DEFAULT_TIMEOUT,
+ check_interval=DEFAULT_CHECK_INTERVAL,
+ fail_when_locked=False,
+ flags=LOCK_METHOD,
+ ):
+ super().__init__(
+ filename,
+ mode,
+ timeout,
+ check_interval,
+ fail_when_locked,
+ flags,
+ )
self._acquire_count = 0
+ def acquire(
+ self,
+ timeout: typing.Optional[float] = None,
+ check_interval: typing.Optional[float] = None,
+ fail_when_locked: typing.Optional[bool] = None,
+ ) -> typing.IO:
+ if self._acquire_count >= 1:
+ fh = self.fh
+ else:
+ fh = super().acquire(timeout, check_interval, fail_when_locked)
+ self._acquire_count += 1
+ assert fh
+ return fh
-class TemporaryFileLock(Lock):
+ def release(self):
+ if self._acquire_count == 0:
+ raise exceptions.LockException(
+ 'Cannot release more times than acquired',
+ )
+
+ if self._acquire_count == 1:
+ super().release()
+ self._acquire_count -= 1
- def __init__(self, filename='.lock', timeout=DEFAULT_TIMEOUT,
- check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=True, flags
- =LOCK_METHOD):
- Lock.__init__(self, filename=filename, mode='w', timeout=timeout,
- check_interval=check_interval, fail_when_locked=
- fail_when_locked, flags=flags)
+
+class TemporaryFileLock(Lock):
+ def __init__(
+ self,
+ filename='.lock',
+ timeout=DEFAULT_TIMEOUT,
+ check_interval=DEFAULT_CHECK_INTERVAL,
+ fail_when_locked=True,
+ flags=LOCK_METHOD,
+ ):
+ Lock.__init__(
+ self,
+ filename=filename,
+ mode='w',
+ timeout=timeout,
+ check_interval=check_interval,
+ fail_when_locked=fail_when_locked,
+ flags=flags,
+ )
atexit.register(self.release)
+ def release(self):
+ Lock.release(self)
+ if os.path.isfile(self.filename): # pragma: no branch
+ os.unlink(self.filename)
+
class BoundedSemaphore(LockBase):
- """
+ '''
Bounded semaphore to prevent too many parallel processes from running
This method is deprecated because multiple processes that are completely
@@ -219,29 +429,100 @@ class BoundedSemaphore(LockBase):
'bounded_semaphore.00.lock'
>>> str(sorted(semaphore.get_random_filenames())[1])
'bounded_semaphore.01.lock'
- """
+ '''
+
lock: typing.Optional[Lock]
- def __init__(self, maximum: int, name: str='bounded_semaphore',
- filename_pattern: str='{name}.{number:02d}.lock', directory: str=
- tempfile.gettempdir(), timeout: typing.Optional[float]=
- DEFAULT_TIMEOUT, check_interval: typing.Optional[float]=
- DEFAULT_CHECK_INTERVAL, fail_when_locked: typing.Optional[bool]=True):
+ def __init__(
+ self,
+ maximum: int,
+ name: str = 'bounded_semaphore',
+ filename_pattern: str = '{name}.{number:02d}.lock',
+ directory: str = tempfile.gettempdir(),
+ timeout: typing.Optional[float] = DEFAULT_TIMEOUT,
+ check_interval: typing.Optional[float] = DEFAULT_CHECK_INTERVAL,
+ fail_when_locked: typing.Optional[bool] = True,
+ ):
self.maximum = maximum
self.name = name
self.filename_pattern = filename_pattern
self.directory = directory
self.lock: typing.Optional[Lock] = None
- super().__init__(timeout=timeout, check_interval=check_interval,
- fail_when_locked=fail_when_locked)
+ super().__init__(
+ timeout=timeout,
+ check_interval=check_interval,
+ fail_when_locked=fail_when_locked,
+ )
+
if not name or name == 'bounded_semaphore':
warnings.warn(
- '`BoundedSemaphore` without an explicit `name` argument is deprecated, use NamedBoundedSemaphore'
- , DeprecationWarning, stacklevel=1)
+ '`BoundedSemaphore` without an explicit `name` '
+ 'argument is deprecated, use NamedBoundedSemaphore',
+ DeprecationWarning,
+ stacklevel=1,
+ )
+
+ def get_filenames(self) -> typing.Sequence[pathlib.Path]:
+ return [self.get_filename(n) for n in range(self.maximum)]
+
+ def get_random_filenames(self) -> typing.Sequence[pathlib.Path]:
+ filenames = list(self.get_filenames())
+ random.shuffle(filenames)
+ return filenames
+
+ def get_filename(self, number) -> pathlib.Path:
+ return pathlib.Path(self.directory) / self.filename_pattern.format(
+ name=self.name,
+ number=number,
+ )
+
+ def acquire( # type: ignore[override]
+ self,
+ timeout: typing.Optional[float] = None,
+ check_interval: typing.Optional[float] = None,
+ fail_when_locked: typing.Optional[bool] = None,
+ ) -> typing.Optional[Lock]:
+ assert not self.lock, 'Already locked'
+
+ filenames = self.get_filenames()
+
+ for n in self._timeout_generator(timeout, check_interval): # pragma:
+ logger.debug('trying lock (attempt %d) %r', n, filenames)
+ # no branch
+ if self.try_lock(filenames): # pragma: no branch
+ return self.lock # pragma: no cover
+
+ if fail_when_locked := coalesce(
+ fail_when_locked,
+ self.fail_when_locked,
+ ):
+ raise exceptions.AlreadyLocked()
+
+ return None
+
+ def try_lock(self, filenames: typing.Sequence[Filename]) -> bool:
+ filename: Filename
+ for filename in filenames:
+ logger.debug('trying lock for %r', filename)
+ self.lock = Lock(filename, fail_when_locked=True)
+ try:
+ self.lock.acquire()
+ except exceptions.AlreadyLocked:
+ self.lock = None
+ else:
+ logger.debug('locked %r', filename)
+ return True
+
+ return False
+
+ def release(self): # pragma: no cover
+ if self.lock is not None:
+ self.lock.release()
+ self.lock = None
class NamedBoundedSemaphore(BoundedSemaphore):
- """
+ '''
Bounded semaphore to prevent too many parallel processes from running
It's also possible to specify a timeout when acquiring the lock to wait
@@ -263,14 +544,26 @@ class NamedBoundedSemaphore(BoundedSemaphore):
>>> 'bounded_semaphore' in str(semaphore.get_filenames()[0])
True
- """
-
- def __init__(self, maximum: int, name: typing.Optional[str]=None,
- filename_pattern: str='{name}.{number:02d}.lock', directory: str=
- tempfile.gettempdir(), timeout: typing.Optional[float]=
- DEFAULT_TIMEOUT, check_interval: typing.Optional[float]=
- DEFAULT_CHECK_INTERVAL, fail_when_locked: typing.Optional[bool]=True):
+ '''
+
+ def __init__(
+ self,
+ maximum: int,
+ name: typing.Optional[str] = None,
+ filename_pattern: str = '{name}.{number:02d}.lock',
+ directory: str = tempfile.gettempdir(),
+ timeout: typing.Optional[float] = DEFAULT_TIMEOUT,
+ check_interval: typing.Optional[float] = DEFAULT_CHECK_INTERVAL,
+ fail_when_locked: typing.Optional[bool] = True,
+ ):
if name is None:
name = 'bounded_semaphore.%d' % random.randint(0, 1000000)
- super().__init__(maximum, name, filename_pattern, directory,
- timeout, check_interval, fail_when_locked)
+ super().__init__(
+ maximum,
+ name,
+ filename_pattern,
+ directory,
+ timeout,
+ check_interval,
+ fail_when_locked,
+ )