back to SWE-Agent summary
SWE-Agent: portalocker
Failed to run pytest for the test suite portalocker_tests
ImportError while loading conftest '/testbed/portalocker_tests/conftest.py'.
portalocker_tests/conftest.py:8: in <module>
from portalocker import utils
portalocker/__init__.py:37: in <module>
lock = portalocker.lock
E AttributeError: module 'portalocker.portalocker' has no attribute 'lock'
Patch diff
diff --git a/portalocker/utils.py b/portalocker/utils.py
index 3891691..1eb873c 100644
--- a/portalocker/utils.py
+++ b/portalocker/utils.py
@@ -40,7 +40,7 @@ def coalesce(*args: typing.Any, test_value: typing.Any=None) -> typing.Any:
>>> coalesce([], dict(spam='eggs'), test_value=[])
[]
"""
- pass
+ return next((arg for arg in args if arg is not test_value), None)
@contextlib.contextmanager
def open_atomic(filename: Filename, binary: bool=True) -> typing.Iterator[typing.IO]:
@@ -68,7 +68,20 @@ def open_atomic(filename: Filename, binary: bool=True) -> typing.Iterator[typing
>>> assert path_filename.exists()
>>> path_filename.unlink()
"""
- pass
+ temp_dir = os.path.dirname(os.path.abspath(filename))
+ temp_file = tempfile.NamedTemporaryFile(
+ mode='wb' if binary else 'w',
+ dir=temp_dir,
+ delete=False
+ )
+ try:
+ yield temp_file
+ temp_file.close()
+ os.rename(temp_file.name, filename)
+ except:
+ temp_file.close()
+ os.unlink(temp_file.name)
+ raise
class LockBase(abc.ABC):
timeout: float
@@ -133,25 +146,62 @@ class Lock(LockBase):
def acquire(self, timeout: typing.Optional[float]=None, check_interval: typing.Optional[float]=None, fail_when_locked: typing.Optional[bool]=None) -> typing.IO[typing.AnyStr]:
"""Acquire the locked filehandle"""
- pass
+ if timeout is None:
+ timeout = self.timeout
+ if check_interval is None:
+ check_interval = self.check_interval
+ if fail_when_locked is None:
+ fail_when_locked = self.fail_when_locked
+
+ # Get a new filehandle
+ fh = self._get_fh()
+ try:
+ # Try to lock
+ fh = self._get_lock(fh)
+ except exceptions.LockException as exception:
+ # If we fail, close the file handle
+ fh.close()
+ if fail_when_locked:
+ raise
+ # Wait till we can acquire the lock
+ timeout_end = time.time() + timeout
+ while True:
+ time.sleep(check_interval)
+ fh = self._get_fh()
+ try:
+ fh = self._get_lock(fh)
+ break
+ except exceptions.LockException:
+ fh.close()
+ if time.time() > timeout_end:
+ raise exceptions.LockException(exception)
+
+ # Prepare the filehandle (truncate if needed)
+ fh = self._prepare_fh(fh)
+ self.fh = fh
+ return fh
def __enter__(self) -> typing.IO[typing.AnyStr]:
return self.acquire()
def release(self):
"""Releases the currently locked file handle"""
- pass
+ if self.fh:
+ portalocker.unlock(self.fh)
+ self.fh.close()
+ self.fh = None
def _get_fh(self) -> typing.IO:
"""Get a new filehandle"""
- pass
+ return open(self.filename, self.mode, **self.file_open_kwargs)
def _get_lock(self, fh: typing.IO) -> typing.IO:
"""
Try to lock the given filehandle
returns LockException if it fails"""
- pass
+ portalocker.lock(fh, self.flags)
+ return fh
def _prepare_fh(self, fh: typing.IO) -> typing.IO:
"""
@@ -160,7 +210,10 @@ class Lock(LockBase):
If truncate is a number, the file will be truncated to that amount of
bytes
"""
- pass
+ if self.truncate:
+ fh.seek(0)
+ fh.truncate(0)
+ return fh
class RLock(Lock):
"""
@@ -234,4 +287,4 @@ class NamedBoundedSemaphore(BoundedSemaphore):
def __init__(self, maximum: int, name: typing.Optional[str]=None, filename_pattern: str='{name}.{number:02d}.lock', directory: str=tempfile.gettempdir(), timeout: typing.Optional[float]=DEFAULT_TIMEOUT, check_interval: typing.Optional[float]=DEFAULT_CHECK_INTERVAL, fail_when_locked: typing.Optional[bool]=True):
if name is None:
name = 'bounded_semaphore.%d' % random.randint(0, 1000000)
- super().__init__(maximum, name, filename_pattern, directory, timeout, check_interval, fail_when_locked)
\ No newline at end of file
+ super().__init__(maximum, name, filename_pattern, directory, timeout, check_interval, fail_when_locked)