Skip to content

back to SWE-Agent summary

SWE-Agent: portalocker

Failed to run pytests for test portalocker_tests

ImportError while loading conftest '/testbed/portalocker_tests/conftest.py'.
portalocker_tests/conftest.py:8: in <module>
    from portalocker import utils
portalocker/__init__.py:37: in <module>
    lock = portalocker.lock
E   AttributeError: module 'portalocker.portalocker' has no attribute 'lock'

Patch diff

diff --git a/portalocker/utils.py b/portalocker/utils.py
index 3891691..1eb873c 100644
--- a/portalocker/utils.py
+++ b/portalocker/utils.py
@@ -40,7 +40,7 @@ def coalesce(*args: typing.Any, test_value: typing.Any=None) -> typing.Any:
     >>> coalesce([], dict(spam='eggs'), test_value=[])
     []
     """
-    pass
+    return next((arg for arg in args if arg is not test_value), None)

 @contextlib.contextmanager
 def open_atomic(filename: Filename, binary: bool=True) -> typing.Iterator[typing.IO]:
@@ -68,7 +68,20 @@ def open_atomic(filename: Filename, binary: bool=True) -> typing.Iterator[typing
     >>> assert path_filename.exists()
     >>> path_filename.unlink()
     """
-    pass
+    temp_dir = os.path.dirname(os.path.abspath(filename))
+    temp_file = tempfile.NamedTemporaryFile(
+        mode='wb' if binary else 'w',
+        dir=temp_dir,
+        delete=False
+    )
+    try:
+        yield temp_file
+        temp_file.close()
+        os.rename(temp_file.name, filename)
+    except:
+        temp_file.close()
+        os.unlink(temp_file.name)
+        raise

 class LockBase(abc.ABC):
     timeout: float
@@ -133,25 +146,62 @@ class Lock(LockBase):

     def acquire(self, timeout: typing.Optional[float]=None, check_interval: typing.Optional[float]=None, fail_when_locked: typing.Optional[bool]=None) -> typing.IO[typing.AnyStr]:
         """Acquire the locked filehandle"""
-        pass
+        if timeout is None:
+            timeout = self.timeout
+        if check_interval is None:
+            check_interval = self.check_interval
+        if fail_when_locked is None:
+            fail_when_locked = self.fail_when_locked
+
+        # Get a new filehandle
+        fh = self._get_fh()
+        try:
+            # Try to lock
+            fh = self._get_lock(fh)
+        except exceptions.LockException as exception:
+            # If we fail, close the file handle
+            fh.close()
+            if fail_when_locked:
+                raise
+            # Wait till we can acquire the lock
+            timeout_end = time.time() + timeout
+            while True:
+                time.sleep(check_interval)
+                fh = self._get_fh()
+                try:
+                    fh = self._get_lock(fh)
+                    break
+                except exceptions.LockException:
+                    fh.close()
+                    if time.time() > timeout_end:
+                        raise exceptions.LockException(exception)
+
+        # Prepare the filehandle (truncate if needed)
+        fh = self._prepare_fh(fh)
+        self.fh = fh
+        return fh

     def __enter__(self) -> typing.IO[typing.AnyStr]:
         return self.acquire()

     def release(self):
         """Releases the currently locked file handle"""
-        pass
+        if self.fh:
+            portalocker.unlock(self.fh)
+            self.fh.close()
+            self.fh = None

     def _get_fh(self) -> typing.IO:
         """Get a new filehandle"""
-        pass
+        return open(self.filename, self.mode, **self.file_open_kwargs)

     def _get_lock(self, fh: typing.IO) -> typing.IO:
         """
         Try to lock the given filehandle

         returns LockException if it fails"""
-        pass
+        portalocker.lock(fh, self.flags)
+        return fh

     def _prepare_fh(self, fh: typing.IO) -> typing.IO:
         """
@@ -160,7 +210,10 @@ class Lock(LockBase):
         If truncate is a number, the file will be truncated to that amount of
         bytes
         """
-        pass
+        if self.truncate:
+            fh.seek(0)
+            fh.truncate(0)
+        return fh

 class RLock(Lock):
     """
@@ -234,4 +287,4 @@ class NamedBoundedSemaphore(BoundedSemaphore):
     def __init__(self, maximum: int, name: typing.Optional[str]=None, filename_pattern: str='{name}.{number:02d}.lock', directory: str=tempfile.gettempdir(), timeout: typing.Optional[float]=DEFAULT_TIMEOUT, check_interval: typing.Optional[float]=DEFAULT_CHECK_INTERVAL, fail_when_locked: typing.Optional[bool]=True):
         if name is None:
             name = 'bounded_semaphore.%d' % random.randint(0, 1000000)
-        super().__init__(maximum, name, filename_pattern, directory, timeout, check_interval, fail_when_locked)
\ No newline at end of file
+        super().__init__(maximum, name, filename_pattern, directory, timeout, check_interval, fail_when_locked)