diff --git a/fsspec/compression.py b/fsspec/compression.py
index 849bfaf..ff77beb 100644
--- a/fsspec/compression.py
+++ b/fsspec/compression.py
@@ -2,6 +2,21 @@
from zipfile import ZipFile
import fsspec.utils
from fsspec.spec import AbstractBufferedFile
+
+def noop_file(infile, mode, **kwargs):
+ """Return the input file without any compression/decompression"""
+ return infile
+
+def unzip(infile, mode, **kwargs):
+ """Wrap ZipFile objects to make them more like file objects"""
+ if 'r' not in mode:
+ raise ValueError("Write mode not supported for zip files")
+ z = ZipFile(infile)
+ if len(z.filelist) != 1:
+ raise ValueError("Zip files containing multiple files are not supported")
+ first = z.filelist[0]
+    # ZipFile.open only accepts mode 'r' or 'w', never 'rb'
+    return z.open(first, 'r')
+
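+# Illustrative only (not part of this patch's API): reading the single member
+# of a local archive through unzip(); "archive.zip" is a hypothetical file.
+#
+#     with open("archive.zip", "rb") as raw:
+#         member = unzip(raw, "rb")
+#         data = member.read()
+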
compr = {None: noop_file}
def register_compression(name, callback, extensions, force=False):
diff --git a/fsspec/core.py b/fsspec/core.py
index 1a4bf91..09261f2 100644
--- a/fsspec/core.py
+++ b/fsspec/core.py
@@ -10,6 +10,40 @@ from fsspec.compression import compr
from fsspec.config import conf
from fsspec.registry import filesystem, get_filesystem_class
from fsspec.utils import _unstrip_protocol, build_name_function, infer_compression, stringify_path
+
+def get_compression(path, compression):
+ """Determine compression from path or compression parameter"""
+ if compression == "infer":
+ return infer_compression(path)
+ return compression
+
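+# For instance (illustrative; assumes the built-in gzip codec is registered):
+#
+#     get_compression("data.csv.gz", "infer")  # -> "gzip"
+#     get_compression("data.csv", None)        # -> None
+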
+def _expand_paths(path, name_function=None, num=1):
+ """Expand paths containing ``*`` to match the number of files required
+
+ Parameters
+ ----------
+ path: str
+ Path pattern to be expanded
+ name_function: callable, optional
+ If given, generates names from integer indices
+ num: int
+ If name_function is None, expands path to match this number of files
+
+ Returns
+ -------
+ list of str
+ Expanded paths
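+
+    Examples
+    --------
+    A sketch of the intended behaviour (file names illustrative):
+
+    >>> _expand_paths('data_*.csv', num=2)  # doctest: +SKIP
+    ['data_0.csv', 'data_1.csv']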
+ """
+ if isinstance(path, (list, tuple)):
+ return path
+
+ if name_function is None:
+ name_function = build_name_function(num - 1)
+
+ if "*" not in path:
+ return [path]
+
+ return [path.replace("*", name_function(i)) for i in range(num)]
logger = logging.getLogger('fsspec')
class OpenFile:
diff --git a/fsspec/implementations/http.py b/fsspec/implementations/http.py
index 41dd999..4c811cc 100644
--- a/fsspec/implementations/http.py
+++ b/fsspec/implementations/http.py
@@ -12,6 +12,10 @@ from fsspec.callbacks import DEFAULT_CALLBACK
from fsspec.exceptions import FSTimeoutError
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import DEFAULT_BLOCK_SIZE, glob_translate, isfilelike, nullcontext, tokenize
+
+def get_client(**kwargs):
+ """Create an aiohttp.ClientSession with the given kwargs"""
+ return aiohttp.ClientSession(**kwargs)
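+
+# Illustrative use (kwargs pass straight through to aiohttp.ClientSession;
+# the caller owns the session and should eventually close it):
+#
+#     session = get_client(headers={"User-Agent": "fsspec-example"})
+#     ...
+#     await session.close()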
from ..caching import AllBytes
ex = re.compile('<(a|A)\\s+(?:[^>]*?\\s+)?(href|HREF)=["\'](?P<url>[^"\']+)')
ex2 = re.compile('(?P<url>http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)')
@@ -78,6 +82,39 @@ class HTTPFileSystem(AsyncFileSystem):
def _strip_protocol(cls, path):
"""For HTTP, we always want to keep the full URL"""
pass
+
+ async def _ls(self, url, detail=True, **kwargs):
+ """Get all files in a directory based on links in an HTML page"""
+ session = await self.set_session()
+ try:
+ async with session.get(self.encode_url(url), **self.kwargs) as r:
+ self._raise_not_found_for_status(r, url)
+ text = await r.text()
+ except Exception as e:
+ if detail:
+ raise
+ return []
+
+ if self.simple_links:
+ links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
+ else:
+ links = [u[2] for u in ex.findall(text)]
+
+ out = set()
+ parts = urlparse(url)
+ for l in links:
+ if l.startswith('/') and len(l) > 1:
+ # absolute URL on this server
+ l = parts.scheme + '://' + parts.netloc + l
+            if l.startswith('http'):
+                if self.same_schema and l.startswith(parts.scheme + '://'):
+                    out.add(l)
+                elif not self.same_schema:
+                    out.add(l)
+            elif l not in ('..', '../'):
+                # relative link: resolve against the page URL
+                out.add('/'.join([url.rstrip('/'), l.lstrip('/')]))
+        if detail:
+            return [await self._info(u) for u in sorted(out)]
+        return sorted(out)
+
ls = sync_wrapper(_ls)
def _raise_not_found_for_status(self, response, url):
@@ -222,6 +259,15 @@ class HTTPStreamFile(AbstractBufferedFile):
return r
self.r = sync(self.loop, cor)
self.loop = fs.loop
+
+ async def _read(self, length=-1):
+ """Read bytes from stream"""
+ if length < 0:
+ out = await self.r.read()
+ else:
+ out = await self.r.read(length)
+ return out
+
read = sync_wrapper(_read)
def __reduce__(self):
@@ -247,4 +293,13 @@ async def _file_info(url, session, size_policy='head', **kwargs):
'identity' (no compression) to get the true size of the target.
"""
pass
+
+async def _file_size(url, session=None, **kwargs):
+ """Return the size of a file by URL
+
+ Uses HTTP HEAD request by default
+ """
+    if session is None:
+        # fall back to a fresh session; the caller is then responsible
+        # for closing it
+        session = get_client()
+    info = await _file_info(url, session=session, **kwargs)
+ return info.get('size', None)
+
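+# Illustrative call via the sync wrapper below; note that _file_info above is
+# still a stub, so this returns nothing useful until it is implemented:
+#
+#     file_size("https://example.com/data.bin")  # hypothetical URL
+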
file_size = sync_wrapper(_file_size)
\ No newline at end of file
diff --git a/fsspec/implementations/memory.py b/fsspec/implementations/memory.py
index 11e4cf3..acfffc1 100644
--- a/fsspec/implementations/memory.py
+++ b/fsspec/implementations/memory.py
@@ -21,12 +21,115 @@ class MemoryFileSystem(AbstractFileSystem):
protocol = 'memory'
root_marker = '/'
+ @classmethod
+ def _strip_protocol(cls, path):
+ """Remove protocol from path"""
+ path = stringify_path(path)
+ if path.startswith('memory://'):
+ path = path[9:]
+ return path.lstrip('/')
+
+ @classmethod
+ def current(cls):
+ """Return the most recently instantiated instance"""
+ if not cls._cache:
+ return cls()
+ return list(cls._cache.values())[-1]
+
def pipe_file(self, path, value, **kwargs):
"""Set the bytes of given file
Avoids copies of the data if possible
"""
- pass
+ path = self._strip_protocol(stringify_path(path))
+ if isinstance(value, bytes):
+ data = value
+ else:
+ data = value.read()
+ self.store[path] = MemoryFile(self, path, data)
+
+ def cat(self, path):
+ """Return contents of file as bytes"""
+ path = self._strip_protocol(stringify_path(path))
+        if path not in self.store:
+            raise FileNotFoundError(path)
+        return self.store[path].getvalue()
+
+ def du(self, path, total=True, maxdepth=None, **kwargs):
+ """Space used by files within a path"""
+ path = self._strip_protocol(stringify_path(path))
+ sizes = {}
+ for p, f in self.store.items():
+ if p.startswith(path):
+ sizes[p] = len(f.getvalue())
+ if total:
+ return sum(sizes.values())
+ return sizes
+
+ def open(self, path, mode='rb', **kwargs):
+ """Open a file"""
+ path = self._strip_protocol(stringify_path(path))
+ if mode == 'rb':
+ if path not in self.store:
+ raise FileNotFoundError(path)
+ return MemoryFile(self, path, self.store[path].getvalue())
+ elif mode == 'wb':
+ f = MemoryFile(self, path)
+ self.store[path] = f
+ return f
+ else:
+ raise NotImplementedError("Mode %s not supported" % mode)
+
+ def put(self, lpath, rpath, recursive=False, **kwargs):
+ """Copy file(s) from local"""
+        if recursive:
+            for f in LocalFileSystem().find(lpath):
+                with open(f, 'rb') as fo:
+                    data = fo.read()
+                rp = rpath + '/' + os.path.relpath(f, lpath).replace(os.sep, '/')
+                self.pipe_file(rp, data)
+        else:
+            with open(lpath, 'rb') as fo:
+                data = fo.read()
+            self.pipe_file(rpath, data)
+
+ def get(self, rpath, lpath, recursive=False, **kwargs):
+ """Copy file(s) to local"""
+ if recursive:
+ paths = self.find(rpath)
+ for path in paths:
+ data = self.cat(path)
+ lp = os.path.join(lpath, os.path.relpath(path, rpath))
+ os.makedirs(os.path.dirname(lp), exist_ok=True)
+ with open(lp, 'wb') as f:
+ f.write(data)
+ else:
+ data = self.cat(rpath)
+ with open(lpath, 'wb') as f:
+ f.write(data)
+
+ def ls(self, path, detail=True, **kwargs):
+ """List objects at path"""
+ path = self._strip_protocol(stringify_path(path))
+ paths = []
+ for p in self.store:
+ if p.startswith(path):
+ paths.append(p)
+ if detail:
+ return [self.info(p) for p in paths]
+ return paths
+
+ def info(self, path, **kwargs):
+ """Get info about file"""
+ path = self._strip_protocol(stringify_path(path))
+ if path not in self.store:
+ raise FileNotFoundError(path)
+ f = self.store[path]
+ return {
+ 'name': path,
+ 'size': len(f.getvalue()),
+ 'type': 'file',
+ 'created': f.created,
+ 'modified': f.modified
+ }
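+
+# Round-trip sketch of the methods above (paths illustrative):
+#
+#     m = MemoryFileSystem()
+#     m.pipe_file("memory://a/b.txt", b"hello")
+#     m.cat("a/b.txt")         # b"hello"
+#     m.ls("a", detail=False)  # ["a/b.txt"]
+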
class MemoryFile(BytesIO):
"""A BytesIO which can't close and works as a context manager
@@ -45,6 +148,18 @@ class MemoryFile(BytesIO):
if data:
super().__init__(data)
self.seek(0)
+ else:
+ super().__init__()
def __enter__(self):
- return self
\ No newline at end of file
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.modified = datetime.now(tz=timezone.utc)
+ return None
+
+    def close(self):
+        # deliberately a no-op: the store must keep this buffer readable
+        pass
+
+    def discard(self):
+        # nothing to roll back for in-memory data
+        pass
\ No newline at end of file
diff --git a/fsspec/registry.py b/fsspec/registry.py
index 5121364..e5f26d5 100644
--- a/fsspec/registry.py
+++ b/fsspec/registry.py
@@ -27,7 +27,14 @@ def register_implementation(name, cls, clobber=False, errtxt=None):
If given, then a failure to import the given class will result in this
text being given.
"""
- pass
+ if isinstance(cls, str):
+ if name in known_implementations and not clobber:
+ raise ValueError(f"Name {name} already in known_implementations")
+ known_implementations[name] = {"class": cls, "err": errtxt}
+ else:
+ if name in registry and not clobber:
+ raise ValueError(f"Name {name} already in registry")
+ _registry[name] = cls
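+
+# For example (hypothetical package and class, shown as a sketch):
+#
+#     register_implementation("myproto", "mypkg.MyFileSystem",
+#                             errtxt="Install mypkg to use myproto://")
+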
known_implementations = {
    'abfs': {'class': 'adlfs.AzureBlobFileSystem', 'err': 'Install adlfs to access Azure Datalake Gen2 and Azure Blob Storage'},
    'adl': {'class': 'adlfs.AzureDatalakeFileSystem', 'err': 'Install adlfs to access Azure Datalake Gen1'},
    'arrow_hdfs': {'class': 'fsspec.implementations.arrow.HadoopFileSystem', 'err': 'pyarrow and local java libraries required for HDFS'},
    'asynclocal': {'class': 'morefs.asyn_local.AsyncLocalFileSystem', 'err': "Install 'morefs[asynclocalfs]' to use AsyncLocalFileSystem"},
    'az': {'class': 'adlfs.AzureBlobFileSystem', 'err': 'Install adlfs to access Azure Datalake Gen2 and Azure Blob Storage'},
    'blockcache': {'class': 'fsspec.implementations.cached.CachingFileSystem'},
    'box': {'class': 'boxfs.BoxFileSystem', 'err': 'Please install boxfs to access BoxFileSystem'},
    'cached': {'class': 'fsspec.implementations.cached.CachingFileSystem'},
    'dask': {'class': 'fsspec.implementations.dask.DaskWorkerFileSystem', 'err': 'Install dask distributed to access worker file system'},
    'data': {'class': 'fsspec.implementations.data.DataFileSystem'},
    'dbfs': {'class': 'fsspec.implementations.dbfs.DatabricksFileSystem', 'err': 'Install the requests package to use the DatabricksFileSystem'},
    'dir': {'class': 'fsspec.implementations.dirfs.DirFileSystem'},
    'dropbox': {'class': 'dropboxdrivefs.DropboxDriveFileSystem', 'err': 'DropboxFileSystem requires "dropboxdrivefs", "requests" and "dropbox" to be installed'},
    'dvc': {'class': 'dvc.api.DVCFileSystem', 'err': 'Install dvc to access DVCFileSystem'},
    'file': {'class': 'fsspec.implementations.local.LocalFileSystem'},
    'filecache': {'class': 'fsspec.implementations.cached.WholeFileCacheFileSystem'},
    'ftp': {'class': 'fsspec.implementations.ftp.FTPFileSystem'},
    'gcs': {'class': 'gcsfs.GCSFileSystem', 'err': 'Please install gcsfs to access Google Storage'},
    'gdrive': {'class': 'gdrivefs.GoogleDriveFileSystem', 'err': 'Please install gdrivefs for access to Google Drive'},
    'generic': {'class': 'fsspec.generic.GenericFileSystem'},
    'git': {'class': 'fsspec.implementations.git.GitFileSystem', 'err': 'Install pygit2 to browse local git repos'},
    'github': {'class': 'fsspec.implementations.github.GithubFileSystem', 'err': 'Install the requests package to use the github FS'},
    'gs': {'class': 'gcsfs.GCSFileSystem', 'err': 'Please install gcsfs to access Google Storage'},
    'hdfs': {'class': 'fsspec.implementations.arrow.HadoopFileSystem', 'err': 'pyarrow and local java libraries required for HDFS'},
    'hf': {'class': 'huggingface_hub.HfFileSystem', 'err': 'Install huggingface_hub to access HfFileSystem'},
    'http': {'class': 'fsspec.implementations.http.HTTPFileSystem', 'err': 'HTTPFileSystem requires "requests" and "aiohttp" to be installed'},
    'https': {'class': 'fsspec.implementations.http.HTTPFileSystem', 'err': 'HTTPFileSystem requires "requests" and "aiohttp" to be installed'},
    'jlab': {'class': 'fsspec.implementations.jupyter.JupyterFileSystem', 'err': 'Jupyter FS requires requests to be installed'},
    'jupyter': {'class': 'fsspec.implementations.jupyter.JupyterFileSystem', 'err': 'Jupyter FS requires requests to be installed'},
    'lakefs': {'class': 'lakefs_spec.LakeFSFileSystem', 'err': 'Please install lakefs-spec to access LakeFSFileSystem'},
    'libarchive': {'class': 'fsspec.implementations.libarchive.LibArchiveFileSystem', 'err': 'Install libarchive to access LibArchiveFileSystem'},
    'local': {'class': 'fsspec.implementations.local.LocalFileSystem'},
    'memory': {'class': 'fsspec.implementations.memory.MemoryFileSystem'},
    'oci': {'class': 'ocifs.OCIFileSystem', 'err': 'Install ocifs to access OCI Object Storage'},
    'ocilake': {'class': 'ocifs.OCIFileSystem', 'err': 'Install ocifs to access OCI Data Lake'},
    'oss': {'class': 'ossfs.OSSFileSystem', 'err': 'Install ossfs to access Alibaba Object Storage System'},
    'reference': {'class': 'fsspec.implementations.reference.ReferenceFileSystem'},
    'root': {'class': 'fsspec_xrootd.XRootDFileSystem', 'err': "Install fsspec-xrootd to access xrootd storage system. Note: 'root' is the protocol name for xrootd storage systems, not referring to root directories"},
    's3': {'class': 's3fs.S3FileSystem', 'err': 'Install s3fs to access S3'},
    's3a': {'class': 's3fs.S3FileSystem', 'err': 'Install s3fs to access S3'},
    'sftp': {'class': 'fsspec.implementations.sftp.SFTPFileSystem', 'err': 'SFTPFileSystem requires "paramiko" to be installed'},
    'simplecache': {'class': 'fsspec.implementations.cached.SimpleCacheFileSystem'},
    'smb': {'class': 'fsspec.implementations.smb.SMBFileSystem', 'err': 'SMB requires "smbprotocol" or "smbprotocol[kerberos]" installed'},
    'ssh': {'class': 'fsspec.implementations.sftp.SFTPFileSystem', 'err': 'SFTPFileSystem requires "paramiko" to be installed'},
    'tar': {'class': 'fsspec.implementations.tar.TarFileSystem'},
    'wandb': {'class': 'wandbfs.WandbFS', 'err': 'Install wandbfs to access wandb'},
    'webdav': {'class': 'webdav4.fsspec.WebdavFileSystem', 'err': 'Install webdav4 to access WebDAV'},
    'webhdfs': {'class': 'fsspec.implementations.webhdfs.WebHDFS', 'err': 'webHDFS access requires "requests" to be installed'},
    'zip': {'class': 'fsspec.implementations.zip.ZipFileSystem'}}
assert list(known_implementations) == sorted(known_implementations), 'Not in alphabetical order'
@@ -43,7 +50,20 @@ def get_filesystem_class(protocol):
import may fail. In this case, the string in the "err" field of the
``known_implementations`` will be given as the error message.
"""
- pass
+ if protocol in registry:
+ return registry[protocol]
+
+ if protocol not in known_implementations:
+ raise ValueError(f"Protocol {protocol} not known")
+
+ try:
+ register_implementation(protocol, _import_class(known_implementations[protocol]["class"]))
+ except ImportError as e:
+ if known_implementations[protocol]["err"]:
+ raise ImportError(known_implementations[protocol]["err"]) from e
+ raise
+
+ return registry[protocol]
s3_msg = 'Your installed version of s3fs is very old and known to cause\nsevere performance issues, see also https://github.com/dask/dask/issues/10276\n\nTo fix, you should specify a lower version bound on s3fs, or\nupdate the current installation.\n'
def _import_class(fqp: str):
@@ -57,7 +77,20 @@ def _import_class(fqp: str):
This can import arbitrary modules. Make sure you haven't installed any modules
that may execute malicious code at import time.
"""
- pass
+ if ":" in fqp:
+ mod, name = fqp.rsplit(":", 1)
+ else:
+ mod, name = fqp.rsplit(".", 1)
+
+ try:
+ module = importlib.import_module(mod)
+ except ImportError as e:
+ raise ImportError(f"Could not import module {mod}") from e
+
+ try:
+ return getattr(module, name)
+ except AttributeError as e:
+ raise ImportError(f"Could not import {name} from module {mod}") from e
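+
+# Both spellings resolve the same attribute (illustrative):
+#
+#     _import_class("fsspec.implementations.memory.MemoryFileSystem")
+#     _import_class("fsspec.implementations.memory:MemoryFileSystem")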
def filesystem(protocol, **storage_options):
"""Instantiate filesystems for given protocol and arguments
@@ -65,11 +98,12 @@ def filesystem(protocol, **storage_options):
``storage_options`` are specific to the protocol being chosen, and are
passed directly to the class.
"""
- pass
+ cls = get_filesystem_class(protocol)
+ return cls(**storage_options)
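+
+# e.g. (sketch; instances are cached by the _Cached metaclass in spec.py):
+#
+#     fs = filesystem("memory")
+#     assert fs is filesystem("memory")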
def available_protocols():
"""Return a list of the implemented protocols.
Note that any given protocol may require extra packages to be importable.
"""
- pass
\ No newline at end of file
+ return list(known_implementations)
\ No newline at end of file
diff --git a/fsspec/spec.py b/fsspec/spec.py
index 30263ba..50d0251 100644
--- a/fsspec/spec.py
+++ b/fsspec/spec.py
@@ -43,7 +43,7 @@ class _Cached(type):
cls._pid = os.getpid()
def __call__(cls, *args, **kwargs):
- kwargs = apply_config(cls, kwargs)
+ kwargs = apply_config(cls, kwargs) or {}
extra_tokens = tuple((getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes))
token = tokenize(cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs)
skip = kwargs.pop('skip_instance_cache', False)
@@ -129,7 +129,7 @@ class AbstractFileSystem(metaclass=_Cached):
"""Persistent filesystem id that can be used to compare filesystems
across sessions.
"""
- pass
+        # tokenize() is deterministic across sessions, unlike built-in hash()
+        return tokenize(type(self).__name__, self.storage_args, self.storage_options)
def __dask_tokenize__(self):
return self._fs_token
@@ -149,11 +149,17 @@ class AbstractFileSystem(metaclass=_Cached):
May require FS-specific handling, e.g., for relative paths or links.
"""
- pass
+ path = stringify_path(path)
+ protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
+ for protocol in protos:
+ if path.startswith(protocol + '://'):
+ path = path[len(protocol) + 3:]
+ break
+ return path
def unstrip_protocol(self, name: str) -> str:
"""Format FS-specific path to generic, including protocol"""
- pass
+        protos = (self.protocol,) if isinstance(self.protocol, str) else self.protocol
+        for protocol in protos:
+            if name.startswith(protocol + '://'):
+                return name
+        return protos[0] + '://' + name
@staticmethod
def _get_kwargs_from_urls(path):
@@ -165,7 +171,7 @@ class AbstractFileSystem(metaclass=_Cached):
Examples may look like an sftp path "sftp://user@host:/my/path", where
the user and host should become kwargs and later get stripped.
"""
- pass
+ return {}
@classmethod
def current(cls):
@@ -173,7 +179,9 @@ class AbstractFileSystem(metaclass=_Cached):
If no instance has been created, then create one with defaults
"""
- pass
+ if not cls._cache:
+ return cls()
+ return list(cls._cache.values())[-1]
@property
def transaction(self):
@@ -182,15 +190,17 @@ class AbstractFileSystem(metaclass=_Cached):
Requires the file class to implement `.commit()` and `.discard()`
for the normal and exception cases.
"""
- pass
+ return self.transaction_type(self)
def start_transaction(self):
"""Begin write transaction for deferring files, non-context version"""
- pass
+ self._intrans = True
+ self._transaction = self.transaction_type(self)
def end_transaction(self):
"""Finish write transaction, non-context version"""
- pass
+ self._intrans = False
+ self._transaction = None
def invalidate_cache(self, path=None):
"""
@@ -202,7 +212,10 @@ class AbstractFileSystem(metaclass=_Cached):
If None, clear all listings cached else listings at or under given
path.
"""
- pass
+ if self._intrans:
+ self._invalidated_caches_in_transaction.append(path)
+        else:
+            if path is None:
+                self.dircache.clear()
+            else:
+                # dict.clear() takes no argument; drop the one listing instead
+                self.dircache.pop(self._strip_protocol(path), None)
def mkdir(self, path, create_parents=True, **kwargs):
"""
diff --git a/fsspec/utils.py b/fsspec/utils.py
index dc3c9ed..df76bf5 100644
--- a/fsspec/utils.py
+++ b/fsspec/utils.py
@@ -12,6 +12,63 @@ from hashlib import md5
from importlib.metadata import version
from typing import IO, TYPE_CHECKING, Any, Callable, Iterable, Iterator, Sequence, TypeVar
from urllib.parse import urlsplit
+
+def _unstrip_protocol(name: str) -> str:
+    """Ensure the path carries a protocol prefix
+
+    Parameters
+    ----------
+    name : str
+        Input path, like "protocol://path" or "path"
+
+    Returns
+    -------
+    str
+        The input unchanged if it already has a protocol; otherwise the path
+        is assumed to be local and "file://" is prepended
+    """
+    if "://" in name:
+        return name
+    return "file://" + name
+
+def is_exception(obj: Any) -> bool:
+ """Test if an object is an Exception or subclass"""
+ return isinstance(obj, BaseException) or (isinstance(obj, type) and issubclass(obj, BaseException))
+
+def get_protocol(urlpath: str) -> str | None:
+ """Return protocol from given URL or None if no protocol is found"""
+ if "://" in urlpath:
+ return urlpath.split("://")[0]
+ return None
+
+def setup_logging(logger: logging.Logger | None=None, level: str | int="INFO") -> None:
+ """Configure logging for fsspec
+
+ Parameters
+ ----------
+ logger: logging.Logger or None
+ Logger to configure. If None, uses the root logger
+ level: str or int
+ Logging level, like "INFO" or logging.INFO
+ """
+ if logger is None:
+ logger = logging.getLogger()
+ handler = logging.StreamHandler()
+ formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ handler.setFormatter(formatter)
+ logger.addHandler(handler)
+ logger.setLevel(level)
+
+@contextlib.contextmanager
+def nullcontext(enter_result=None):
+ """Context manager that does nothing
+
+ Useful for conditional context manager usage where one branch does nothing.
+ """
+ yield enter_result
+
+def isfilelike(obj: Any) -> bool:
+ """Test if an object implements the file-like protocol (read/write/seek)"""
+ return hasattr(obj, "read") and hasattr(obj, "seek") and hasattr(obj, "write")
if TYPE_CHECKING:
from typing_extensions import TypeGuard
from fsspec.spec import AbstractFileSystem
@@ -46,7 +103,48 @@ def infer_storage_options(urlpath: str, inherit_storage_options: dict[str, Any]
"host": "node", "port": 123, "path": "/mnt/datasets/test.csv",
"url_query": "q=1", "extra": "value"}
"""
- pass
+ result = {}
+ if inherit_storage_options:
+ result.update(inherit_storage_options)
+
+ if not isinstance(urlpath, str):
+ return result
+
+ parsed_path = urlsplit(urlpath)
+ protocol = parsed_path.scheme or 'file'
+ result['protocol'] = protocol
+
+ if protocol == 'file':
+ result['path'] = urlpath
+ else:
+ path = parsed_path.path
+ if parsed_path.netloc:
+ if '@' in parsed_path.netloc:
+ auth, netloc = parsed_path.netloc.split('@', 1)
+ if ':' in auth:
+ result['username'], result['password'] = auth.split(':', 1)
+ else:
+ result['username'] = auth
+ else:
+ netloc = parsed_path.netloc
+
+ if ':' in netloc:
+ host, port = netloc.split(':', 1)
+ try:
+ port = int(port)
+ except ValueError:
+ port = None
+ result['host'] = host
+ if port:
+ result['port'] = port
+ else:
+ result['host'] = netloc
+
+ result['path'] = path or '/'
+ if parsed_path.query:
+ result['url_query'] = parsed_path.query
+
+ return result
compressions: dict[str, str] = {}
def infer_compression(filename: str) -> str | None:
@@ -56,7 +154,12 @@ def infer_compression(filename: str) -> str | None:
extension. This includes builtin (gz, bz2, zip) compressions, as well as
optional compressions. See fsspec.compression.register_compression.
"""
- pass
+ if not isinstance(filename, str):
+ return None
+ for ext, comp in compressions.items():
+ if filename.endswith('.' + ext):
+ return comp
+ return None
def build_name_function(max_int: float) -> Callable[[int], str]:
"""Returns a function that receives a single integer
@@ -76,7 +179,8 @@ def build_name_function(max_int: float) -> Callable[[int], str]:
>>> build_name_function(0)(0)
'0'
"""
- pass
+ width = len(str(int(max_int)))
+ return lambda x: str(x).zfill(width)
def seek_delimiter(file: IO[bytes], delimiter: bytes, blocksize: int) -> bool:
"""Seek current file to file start, file end, or byte after delimiter seq.
@@ -100,7 +204,22 @@ def seek_delimiter(file: IO[bytes], delimiter: bytes, blocksize: int) -> bool:
Returns True if a delimiter was found, False if at file start or end.
"""
- pass
+ if file.tell() == 0:
+ return False
+
+    last = b''
+    while True:
+        current = file.read(blocksize)
+        if not current:
+            return False
+        # prepend the tail of the previous block so a delimiter spanning
+        # two blocks is still found
+        full = last + current
+        possible = full.find(delimiter)
+        if possible < 0:
+            last = full[-len(delimiter):]
+        else:
+            # position the file just after the delimiter
+            file.seek(possible - len(full) + len(delimiter), 1)
+            return True
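+
+# Sketch: land just past the first b"\n" at or after byte 1 (requires io):
+#
+#     f = io.BytesIO(b"ab\ncd\nef")
+#     f.seek(1)
+#     seek_delimiter(f, b"\n", 2)  # True; f.tell() == 3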
def read_block(f: IO[bytes], offset: int, length: int | None, delimiter: bytes | None=None, split_before: bool=False) -> bytes:
"""Read a block of bytes from a file
@@ -139,7 +258,43 @@ def read_block(f: IO[bytes], offset: int, length: int | None, delimiter: bytes |
>>> read_block(f, 10, 10, delimiter=b'\\n') # doctest: +SKIP
b'Bob, 200\\nCharlie, 300'
"""
- pass
+    if offset < 0:
+        raise ValueError("Offset must be non-negative")
+
+    if delimiter:
+        # start the block just after the first delimiter at/after offset,
+        # and end it just after the first delimiter at/after offset+length
+        f.seek(offset)
+        found_start_delim = seek_delimiter(f, delimiter, 2**16)
+        if length is None:
+            return f.read()
+        start = f.tell()
+        length -= start - offset
+        f.seek(start + length)
+        found_end_delim = seek_delimiter(f, delimiter, 2**16)
+        end = f.tell()
+        # with split_before, the delimiter belongs to the next block
+        if found_start_delim and split_before:
+            start -= len(delimiter)
+        if found_end_delim and split_before:
+            end -= len(delimiter)
+        offset = start
+        length = end - start
+
+    f.seek(offset)
+    if length is None:
+        return f.read()
+    return f.read(length)
def tokenize(*args: Any, **kwargs: Any) -> str:
"""Deterministic token
@@ -152,7 +307,11 @@ def tokenize(*args: Any, **kwargs: Any) -> str:
>>> tokenize('Hello') == tokenize('Hello')
True
"""
- pass
+    if kwargs:
+        # kwargs is always a dict here; fold it into args in a stable order
+        args = args + (sorted(kwargs.items()),)
+    return md5(str(args).encode()).hexdigest()
def stringify_path(filepath: str | os.PathLike[str] | pathlib.Path) -> str:
"""Attempt to convert a path-like object to a string.
@@ -176,11 +335,31 @@ def stringify_path(filepath: str | os.PathLike[str] | pathlib.Path) -> str:
Any other object is passed through unchanged, which includes bytes,
strings, buffers, or anything else that's not even path-like.
"""
- pass
+ if isinstance(filepath, str):
+ return filepath
+ if hasattr(filepath, '__fspath__'):
+ return filepath.__fspath__()
+ if isinstance(filepath, pathlib.Path):
+ return str(filepath)
+ return filepath
def common_prefix(paths: Iterable[str]) -> str:
"""For a list of paths, find the shortest prefix common to all"""
- pass
+    # Convert Windows paths to POSIX paths
+    paths = [p.replace('\\', '/') for p in paths]
+    if not paths:
+        return ''
+    # compare whole path components rather than characters, so that
+    # ['/a/cat', '/a/car'] gives '/a' and not '/a/ca'
+    return '/'.join(os.path.commonprefix([p.split('/') for p in paths]))
def other_paths(paths: list[str], path2: str | list[str], exists: bool=False, flatten: bool=False) -> list[str]:
"""In bulk file operations, construct a new file tree from a list of files
@@ -203,11 +382,41 @@ def other_paths(paths: list[str], path2: str | list[str], exists: bool=False, fl
-------
list of str
"""
- pass
+ if isinstance(path2, str):
+ path2 = path2.replace('\\', '/')
+ if path2.endswith('/'):
+ exists = True
+ if not exists:
+ path2 = path2.rstrip('/')
+ if exists:
+ if not path2.endswith('/'):
+ path2 = path2 + '/'
+ if flatten:
+ return [path2 + os.path.basename(p) for p in paths]
+ else:
+ cp = common_prefix(paths)
+ return [path2 + p[len(cp):].lstrip('/') for p in paths]
+ else:
+ if len(paths) > 1:
+ raise ValueError("If not exists and str target, source must be single file")
+ return [path2]
+ else:
+ if len(paths) != len(path2):
+ raise ValueError("Different lengths for source and destination")
+ return path2
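+
+# e.g. (sketch): mirroring a small tree under a new prefix:
+#
+#     other_paths(["/a/x.csv", "/a/y.csv"], "mem://b/")
+#     # -> ["mem://b/x.csv", "mem://b/y.csv"]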
def can_be_local(path: str) -> bool:
"""Can the given URL be used with open_local?"""
- pass
+ if not isinstance(path, str):
+ return False
+ path = path.replace('\\', '/')
+ if path.startswith('file://'):
+ return True
+ if '://' not in path:
+ return True
+ if path.startswith('simplecache::'):
+ return True
+ return False
def get_package_version_without_import(name: str) -> str | None:
"""For given package name, try to find the version without importing it
@@ -218,13 +427,26 @@ def get_package_version_without_import(name: str) -> str | None:
Returns either the version string, or None if the package
or the version was not readily found.
"""
- pass
+ try:
+ return version(name)
+ except Exception:
+ return None
def mirror_from(origin_name: str, methods: Iterable[str]) -> Callable[[type[T]], type[T]]:
"""Mirror attributes and methods from the given
origin_name attribute of the instance to the
decorated class"""
- pass
+ def wrapper(cls: type[T]) -> type[T]:
+ def make_method(method: str) -> Callable:
+ def _method(self, *args, **kwargs):
+ origin = getattr(self, origin_name)
+ return getattr(origin, method)(*args, **kwargs)
+ return _method
+
+ for method in methods:
+ setattr(cls, method, make_method(method))
+ return cls
+ return wrapper
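+
+# Sketch: expose selected methods of a wrapped object unchanged:
+#
+#     @mirror_from("f", ["read", "seek", "tell"])
+#     class Wrapper:
+#         def __init__(self, f):
+#             self.f = f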
def merge_offset_ranges(paths: list[str], starts: list[int] | int, ends: list[int] | int, max_gap: int=0, max_block: int | None=None, sort: bool=True) -> tuple[list[str], list[int], list[int]]:
"""Merge adjacent byte-offset ranges when the inter-range
@@ -234,11 +456,52 @@ def merge_offset_ranges(paths: list[str], starts: list[int] | int, ends: list[in
order. If the user can guarantee that the inputs are already
sorted, passing `sort=False` will skip the re-ordering.
"""
- pass
+ if isinstance(starts, int):
+ starts = [starts] * len(paths)
+ if isinstance(ends, int):
+ ends = [ends] * len(paths)
+ if len(paths) != len(starts) or len(paths) != len(ends):
+ raise ValueError("paths, starts, and ends must have same length")
+
+    if not paths:
+        return [], [], []
+
+    if sort:
+        # sort by path, then start offset
+        items = sorted(zip(paths, starts, ends))
+        paths, starts, ends = (list(t) for t in zip(*items))
+
+
+ out_paths = [paths[0]]
+ out_starts = [starts[0]]
+ out_ends = [ends[0]]
+
+ for path, start, end in zip(paths[1:], starts[1:], ends[1:]):
+ if (path == out_paths[-1] and
+ (start - out_ends[-1] <= max_gap) and
+ (max_block is None or end - out_starts[-1] <= max_block)):
+ # Merge with previous range
+ out_ends[-1] = end
+ else:
+ # Start new range
+ out_paths.append(path)
+ out_starts.append(start)
+ out_ends.append(end)
+
+ return out_paths, out_starts, out_ends
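+
+# e.g.: two near-adjacent ranges in the same file collapse into one:
+#
+#     merge_offset_ranges(["f", "f"], [0, 12], [10, 20], max_gap=2)
+#     # -> (["f"], [0], [20])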
def file_size(filelike: IO[bytes]) -> int:
"""Find length of any open read-mode file-like"""
- pass
+    pos = filelike.tell()
+    try:
+        # seek to the end (whence=2) and report the resulting absolute
+        # offset, which is the total size
+        return filelike.seek(0, 2)
+    finally:
+        filelike.seek(pos)
@contextlib.contextmanager
def atomic_write(path: str, mode: str='wb'):
@@ -247,8 +510,82 @@ def atomic_write(path: str, mode: str='wb'):
replaces `path` with the temporary file, thereby updating `path`
atomically.
"""
- pass
+ dir_path = os.path.dirname(path) or '.'
+ basename = os.path.basename(path)
+ temp_path = None
+
+ try:
+ with tempfile.NamedTemporaryFile(
+ mode=mode, prefix=basename + '.', suffix='.tmp',
+ dir=dir_path, delete=False
+ ) as f:
+ temp_path = f.name
+ yield f
+
+ # On Windows, we need to close the file before renaming
+ os.replace(temp_path, path)
+ temp_path = None
+ finally:
+ if temp_path:
+ try:
+ os.unlink(temp_path)
+ except OSError:
+ pass
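+
+# Usage sketch ("out.bin" illustrative); the target is replaced only if the
+# block completes without raising:
+#
+#     with atomic_write("out.bin") as f:
+#         f.write(b"all-or-nothing")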
def glob_translate(pat):
"""Translate a pathname with shell wildcards to a regular expression."""
- pass
\ No newline at end of file
+ if not pat:
+ return ''
+
+ # Convert Windows paths to POSIX paths
+ pat = pat.replace('\\', '/')
+
+ # Convert shell wildcards to regex
+ i, n = 0, len(pat)
+ res = []
+ while i < n:
+ c = pat[i]
+ i = i + 1
+ if c == '*':
+ if i < n and pat[i] == '*':
+ # Handle **
+ i = i + 1
+ if i < n and pat[i] == '/':
+ i = i + 1
+ res.append('(?:/.+)?')
+ else:
+ res.append('.*')
+ else:
+ # Handle *
+ res.append('[^/]*')
+ elif c == '?':
+ res.append('[^/]')
+ elif c == '[':
+ j = i
+ if j < n and pat[j] == '!':
+ j = j + 1
+ if j < n and pat[j] == ']':
+ j = j + 1
+ while j < n and pat[j] != ']':
+ j = j + 1
+ if j >= n:
+ res.append('\\[')
+ else:
+ stuff = pat[i:j].replace('\\', '\\\\')
+ i = j + 1
+ if stuff[0] == '!':
+ stuff = '^' + stuff[1:]
+ elif stuff[0] == '^':
+ stuff = '\\' + stuff
+ res.append('[' + stuff + ']')
+ else:
+ res.append(re.escape(c))
+    # anchor so the pattern must match the entire path
+    return ''.join(res) + r'\Z'
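+
+# e.g. (illustrative, using the anchored pattern):
+#
+#     re.match(glob_translate("data/*.csv"), "data/x.csv")    # matches
+#     re.match(glob_translate("data/*.csv"), "data/d/x.csv")  # None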
\ No newline at end of file