Claude Sonnet 3.5 - Fill-in: filesystem_spec
Failed to run pytest for the tests
ImportError while loading conftest '/testbed/fsspec/conftest.py'.
fsspec/__init__.py:6: in <module>
from .compression import available_compressions
fsspec/compression.py:4: in <module>
from fsspec.spec import AbstractBufferedFile
fsspec/spec.py:17: in <module>
from .utils import _unstrip_protocol, glob_translate, isfilelike, other_paths, read_block, stringify_path, tokenize
E ImportError: cannot import name '_unstrip_protocol' from 'fsspec.utils' (/testbed/fsspec/utils.py)
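
Note: the collection error above comes from fsspec/spec.py importing a helper that the patch never fills in. For orientation only, below is a minimal sketch of the kind of helper the import expects; the name and signature are taken from the traceback, but the body is an assumed illustration (re-attaching a filesystem's protocol prefix to a bare path), not the project's actual implementation.

    # Hypothetical sketch of the missing helper in fsspec/utils.py.
    # Signature inferred from the traceback; the body is an assumption.
    def _unstrip_protocol(name: str, fs) -> str:
        protos = fs.protocol if isinstance(fs.protocol, (list, tuple)) else (fs.protocol,)
        for protocol in protos:
            # If the path already carries one of the filesystem's protocols, keep it.
            if name.startswith(f"{protocol}://"):
                return name
        # Otherwise prefix the first (canonical) protocol.
        return f"{protos[0]}://{name}"
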
Patch diff
diff --git a/fsspec/archive.py b/fsspec/archive.py
index 1a4570f..7ae4806 100644
--- a/fsspec/archive.py
+++ b/fsspec/archive.py
@@ -23,5 +23,15 @@ class AbstractArchiveFileSystem(AbstractFileSystem):
Parameters
----------
paths: Iterable of path strings
+
+ Returns
+ -------
+ set
+ A set of all unique directory names
"""
- pass
+ dirnames = set()
+ for path in paths:
+ parts = path.split('/')
+ for i in range(1, len(parts)):
+ dirnames.add('/'.join(parts[:i]))
+ return dirnames
diff --git a/fsspec/asyn.py b/fsspec/asyn.py
index 551290c..1f44155 100644
--- a/fsspec/asyn.py
+++ b/fsspec/asyn.py
@@ -27,7 +27,10 @@ def get_lock():
The lock is allocated on first use to allow setting one lock per forked process.
"""
- pass
+ global _lock
+ if _lock is None:
+ _lock = threading.Lock()
+ return _lock
def reset_lock():
@@ -36,7 +39,8 @@ def reset_lock():
This should be called only on the init of a forked process to reset the lock to
None, enabling the new forked process to get a new lock.
"""
- pass
+ global _lock
+ _lock = None
def sync(loop, func, *args, timeout=None, **kwargs):
@@ -48,7 +52,29 @@ def sync(loop, func, *args, timeout=None, **kwargs):
>>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
timeout=timeout, **kwargs)
"""
- pass
+ e = threading.Event()
+ result = [None]
+ error = [False]
+
+ async def f():
+ try:
+ result[0] = await func(*args, **kwargs)
+ except Exception as ex:
+ result[0] = ex
+ error[0] = True
+ finally:
+ e.set()
+
+ asyncio.run_coroutine_threadsafe(f(), loop)
+ if timeout is not None:
+ if not e.wait(timeout):
+ raise FSTimeoutError("Timed out after %s seconds" % timeout)
+ else:
+ e.wait()
+
+ if error[0]:
+ raise result[0]
+ return result[0]
def sync_wrapper(func, obj=None):
@@ -57,7 +83,11 @@ def sync_wrapper(func, obj=None):
Leave obj=None if defining within a class. Pass the instance if attaching
as an attribute of the instance.
"""
- pass
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ self = obj or args[0]
+ return sync(self.loop, func, *args, **kwargs)
+ return wrapper
def get_loop():
@@ -65,7 +95,14 @@ def get_loop():
The loop will be running on a separate thread.
"""
- pass
+ if loop[0] is None:
+ with get_lock():
+ if loop[0] is None:
+ loop[0] = asyncio.new_event_loop()
+ th = threading.Thread(target=loop[0].run_forever, daemon=True)
+ th.start()
+ iothread[0] = th
+ return loop[0]
if TYPE_CHECKING:
@@ -83,9 +120,13 @@ _DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280
-def running_async() ->bool:
+def running_async() -> bool:
"""Being executed by an event loop?"""
- pass
+ try:
+ asyncio.get_running_loop()
+ return True
+ except RuntimeError:
+ return False
async def _run_coros_in_chunks(coros, batch_size=None, callback=
diff --git a/fsspec/caching.py b/fsspec/caching.py
index c4fc674..e8a7ba5 100644
--- a/fsspec/caching.py
+++ b/fsspec/caching.py
@@ -47,11 +47,15 @@ class BaseCache:
def _reset_stats(self) ->None:
"""Reset hit and miss counts for a more ganular report e.g. by file."""
- pass
+ self.hit_count = 0
+ self.miss_count = 0
+ self.total_requested_bytes = 0
def _log_stats(self) ->str:
"""Return a formatted string of the cache statistics."""
- pass
+ return (f"Cache hits: {self.hit_count}, "
+ f"Cache misses: {self.miss_count}, "
+ f"Total requested bytes: {self.total_requested_bytes}")
def __repr__(self) ->str:
return f"""
@@ -164,7 +168,7 @@ class BlockCache(BaseCache):
NamedTuple
Returned directly from the LRU Cache used internally.
"""
- pass
+ return self._fetch_block_cached.cache_info()
def __getstate__(self) ->dict[str, Any]:
state = self.__dict__
@@ -180,7 +184,9 @@ class BlockCache(BaseCache):
"""
Fetch the block of data for `block_number`.
"""
- pass
+ start = block_number * self.blocksize
+ end = min(start + self.blocksize, self.size)
+ return self.fetcher(start, end)
def _read_cache(self, start: int, end: int, start_block_number: int,
end_block_number: int) ->bytes:
@@ -194,7 +200,21 @@ class BlockCache(BaseCache):
start_block_number, end_block_number : int
The start and end block numbers.
"""
- pass
+ if start_block_number == end_block_number:
+ block = self._fetch_block_cached(start_block_number)
+ return block[start % self.blocksize:end % self.blocksize or None]
+
+ parts = []
+ for block_number in range(start_block_number, end_block_number + 1):
+ block = self._fetch_block_cached(block_number)
+ if block_number == start_block_number:
+ parts.append(block[start % self.blocksize:])
+ elif block_number == end_block_number:
+ parts.append(block[:end % self.blocksize or None])
+ else:
+ parts.append(block)
+
+ return b''.join(parts)
class BytesCache(BaseCache):
@@ -369,7 +389,7 @@ class BackgroundBlockCache(BaseCache):
NamedTuple
Returned directly from the LRU Cache used internally.
"""
- pass
+ return self._fetch_block_cached.cache_info()
def __getstate__(self) ->dict[str, Any]:
state = self.__dict__
@@ -393,7 +413,25 @@ class BackgroundBlockCache(BaseCache):
"""
Fetch the block of data for `block_number`.
"""
- pass
+ start = block_number * self.blocksize
+ end = min(start + self.blocksize, self.size)
+ logger.debug(f"Fetching block {block_number} ({log_info})")
+ data = self.fetcher(start, end)
+
+ with self._fetch_future_lock:
+ if (self._fetch_future_block_number is not None and
+ self._fetch_future_block_number != block_number + 1):
+ self._fetch_future.cancel()
+ self._fetch_future = None
+ self._fetch_future_block_number = None
+
+ if self._fetch_future_block_number is None and block_number + 1 < self.nblocks:
+ self._fetch_future_block_number = block_number + 1
+ self._fetch_future = self._thread_executor.submit(
+ self._fetch_block, block_number + 1, 'async'
+ )
+
+ return data
def _read_cache(self, start: int, end: int, start_block_number: int,
end_block_number: int) ->bytes:
@@ -407,7 +445,21 @@ class BackgroundBlockCache(BaseCache):
start_block_number, end_block_number : int
The start and end block numbers.
"""
- pass
+ if start_block_number == end_block_number:
+ block = self._fetch_block_cached(start_block_number)
+ return block[start % self.blocksize:end % self.blocksize or None]
+
+ parts = []
+ for block_number in range(start_block_number, end_block_number + 1):
+ block = self._fetch_block_cached(block_number)
+ if block_number == start_block_number:
+ parts.append(block[start % self.blocksize:])
+ elif block_number == end_block_number:
+ parts.append(block[:end % self.blocksize or None])
+ else:
+ parts.append(block)
+
+ return b''.join(parts)
caches: dict[str | None, type[BaseCache]] = {None: BaseCache}
@@ -426,7 +478,9 @@ def register_cache(cls: type[BaseCache], clobber: bool=False) ->None:
------
ValueError
"""
- pass
+ if cls.name in caches and not clobber:
+ raise ValueError(f"Cache {cls.name} already exists, use clobber=True to overwrite")
+ caches[cls.name] = cls
for c in (BaseCache, MMapCache, BytesCache, ReadAheadCache, BlockCache,
diff --git a/fsspec/callbacks.py b/fsspec/callbacks.py
index fd7312d..58de20f 100644
--- a/fsspec/callbacks.py
+++ b/fsspec/callbacks.py
@@ -36,7 +36,7 @@ class Callback:
def close(self):
"""Close callback."""
- pass
+ self.call(hook_name="close")
def branched(self, path_1, path_2, **kwargs):
"""
@@ -67,13 +67,20 @@ class Callback:
callback: Callback
A callback instance to be passed to the child method
"""
- pass
+ new_kwargs = kwargs.copy()
+ self.branch(path_1, path_2, new_kwargs)
+ return new_kwargs.get('callback', self)
def branch_coro(self, fn):
"""
Wraps a coroutine, and pass a new child callback to it.
"""
- pass
+ @wraps(fn)
+ async def wrapper(*args, **kwargs):
+ new_callback = self.branched(None, None)
+ kwargs['callback'] = new_callback
+ return await fn(*args, **kwargs)
+ return wrapper
def set_size(self, size):
"""
@@ -86,7 +93,8 @@ class Callback:
----------
size: int
"""
- pass
+ self.size = size
+ self.call()
def absolute_update(self, value):
"""
@@ -98,7 +106,8 @@ class Callback:
----------
value: int
"""
- pass
+ self.value = value
+ self.call()
def relative_update(self, inc=1):
"""
@@ -110,7 +119,8 @@ class Callback:
----------
inc: int
"""
- pass
+ self.value += inc
+ self.call()
def call(self, hook_name=None, **kwargs):
"""
@@ -124,7 +134,11 @@ class Callback:
If given, execute on this hook
kwargs: passed on to (all) hook(s)
"""
- pass
+ if hook_name is not None and hook_name in self.hooks:
+ self.hooks[hook_name](self.size, self.value, **self.kw, **kwargs)
+ else:
+ for hook in self.hooks.values():
+ hook(self.size, self.value, **self.kw, **kwargs)
def wrap(self, iterable):
"""
@@ -135,7 +149,9 @@ class Callback:
iterable: Iterable
The iterable that is being wrapped
"""
- pass
+ for item in iterable:
+ yield item
+ self.relative_update()
def branch(self, path_1, path_2, kwargs):
"""
@@ -159,7 +175,8 @@ class Callback:
-------
"""
- pass
+ # By default, use the same callback instance for child transfers
+ kwargs['callback'] = self
def __getattr__(self, item):
"""
@@ -175,7 +192,12 @@ class Callback:
``NoOpCallback``. This is an alternative to including
``callback=DEFAULT_CALLBACK`` directly in a method signature.
"""
- pass
+ if maybe_callback is None:
+ return DEFAULT_CALLBACK
+ elif isinstance(maybe_callback, Callback):
+ return maybe_callback
+ else:
+ return cls(maybe_callback)
class NoOpCallback(Callback):
@@ -198,11 +220,11 @@ class DotPrinterCallback(Callback):
def branch(self, path_1, path_2, kwargs):
"""Mutate kwargs to add new instance with different print char"""
- pass
+ kwargs['callback'] = DotPrinterCallback(chr_to_print='.')
def call(self, **kwargs):
"""Just outputs a character"""
- pass
+ print(self.chr, end='', flush=True)
class TqdmCallback(Callback):
@@ -275,7 +297,30 @@ class TqdmCallback(Callback):
super().__init__(*args, **kwargs)
def __del__(self):
- return self.close()
+ self.close()
+
+ def close(self):
+ if self.tqdm is not None:
+ self.tqdm.close()
+ self.tqdm = None
+
+ def set_size(self, size):
+ self.size = size
+ if self.tqdm is None:
+ self.tqdm = self._tqdm_cls(total=size, **self._tqdm_kwargs)
+ else:
+ self.tqdm.reset(total=size)
+
+ def absolute_update(self, value):
+ if self.tqdm is None:
+ self.tqdm = self._tqdm_cls(total=self.size, **self._tqdm_kwargs)
+ self.tqdm.n = value
+ self.tqdm.refresh()
+
+ def relative_update(self, inc=1):
+ if self.tqdm is None:
+ self.tqdm = self._tqdm_cls(total=self.size, **self._tqdm_kwargs)
+ self.tqdm.update(inc)
DEFAULT_CALLBACK = _DEFAULT_CALLBACK = NoOpCallback()
diff --git a/fsspec/compression.py b/fsspec/compression.py
index 9562369..216c3f6 100644
--- a/fsspec/compression.py
+++ b/fsspec/compression.py
@@ -25,7 +25,20 @@ def register_compression(name, callback, extensions, force=False):
ValueError: If name or extensions already registered, and not force.
"""
- pass
+ global compr
+ if name in compr and not force:
+ raise ValueError(f"Compression {name} already registered")
+
+ if isinstance(extensions, str):
+ extensions = [extensions]
+
+ for ext in extensions:
+ if ext in fsspec.utils.compressions and not force:
+ raise ValueError(f"Extension {ext} already registered")
+
+ compr[name] = callback
+ for ext in extensions:
+ fsspec.utils.compressions[ext] = name
register_compression('zip', unzip, 'zip')
@@ -70,7 +83,21 @@ class SnappyFile(AbstractBufferedFile):
def _fetch_range(self, start, end):
"""Get the specified set of bytes from remote"""
- pass
+ if start is not None or end is not None:
+ raise ValueError("Range fetching not supported with Snappy compression")
+ return self.infile.read()
+
+ def read(self, length=-1):
+ return self.codec.decompress(self.infile.read(length))
+
+ def write(self, data):
+ compressed = self.codec.compress(data)
+ return self.infile.write(compressed)
+
+ def close(self):
+ if hasattr(self.infile, 'close'):
+ self.infile.close()
+ super().close()
try:
@@ -93,4 +120,4 @@ except ImportError:
def available_compressions():
"""Return a list of the implemented compressions."""
- pass
+ return list(compr.keys())
diff --git a/fsspec/config.py b/fsspec/config.py
index 00a7d90..6b5ec42 100644
--- a/fsspec/config.py
+++ b/fsspec/config.py
@@ -28,7 +28,21 @@ def set_conf_env(conf_dict, envdict=os.environ):
envdict : dict-like(str, str)
Source for the values - usually the real environment
"""
- pass
+ for key, value in envdict.items():
+ if key.startswith("FSSPEC_"):
+ parts = key.split("_")
+ if len(parts) == 2:
+ # FSSPEC_<protocol>
+ protocol = parts[1].lower()
+ try:
+ conf_dict.setdefault(protocol, {}).update(json.loads(value))
+ except json.JSONDecodeError:
+ warnings.warn(f"Failed to parse JSON for {key}")
+ elif len(parts) > 2:
+ # FSSPEC_<protocol>_<kwarg>
+ protocol = parts[1].lower()
+ kwarg = "_".join(parts[2:]).lower()
+ conf_dict.setdefault(protocol, {})[kwarg] = value
def set_conf_files(cdir, conf_dict):
@@ -48,7 +62,21 @@ def set_conf_files(cdir, conf_dict):
conf_dict : dict(str, dict)
This dict will be mutated
"""
- pass
+ if not os.path.exists(cdir):
+ return
+
+ for file in sorted(os.listdir(cdir)):
+ path = os.path.join(cdir, file)
+ if file.endswith('.json'):
+ with open(path, 'r') as f:
+ data = json.load(f)
+ for protocol, config in data.items():
+ conf_dict.setdefault(protocol, {}).update(config)
+ elif file.endswith('.ini'):
+ config = configparser.ConfigParser()
+ config.read(path)
+ for section in config.sections():
+ conf_dict.setdefault(section, {}).update(dict(config[section]))
def apply_config(cls, kwargs, conf_dict=None):
@@ -68,7 +96,18 @@ def apply_config(cls, kwargs, conf_dict=None):
-------
dict : the modified set of kwargs
"""
- pass
+ if conf_dict is None:
+ conf_dict = conf
+
+ protocols = cls.protocol if isinstance(cls.protocol, (list, tuple)) else [cls.protocol]
+
+ for protocol in protocols:
+ if protocol in conf_dict:
+ for key, value in conf_dict[protocol].items():
+ if key not in kwargs:
+ kwargs[key] = value
+
+ return kwargs
set_conf_files(conf_dir, conf)
diff --git a/fsspec/core.py b/fsspec/core.py
index 0ba5d69..561be7d 100644
--- a/fsspec/core.py
+++ b/fsspec/core.py
@@ -281,7 +281,7 @@ def open(urlpath, mode='rb', compression=None, encoding='utf8', errors=None,
newline: bytes or None
Used for line terminator in text mode. If None, uses system default;
if blank, uses no translation.
- expand: bool or Nonw
+ expand: bool or None
Whether to regard file paths containing special glob characters as needing
expansion (finding the first match) or absolute. Setting False allows using
paths which do embed such characters. If None (default), this argument
@@ -316,7 +316,15 @@ def open(urlpath, mode='rb', compression=None, encoding='utf8', errors=None,
- For implementations in separate packages see
https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
"""
- pass
+ if expand is None:
+ expand = DEFAULT_EXPAND
+
+ fs, path = url_to_fs(urlpath, protocol=protocol, expand=expand, **kwargs)
+
+ compression = get_compression(path, compression)
+
+ return OpenFile(fs, path, mode=mode, compression=compression, encoding=encoding,
+ errors=errors, newline=newline)
def open_local(url: (str | list[str] | Path | list[Path]), mode: str='rb',
@@ -334,17 +342,36 @@ def open_local(url: (str | list[str] | Path | list[Path]), mode: str='rb',
storage_options:
passed on to FS for or used by open_files (e.g., compression)
"""
- pass
+ if 'r' not in mode:
+ raise ValueError("Only read mode is supported")
+
+ if isinstance(url, (str, Path)):
+ urls = [url]
+ else:
+ urls = url
+
+ fs, _ = url_to_fs(urls[0], **storage_options)
+
+ paths = [fs._strip_protocol(u) for u in urls]
+
+ if hasattr(fs, 'open_many'):
+ return fs.open_many(paths, mode=mode)
+ else:
+ return [fs.open(path, mode=mode) for path in paths]
def split_protocol(urlpath):
"""Return protocol, path pair"""
- pass
+ if '://' in urlpath:
+ protocol, path = urlpath.split('://', 1)
+ return protocol, path
+ return None, urlpath
def strip_protocol(urlpath):
"""Return only path part of full URL, according to appropriate backend"""
- pass
+ protocol, path = split_protocol(urlpath)
+ return path
def expand_paths_if_needed(paths, mode, num, fs, name_function):
@@ -363,7 +390,26 @@ def expand_paths_if_needed(paths, mode, num, fs, name_function):
``urlpath.replace('*', name_function(partition_index))``.
:return: list of paths
"""
- pass
+ expanded_paths = []
+
+ if 'w' in mode:
+ if isinstance(paths, str):
+ if '*' in paths:
+ expanded_paths = [paths.replace('*', name_function(i)) for i in range(num)]
+ else:
+ expanded_paths = paths if isinstance(paths, list) else [paths]
+ else:
+ expanded_paths = paths
+ else: # read mode
+ if isinstance(paths, str):
+ if any(char in paths for char in '*?[]'):
+ expanded_paths = fs.glob(paths)
+ else:
+ expanded_paths = [paths]
+ else:
+ expanded_paths = paths
+
+ return expanded_paths
def get_fs_token_paths(urlpath, mode='rb', num=1, name_function=None,
@@ -390,7 +436,21 @@ def get_fs_token_paths(urlpath, mode='rb', num=1, name_function=None,
expand: bool
Expand string paths for writing, assuming the path is a directory
"""
- pass
+ if isinstance(urlpath, (list, tuple)):
+ if not urlpath:
+ raise ValueError("Empty urlpath")
+ urlpath = urlpath[0]
+
+ fs, fs_token = url_to_fs(urlpath, protocol=protocol, storage_options=storage_options)
+
+ if 'w' in mode:
+ paths = expand_paths_if_needed([urlpath], mode, num, fs, name_function)
+ elif expand or isinstance(urlpath, str) and has_magic(urlpath):
+ paths = expand_paths_if_needed([urlpath], mode, num, fs, name_function)
+ else:
+ paths = [urlpath]
+
+ return fs, fs_token, paths
class PickleableTextIOWrapper(io.TextIOWrapper):
diff --git a/fsspec/fuse.py b/fsspec/fuse.py
index de1075f..36f9ea5 100644
--- a/fsspec/fuse.py
+++ b/fsspec/fuse.py
@@ -59,7 +59,21 @@ def run(fs, path, mount_point, foreground=True, threads=False, ready_file=
to file.
"""
- pass
+ fuse_ops = ops_class(fs, path, ready_file)
+ fuse = FUSE(
+ fuse_ops,
+ mount_point,
+ foreground=foreground,
+ nothreads=not threads,
+ allow_other=True,
+ )
+
+ if ready_file:
+ ready_file_path = os.path.join(mount_point, '.fuse_ready')
+ with open(ready_file_path, 'w') as f:
+ f.write('FUSE is ready')
+
+ return fuse
def main(args):
@@ -89,7 +103,44 @@ def main(args):
-o 'ftp-username=anonymous' \\
-o 'ftp-password=xieyanbo'
"""
- pass
+ parser = argparse.ArgumentParser(
+ description='Mount filesystem from chained URL to MOUNT_POINT'
+ )
+ parser.add_argument('protocol', help='Filesystem protocol or chained URL')
+ parser.add_argument('path', help='Path within the filesystem to mount')
+ parser.add_argument('mount_point', help='Local directory to mount to')
+ parser.add_argument('-f', '--foreground', action='store_true', help='Run in foreground')
+ parser.add_argument('-t', '--threads', action='store_true', help='Enable multi-threading')
+ parser.add_argument('-l', '--log', help='Log file path')
+ parser.add_argument('-o', '--option', action='append', help='Additional options')
+ parser.add_argument('--ready-file', action='store_true', help='Create .fuse_ready file when mounted')
+
+ args = parser.parse_args(args)
+
+ if args.log:
+ logging.basicConfig(filename=args.log, level=logging.INFO)
+
+ options = {}
+ if args.option:
+ for opt in args.option:
+ key, value = opt.split('=', 1)
+ if value.endswith('[int]'):
+ options[key] = int(value[:-5])
+ elif value.endswith('[bool]'):
+ options[key] = value[:-6].lower() in ('true', 'yes', '1')
+ else:
+ options[key] = value
+
+ fs, _ = url_to_fs(args.protocol, **options)
+
+ run(
+ fs,
+ args.path,
+ args.mount_point,
+ foreground=args.foreground,
+ threads=args.threads,
+ ready_file=args.ready_file
+ )
if __name__ == '__main__':
diff --git a/fsspec/generic.py b/fsspec/generic.py
index 48ba37c..32ea063 100644
--- a/fsspec/generic.py
+++ b/fsspec/generic.py
@@ -15,7 +15,23 @@ default_method = 'default'
def _resolve_fs(url, method=None, protocol=None, storage_options=None):
"""Pick instance of backend FS"""
- pass
+ if protocol is None:
+ protocol, _ = split_protocol(url)
+
+ if method is None:
+ method = default_method
+
+ if method == 'default':
+ fs_class = get_filesystem_class(protocol)
+ return fs_class(**(storage_options or {}))
+ elif method == 'generic':
+ if protocol not in _generic_fs:
+ raise ValueError(f"No generic filesystem found for protocol {protocol}")
+ return _generic_fs[protocol]
+ elif method == 'current':
+ return filesystem(protocol, **(storage_options or {}))
+ else:
+ raise ValueError(f"Unknown method: {method}")
def rsync(source, destination, delete_missing=False, source_field='size',
@@ -62,7 +78,43 @@ def rsync(source, destination, delete_missing=False, source_field='size',
-------
dict of the copy operations that were performed, {source: destination}
"""
- pass
+ if fs is None:
+ fs = GenericFileSystem(**(inst_kwargs or {}))
+
+ source_fs, source_path = fs.get_fs_token_paths(source)
+ dest_fs, dest_path = fs.get_fs_token_paths(destination)
+
+ source_files = source_fs.find(source_path)
+ dest_files = dest_fs.find(dest_path)
+
+ operations = {}
+
+ for src_file in source_files:
+ rel_path = os.path.relpath(src_file, source_path)
+ dest_file = os.path.join(dest_path, rel_path)
+
+ if update_cond == 'always' or not dest_fs.exists(dest_file):
+ source_fs.get(src_file, dest_file)
+ operations[src_file] = dest_file
+ elif update_cond == 'different':
+ src_info = source_fs.info(src_file)
+ dest_info = dest_fs.info(dest_file)
+
+ src_value = src_info[source_field] if isinstance(source_field, str) else source_field(src_info)
+ dest_value = dest_info[dest_field] if isinstance(dest_field, str) else dest_field(dest_info)
+
+ if src_value != dest_value:
+ source_fs.get(src_file, dest_file)
+ operations[src_file] = dest_file
+
+ if delete_missing:
+ for dest_file in dest_files:
+ rel_path = os.path.relpath(dest_file, dest_path)
+ src_file = os.path.join(source_path, rel_path)
+ if not source_fs.exists(src_file):
+ dest_fs.rm(dest_file)
+
+ return operations
class GenericFileSystem(AsyncFileSystem):
@@ -101,5 +153,22 @@ class GenericFileSystem(AsyncFileSystem):
See `func:rsync` for more details.
"""
- pass
+ return rsync(source, destination, fs=self, **kwargs)
+ async def _make_many_dirs(self, paths, exist_ok=False):
+ """Make multiple directories at once
+
+ Parameters
+ ----------
+ paths: list of str
+ List of paths to create
+ exist_ok: bool (optional)
+ If True, don't raise an exception if the directory already exists
+
+ Returns
+ -------
+ None
+ """
+ for path in paths:
+ await self._makedirs(path, exist_ok=exist_ok)
+
make_many_dirs = sync_wrapper(_make_many_dirs)
diff --git a/fsspec/gui.py b/fsspec/gui.py
index ad74c4c..4197561 100644
--- a/fsspec/gui.py
+++ b/fsspec/gui.py
@@ -58,11 +58,19 @@ class SigSlot:
If True, automatically connects with a method in this class of the
same name.
"""
- pass
+ if widget is not None:
+ self._sigs[name] = widget.param.watch(lambda event: self._signal(event), thing)
+ else:
+ self._sigs[name] = None
+
+ self._map[name] = {'widget': widget, 'thing': thing, 'log_level': log_level}
+
+ if auto:
+ self.connect(name, getattr(self, name))
def _repr_mimebundle_(self, *args, **kwargs):
"""Display in a notebook or a server"""
- pass
+ return self.show()._repr_mimebundle_(*args, **kwargs)
def connect(self, signal, slot):
"""Associate call back with given event
@@ -74,7 +82,15 @@ class SigSlot:
Alternatively, the callback can be a string, in which case it means
emitting the correspondingly-named event (i.e., connect to self)
"""
- pass
+ if signal not in self._sigs:
+ raise ValueError(f"Unknown signal: {signal}")
+
+ if isinstance(slot, str):
+ slot = lambda value: self._emit(slot, value)
+
+ if signal not in self._map:
+ self._map[signal] = {'callbacks': []}
+ self._map[signal]['callbacks'] = self._map[signal].get('callbacks', []) + [slot]
def _signal(self, event):
"""This is called by a an action on a widget
@@ -84,7 +100,13 @@ class SigSlot:
Tests can execute this method by directly changing the values of
widget components.
"""
- pass
+ if self._ignoring_events:
+ return
+
+ name = next((k for k, v in self._map.items() if v['widget'] == event.obj), None)
+ if name:
+ logger.log(self._map[name]['log_level'], f"Signal: {name}")
+ self._emit(name, event.new)
@contextlib.contextmanager
def ignore_events(self):
@@ -92,7 +114,11 @@ class SigSlot:
(does not propagate to children)
"""
- pass
+ self._ignoring_events = True
+ try:
+ yield
+ finally:
+ self._ignoring_events = False
def _emit(self, sig, value=None):
"""An event happened, call its callbacks
@@ -102,11 +128,16 @@ class SigSlot:
Calling of callbacks will halt whenever one returns False.
"""
- pass
+ if sig not in self._map:
+ return
+
+ for callback in self._map[sig].get('callbacks', []):
+ if callback(value) is False:
+ break
def show(self, threads=False):
"""Open a new browser tab and display this instance's interface"""
- pass
+ return pn.panel(self).show(threaded=threads)
class SingleSelect(SigSlot):
@@ -117,6 +148,27 @@ class SingleSelect(SigSlot):
def __init__(self, **kwargs):
self.kwargs = kwargs
super().__init__()
+ self.select = pn.widgets.Select(**self.kwargs)
+ self._register(self.select, '_selected')
+ self.connect('_selected', self._on_selected)
+
+ def _on_selected(self, value):
+ self._emit('selected', value)
+
+ def set_options(self, options):
+ self.select.options = options
+
+ def set_selection(self, value):
+ self.select.value = value
+
+ def add(self, option):
+ self.select.options = list(self.select.options) + [option]
+
+ def clear(self):
+ self.select.value = None
+
+ def _setup(self):
+ self._register(self.select, '_selected')
class FileSelector(SigSlot):
@@ -161,17 +213,26 @@ class FileSelector(SigSlot):
@property
def storage_options(self):
"""Value of the kwargs box as a dictionary"""
- pass
+ try:
+ return ast.literal_eval(self.kwargs_box.value)
+ except:
+ return {}
@property
def fs(self):
"""Current filesystem instance"""
- pass
+ if self._fs is None:
+ protocol = self.protocol_select.value
+ cls = get_filesystem_class(protocol)
+ self._fs = cls(**self.storage_options)
+ return self._fs
@property
def urlpath(self):
"""URL of currently selected item"""
- pass
+ protocol = self.protocol_select.value
+ path = self.path_box.value
+ return f"{protocol}://{path}"
def open_file(self, mode='rb', compression=None, encoding=None):
"""Create OpenFile instance for the currently selected item
@@ -197,4 +258,10 @@ class FileSelector(SigSlot):
encoding: str (optional)
If using text mode, use this encoding; defaults to UTF8.
"""
- pass
+ return OpenFile(
+ self.fs,
+ self.path_box.value,
+ mode=mode,
+ compression=compression,
+ encoding=encoding
+ )
diff --git a/fsspec/implementations/cache_mapper.py b/fsspec/implementations/cache_mapper.py
index 3294867..9b06d15 100644
--- a/fsspec/implementations/cache_mapper.py
+++ b/fsspec/implementations/cache_mapper.py
@@ -63,4 +63,7 @@ def create_cache_mapper(same_names: bool) ->AbstractCacheMapper:
"""Factory method to create cache mapper for backward compatibility with
``CachingFileSystem`` constructor using ``same_names`` kwarg.
"""
- pass
+ if same_names:
+ return BasenameCacheMapper()
+ else:
+ return HashCacheMapper()
diff --git a/fsspec/implementations/cache_metadata.py b/fsspec/implementations/cache_metadata.py
index 9a2c33e..af30cc0 100644
--- a/fsspec/implementations/cache_metadata.py
+++ b/fsspec/implementations/cache_metadata.py
@@ -45,11 +45,25 @@ class CacheMetadata:
def _load(self, fn: str) ->Detail:
"""Low-level function to load metadata from specific file"""
- pass
+ try:
+ with open(fn, "rb") as f:
+ try:
+ return json.load(f)
+ except json.JSONDecodeError:
+ # Fallback to pickle for backward compatibility
+ f.seek(0)
+ return pickle.load(f)
+ except FileNotFoundError:
+ return {}
def _save(self, metadata_to_save: Detail, fn: str) ->None:
"""Low-level function to save metadata to specific file"""
- pass
+ if self._force_save_pickle:
+ with atomic_write(fn, mode="wb") as f:
+ pickle.dump(metadata_to_save, f)
+ else:
+ with atomic_write(fn, mode="w") as f:
+ json.dump(metadata_to_save, f)
def _scan_locations(self, writable_only: bool=False) ->Iterator[tuple[
str, str, bool]]:
@@ -65,7 +79,11 @@ class CacheMetadata:
-------
Yields (str, str, bool)
"""
- pass
+ for storage in reversed(self._storage):
+ fn = os.path.join(storage, "cache")
+ writable = os.access(storage, os.W_OK)
+ if not writable_only or writable:
+ yield storage, fn, writable
def check_file(self, path: str, cfs: (CachingFileSystem | None)) ->(Literal
[False] | tuple[Detail, str]):
@@ -75,7 +93,16 @@ class CacheMetadata:
perform extra checks to reject possible matches, such as if they are
too old.
"""
- pass
+ for storage, cached_files in zip(self._storage, self.cached_files):
+ detail = cached_files.get(path)
+ if detail:
+ fn = os.path.join(storage, detail["fn"])
+ if os.path.exists(fn):
+ if cfs and cfs.check_file(path, detail):
+ return detail, fn
+ elif not cfs:
+ return detail, fn
+ return False
def clear_expired(self, expiry_time: int) ->tuple[list[str], bool]:
"""Remove expired metadata from the cache.
@@ -84,18 +111,32 @@ class CacheMetadata:
flag indicating whether the writable cache is empty. Caller is
responsible for deleting the expired files.
"""
- pass
+ expired_files = []
+ now = time.time()
+ for cached_files in self.cached_files:
+ for path, detail in list(cached_files.items()):
+ if now - detail.get("time", 0) > expiry_time:
+ expired_files.append(detail["fn"])
+ del cached_files[path]
+
+ writable_cache_empty = len(self.cached_files[-1]) == 0
+ return expired_files, writable_cache_empty
def load(self) ->None:
"""Load all metadata from disk and store in ``self.cached_files``"""
- pass
+ self.cached_files = []
+ for _, fn, _ in self._scan_locations():
+ self.cached_files.append(self._load(fn))
def on_close_cached_file(self, f: Any, path: str) ->None:
"""Perform side-effect actions on closing a cached file.
The actual closing of the file is the responsibility of the caller.
"""
- pass
+ detail = self.cached_files[-1].get(path)
+ if detail:
+ detail["size"] = f.tell()
+ detail["time"] = time.time()
def pop_file(self, path: str) ->(str | None):
"""Remove metadata of cached file.
@@ -104,12 +145,19 @@ class CacheMetadata:
otherwise return ``None``. Caller is responsible for deleting the
cached file.
"""
- pass
+ for cached_files in reversed(self.cached_files):
+ if path in cached_files:
+ detail = cached_files.pop(path)
+ return detail["fn"]
+ return None
def save(self) ->None:
"""Save metadata to disk"""
- pass
+ for storage, fn, writable in self._scan_locations(writable_only=True):
+ if writable:
+ self._save(self.cached_files[-1], fn)
+ break
def update_file(self, path: str, detail: Detail) ->None:
"""Update metadata for specific file in memory, do not save"""
- pass
+ self.cached_files[-1][path] = detail
diff --git a/fsspec/implementations/cached.py b/fsspec/implementations/cached.py
index bd56e3c..8db7702 100644
--- a/fsspec/implementations/cached.py
+++ b/fsspec/implementations/cached.py
@@ -144,23 +144,33 @@ class CachingFileSystem(AbstractFileSystem):
If more than one cache directory is in use, only the size of the last
one (the writable cache directory) is returned.
"""
- pass
+ if self._cache_size is None:
+ self._cache_size = sum(
+ os.path.getsize(os.path.join(self.storage[-1], f))
+ for f in os.listdir(self.storage[-1])
+ if os.path.isfile(os.path.join(self.storage[-1], f))
+ )
+ return self._cache_size
def load_cache(self):
"""Read set of stored blocks from file"""
- pass
+ self._metadata.load()
def save_cache(self):
"""Save set of stored blocks from file"""
- pass
+ self._metadata.save()
def _check_cache(self):
"""Reload caches if time elapsed or any disappeared"""
- pass
+ self._metadata.check_cache(self.cache_check)
def _check_file(self, path):
"""Is path in cache and still valid"""
- pass
+ path = self._strip_protocol(path)
+ details = self._metadata.get_metadata(path)
+ if details:
+ return time.time() - details["time"] < self.expiry if self.expiry else True
+ return False
def clear_cache(self):
"""Remove all files and metadata from the cache
@@ -168,7 +178,10 @@ class CachingFileSystem(AbstractFileSystem):
In the case of multiple cache locations, this clears only the last one,
which is assumed to be the read/write one.
"""
- pass
+ for file in os.listdir(self.storage[-1]):
+ os.remove(os.path.join(self.storage[-1], file))
+ self._metadata.clear()
+ self._cache_size = 0
def clear_expired_cache(self, expiry_time=None):
"""Remove all expired files and metadata from the cache
@@ -183,7 +196,13 @@ class CachingFileSystem(AbstractFileSystem):
If not defined the default is equivalent to the attribute from the
file caching instantiation.
"""
- pass
+ expiry_time = expiry_time or self.expiry
+ if not expiry_time:
+ return
+ now = time.time()
+ for path, details in list(self._metadata.items()):
+ if now - details["time"] > expiry_time:
+ self.pop_from_cache(path)
def pop_from_cache(self, path):
"""Remove cached version of given file
@@ -192,7 +211,16 @@ class CachingFileSystem(AbstractFileSystem):
location which is not the last, it is assumed to be read-only, and
raises PermissionError
"""
- pass
+ path = self._strip_protocol(path)
+ details = self._metadata.pop(path)
+ if details:
+ fn = self._mapper(path)
+ if os.path.exists(fn):
+ if fn.startswith(self.storage[-1]):
+ os.remove(fn)
+ self._cache_size = None # Reset cache size
+ else:
+ raise PermissionError(f"Cannot delete cached file {fn}")
def _open(self, path, mode='rb', block_size=None, autocommit=True,
cache_options=None, **kwargs):
@@ -208,11 +236,31 @@ class CachingFileSystem(AbstractFileSystem):
We monkey-patch this file, so that when it closes, we call
``close_and_update`` to save the state of the blocks.
"""
- pass
+ path = self._strip_protocol(path)
+ cache_path = self._mapper(path)
+
+ if self._check_file(path):
+ return open(cache_path, mode)
+
+ f = self.fs._open(path, mode=mode, block_size=block_size, **kwargs)
+ if 'r' in mode and self.compression:
+ comp = self.compression
+ f = compr[comp](f, mode='rb')
+ if 'r' in mode:
+ f.cache = MMapCache(f.size, cache_path)
+ close = f.close
+ f.close = lambda: self.close_and_update(f, close)
+
+ return f
def close_and_update(self, f, close):
"""Called when a file is closing, so store the set of blocks"""
- pass
+ if f.closed:
+ return
+ f.cache.close()
+ self._metadata.update_metadata(self._strip_protocol(f.path), {"time": time.time()})
+ self.save_cache()
+ close()
def __getattribute__(self, item):
if item in {'load_cache', '_open', 'save_cache', 'close_and_update',
diff --git a/fsspec/implementations/dask.py b/fsspec/implementations/dask.py
index ead2260..b014b29 100644
--- a/fsspec/implementations/dask.py
+++ b/fsspec/implementations/dask.py
@@ -29,6 +29,55 @@ class DaskWorkerFileSystem(AbstractFileSystem):
self.fs = fs
self._determine_worker()
+ def _determine_worker(self):
+ if isinstance(dask.config.get("scheduler"), Worker):
+ self.worker = dask.config.get("scheduler")
+ elif self.client is None:
+ self.client = _get_global_client()
+
+ def _get_fs(self):
+ if self.fs is None:
+ if self.worker:
+ self.fs = filesystem(self.target_protocol, **self.target_options)
+ else:
+ self.fs = self.client.submit(
+ filesystem, self.target_protocol, **self.target_options
+ ).result()
+ return self.fs
+
+ def _call_worker(self, method, *args, **kwargs):
+ if self.worker:
+ return getattr(self._get_fs(), method)(*args, **kwargs)
+ else:
+ return self.client.submit(
+ getattr, self._get_fs(), method
+ ).result()(*args, **kwargs)
+
+ def mkdir(self, path, create_parents=True, **kwargs):
+ return self._call_worker('mkdir', path, create_parents=create_parents, **kwargs)
+
+ def makedirs(self, path, exist_ok=False):
+ return self._call_worker('makedirs', path, exist_ok=exist_ok)
+
+ def rmdir(self, path):
+ return self._call_worker('rmdir', path)
+
+ def ls(self, path, detail=False, **kwargs):
+ return self._call_worker('ls', path, detail=detail, **kwargs)
+
+ def info(self, path, **kwargs):
+ return self._call_worker('info', path, **kwargs)
+
+ def cp_file(self, path1, path2, **kwargs):
+ return self._call_worker('cp_file', path1, path2, **kwargs)
+
+ def _open(self, path, mode="rb", block_size=None, **kwargs):
+ if mode != 'rb':
+ raise NotImplementedError("Only read-only access is currently supported")
+ return DaskFile(
+ self, path, mode=mode, block_size=block_size, **kwargs
+ )
+
class DaskFile(AbstractBufferedFile):
@@ -40,8 +89,8 @@ class DaskFile(AbstractBufferedFile):
def _initiate_upload(self):
"""Create remote file/upload"""
- pass
+ raise NotImplementedError("Upload is not supported for DaskFile")
def _fetch_range(self, start, end):
"""Get the specified set of bytes from remote"""
- pass
+ return self.fs._call_worker('cat_file', self.path, start=start, end=end)
diff --git a/fsspec/implementations/data.py b/fsspec/implementations/data.py
index 77435f6..369f0b8 100644
--- a/fsspec/implementations/data.py
+++ b/fsspec/implementations/data.py
@@ -28,4 +28,6 @@ class DataFileSystem(AbstractFileSystem):
This version always base64 encodes, even when the data is ascii/url-safe.
"""
- pass
+ encoded_data = base64.b64encode(data).decode('ascii')
+ mime_type = mime if mime else 'text/plain'
+ return f"data:{mime_type};base64,{encoded_data}"
diff --git a/fsspec/implementations/dbfs.py b/fsspec/implementations/dbfs.py
index bbf4358..2849d99 100644
--- a/fsspec/implementations/dbfs.py
+++ b/fsspec/implementations/dbfs.py
@@ -61,7 +61,22 @@ class DatabricksFileSystem(AbstractFileSystem):
but also additional information on file sizes
and types.
"""
- pass
+ json_data = {"path": path}
+ response = self._send_to_api("get", "dbfs/list", json_data)
+
+ files = response.get("files", [])
+ if not detail:
+ return [f["path"] for f in files]
+
+ return [
+ {
+ "name": f["path"].split("/")[-1],
+ "size": f.get("file_size", 0),
+ "type": "directory" if f["is_dir"] else "file",
+ "path": f["path"]
+ }
+ for f in files
+ ]
def makedirs(self, path, exist_ok=True):
"""
@@ -76,7 +91,11 @@ class DatabricksFileSystem(AbstractFileSystem):
exists before creating it (and raises an
Exception if this is the case)
"""
- pass
+ if not exist_ok:
+ if self.exists(path):
+ raise FileExistsError(f"Path already exists: {path}")
+
+ self.mkdir(path, create_parents=True)
def mkdir(self, path, create_parents=True, **kwargs):
"""
@@ -90,7 +109,11 @@ class DatabricksFileSystem(AbstractFileSystem):
Whether to create all parents or not.
"False" is not implemented so far.
"""
- pass
+ if not create_parents:
+ raise NotImplementedError("create_parents=False is not implemented")
+
+ json_data = {"path": path}
+ self._send_to_api("post", "dbfs/mkdirs", json_data)
def rm(self, path, recursive=False, **kwargs):
"""
@@ -103,7 +126,8 @@ class DatabricksFileSystem(AbstractFileSystem):
recursive: bool
Recursively delete all files in a folder.
"""
- pass
+ json_data = {"path": path, "recursive": recursive}
+ self._send_to_api("post", "dbfs/delete", json_data)
def mv(self, source_path, destination_path, recursive=False, maxdepth=
None, **kwargs):
@@ -125,11 +149,18 @@ class DatabricksFileSystem(AbstractFileSystem):
destination_path: str
To where to move (absolute path)
recursive: bool
- Not implemented to far.
+ Not implemented so far.
maxdepth:
- Not implemented to far.
+ Not implemented so far.
"""
- pass
+ if recursive or maxdepth is not None:
+ raise NotImplementedError("recursive and maxdepth are not implemented")
+
+ json_data = {
+ "source_path": source_path,
+ "destination_path": destination_path
+ }
+ self._send_to_api("post", "dbfs/move", json_data)
def _open(self, path, mode='rb', block_size='default', **kwargs):
"""
@@ -138,7 +169,7 @@ class DatabricksFileSystem(AbstractFileSystem):
Only the default blocksize is allowed.
"""
- pass
+ return DatabricksFile(self, path, mode=mode, block_size=block_size, **kwargs)
def _send_to_api(self, method, endpoint, json):
"""
@@ -154,7 +185,16 @@ class DatabricksFileSystem(AbstractFileSystem):
json: dict
Dictionary of information to send
"""
- pass
+ url = f"https://{self.instance}/api/2.0/{endpoint}"
+ if method.lower() == "get":
+ response = self.session.get(url, json=json)
+ elif method.lower() == "post":
+ response = self.session.post(url, json=json)
+ else:
+ raise ValueError(f"Unsupported HTTP method: {method}")
+
+ response.raise_for_status()
+ return response.json()
def _create_handle(self, path, overwrite=True):
"""
@@ -174,7 +214,9 @@ class DatabricksFileSystem(AbstractFileSystem):
If a file already exist at this location, either overwrite
it or raise an exception.
"""
- pass
+ json_data = {"path": path, "overwrite": overwrite}
+ response = self._send_to_api("post", "dbfs/create", json_data)
+ return response["handle"]
def _close_handle(self, handle):
"""
@@ -185,7 +227,8 @@ class DatabricksFileSystem(AbstractFileSystem):
handle: str
Which handle to close.
"""
- pass
+ json_data = {"handle": handle}
+ self._send_to_api("post", "dbfs/close", json_data)
def _add_data(self, handle, data):
"""
@@ -202,7 +245,9 @@ class DatabricksFileSystem(AbstractFileSystem):
data: bytes
Block of data to add to the handle.
"""
- pass
+ encoded_data = base64.b64encode(data).decode('utf-8')
+ json_data = {"handle": handle, "data": encoded_data}
+ self._send_to_api("post", "dbfs/add-block", json_data)
def _get_data(self, path, start, end):
"""
@@ -219,7 +264,10 @@ class DatabricksFileSystem(AbstractFileSystem):
end: int
End position of the block
"""
- pass
+ length = end - start
+ json_data = {"path": path, "offset": start, "length": length}
+ response = self._send_to_api("get", "dbfs/read", json_data)
+ return base64.b64decode(response["data"])
class DatabricksFile(AbstractBufferedFile):
@@ -244,16 +292,22 @@ class DatabricksFile(AbstractBufferedFile):
def _initiate_upload(self):
"""Internal function to start a file upload"""
- pass
+ self.handle = self.fs._create_handle(self.path, overwrite=self.mode == 'wb')
def _upload_chunk(self, final=False):
"""Internal function to add a chunk of data to a started upload"""
- pass
+ if self.buffer:
+ self.fs._add_data(self.handle, self.buffer.getvalue())
+ self.buffer.seek(0)
+ self.buffer.truncate()
+ if final:
+ self.fs._close_handle(self.handle)
def _fetch_range(self, start, end):
"""Internal function to download a block of data"""
- pass
+ return self.fs._get_data(self.path, start, end)
def _to_sized_blocks(self, length, start=0):
"""Helper function to split a range from 0 to total_length into bloksizes"""
- pass
+ for offset in range(start, start + length, self.blocksize):
+ yield offset, min(self.blocksize, start + length - offset)
diff --git a/fsspec/implementations/ftp.py b/fsspec/implementations/ftp.py
index 0658887..b2572ba 100644
--- a/fsspec/implementations/ftp.py
+++ b/fsspec/implementations/ftp.py
@@ -88,7 +88,28 @@ class FTPFile(AbstractBufferedFile):
Will fail if the server does not respect the REST command on
retrieve requests.
"""
- pass
+ data = []
+ total = [0]
+
+ def callback(chunk):
+ data.append(chunk)
+ total[0] += len(chunk)
+ if total[0] >= end - start:
+ raise TransferDone
+
+ try:
+ self.fs.ftp.retrbinary(
+ f"RETR {self.path}",
+ callback,
+ blocksize=self.blocksize,
+ rest=start
+ )
+ except TransferDone:
+ pass
+ except Error as e:
+ raise IOError(f"FTP error: {str(e)}")
+
+ return b"".join(data)[:end - start]
def _mlsd2(ftp, path='.'):
@@ -104,4 +125,46 @@ def _mlsd2(ftp, path='.'):
path: str
Expects to be given path, but defaults to ".".
"""
- pass
+ from datetime import datetime
+ import re
+
+ lines = []
+ ftp.dir(path, lines.append)
+
+ for line in lines:
+ parts = line.split(None, 8)
+ if len(parts) < 9:
+ continue
+
+ perms, _, owner, group, size, month, day, year_or_time, name = parts
+
+ if ':' in year_or_time:
+ year = datetime.now().year
+ time = year_or_time
+ else:
+ year = int(year_or_time)
+ time = "00:00"
+
+ date_str = f"{month} {day} {year} {time}"
+ mtime = datetime.strptime(date_str, "%b %d %Y %H:%M").timestamp()
+
+ is_dir = perms.startswith('d')
+ is_link = perms.startswith('l')
+
+ if is_link:
+ name, _, link_target = name.partition(' -> ')
+ else:
+ link_target = None
+
+ yield {
+ 'name': name,
+ 'size': int(size),
+ 'type': 'dir' if is_dir else 'file',
+ 'modify': mtime,
+ 'unix': {
+ 'mode': perms,
+ 'owner': owner,
+ 'group': group,
+ },
+ 'target': link_target
+ }
diff --git a/fsspec/implementations/github.py b/fsspec/implementations/github.py
index 27f9ccd..831710b 100644
--- a/fsspec/implementations/github.py
+++ b/fsspec/implementations/github.py
@@ -72,22 +72,34 @@ class GithubFileSystem(AbstractFileSystem):
-------
List of string
"""
- pass
+ url = f"https://api.github.com/{'orgs' if is_org else 'users'}/{org_or_user}/repos"
+ response = requests.get(url, timeout=cls.timeout)
+ response.raise_for_status()
+ return [repo['name'] for repo in response.json()]
@property
def tags(self):
"""Names of tags in the repo"""
- pass
+ url = f"https://api.github.com/repos/{self.org}/{self.repo}/tags"
+ response = requests.get(url, timeout=self.timeout, auth=(self.username, self.token) if self.username else None)
+ response.raise_for_status()
+ return [tag['name'] for tag in response.json()]
@property
def branches(self):
"""Names of branches in the repo"""
- pass
+ url = f"https://api.github.com/repos/{self.org}/{self.repo}/branches"
+ response = requests.get(url, timeout=self.timeout, auth=(self.username, self.token) if self.username else None)
+ response.raise_for_status()
+ return [branch['name'] for branch in response.json()]
@property
def refs(self):
"""Named references, tags and branches"""
- pass
+ return {
+ 'tags': self.tags,
+ 'branches': self.branches
+ }
def ls(self, path, detail=False, sha=None, _sha=None, **kwargs):
"""List files at given path
@@ -105,4 +117,31 @@ class GithubFileSystem(AbstractFileSystem):
_sha: str (optional)
List this specific tree object (used internally to descend into trees)
"""
- pass
+ sha = sha or self.root
+ if _sha is None:
+ url = self.url.format(org=self.org, repo=self.repo, sha=sha)
+ if path:
+ url += '/' + path.lstrip('/')
+ else:
+ url = f"https://api.github.com/repos/{self.org}/{self.repo}/git/trees/{_sha}"
+
+ response = requests.get(url, timeout=self.timeout, auth=(self.username, self.token) if self.username else None)
+ response.raise_for_status()
+ data = response.json()
+
+ out = []
+ for item in data.get('tree', []):
+ if detail:
+ out.append({
+ 'name': item['path'],
+ 'size': item.get('size', 0),
+ 'type': 'directory' if item['type'] == 'tree' else 'file',
+ 'sha': item['sha']
+ })
+ else:
+ out.append(item['path'])
+
+ if detail:
+ return out
+ else:
+ return sorted(out)
diff --git a/fsspec/implementations/http.py b/fsspec/implementations/http.py
index 94a6f71..e137fb8 100644
--- a/fsspec/implementations/http.py
+++ b/fsspec/implementations/http.py
@@ -84,14 +84,16 @@ class HTTPFileSystem(AsyncFileSystem):
@classmethod
def _strip_protocol(cls, path):
"""For HTTP, we always want to keep the full URL"""
- pass
+ return path
ls = sync_wrapper(_ls)
def _raise_not_found_for_status(self, response, url):
"""
Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
"""
- pass
+ if response.status == 404:
+ raise FileNotFoundError(f"{url} not found")
+ response.raise_for_status()
def _open(self, path, mode='rb', block_size=None, autocommit=None,
cache_type=None, cache_options=None, size=None, **kwargs):
@@ -109,11 +111,21 @@ class HTTPFileSystem(AsyncFileSystem):
kwargs: key-value
Any other parameters, passed to requests calls
"""
- pass
+ if mode != 'rb':
+ raise NotImplementedError("Only 'rb' mode is supported for HTTP")
+ block_size = block_size or self.block_size
+ if block_size == 0:
+ return HTTPStreamFile(self, path, mode=mode, loop=self.loop, **kwargs)
+ else:
+ return HTTPFile(self, path, mode=mode, block_size=block_size,
+ cache_type=cache_type or self.cache_type,
+ cache_options=cache_options or self.cache_options,
+ size=size, loop=self.loop, asynchronous=self.asynchronous,
+ **kwargs)
def ukey(self, url):
"""Unique identifier; assume HTTP files are static, unchanging"""
- pass
+ return tokenize(url)
async def _info(self, url, **kwargs):
"""Get info of URL
@@ -125,17 +137,82 @@ class HTTPFileSystem(AsyncFileSystem):
which case size will be given as None (and certain operations on the
corresponding file will not work).
"""
- pass
+ session = await self.set_session()
+ try:
+ size, name = await _file_info(url, session=session, **self.kwargs)
+ except Exception:
+ # If HEAD fails, try GET
+ r = await session.get(self.encode_url(url), **self.kwargs)
+ self._raise_not_found_for_status(r, url)
+ await r.release()
+ size = int(r.headers.get('Content-Length', 0)) or None
+ name = url.split('/')[-1]
+ return {'name': name, 'size': size, 'type': 'file'}
async def _glob(self, path, maxdepth=None, **kwargs):
"""
Find files by glob-matching.
- This implementation is idntical to the one in AbstractFileSystem,
+ This implementation is identical to the one in AbstractFileSystem,
but "?" is not considered as a character for globbing, because it is
so common in URLs, often identifying the "query" part.
"""
- pass
+ import posixpath
+ import re
+ from fsspec.utils import other_paths
+
+ if maxdepth is not None:
+ return super()._glob(path, maxdepth)
+
+ url_parts = urlparse(path)
+ base_path = url_parts.scheme + '://' + url_parts.netloc
+ glob_path = url_parts.path
+
+ if '?' in path and '*' not in path:
+ # If there's a query string but no glob characters, treat it as a literal path
+ return [path] if await self._exists(path) else []
+
+ ends = glob_path.endswith('/')
+ sep = '/'
+ scheme = url_parts.scheme
+
+ path = self._strip_protocol(path)
+ indstar = path.find('*') if path.find('*') >= 0 else len(path)
+ indques = path.find('?') if path.find('?') >= 0 else len(path)
+ indbrace = path.find('[') if path.find('[') >= 0 else len(path)
+
+ ind = min(indstar, indques, indbrace)
+
+ detail = kwargs.pop('detail', False)
+
+ if not has_magic(glob_path):
+ if await self._exists(path):
+ return [{'name': path, 'type': 'file'}] if detail else [path]
+ else:
+ return []
+
+ parent = posixpath.dirname(glob_path[:ind])
+ fileglob = glob_path[ind:]
+ pattern = re.compile(glob_translate(fileglob))
+
+ try:
+ entries = await self._ls(base_path + parent)
+ except FileNotFoundError:
+ return []
+
+ out = []
+ for entry in entries:
+ if entry['type'] == 'file':
+ name = entry['name'].split('/')[-1]
+ if pattern.match(name):
+ out.append(entry if detail else base_path + entry['name'])
+ elif entry['type'] == 'directory' and fileglob:
+ path = base_path + entry['name']
+ if ends:
+ path = path + '/'
+ out.extend(await self._glob(path + fileglob, maxdepth=maxdepth, **kwargs))
+
+ return out
class HTTPFile(AbstractBufferedFile):
@@ -186,7 +263,15 @@ class HTTPFile(AbstractBufferedFile):
file. If the server has not supplied the filesize, attempting to
read only part of the data will raise a ValueError.
"""
- pass
+ if self.mode != 'rb':
+ raise ValueError('File not in read mode')
+ if length < 0:
+ length = self.size
+ if self.size is None:
+ if length >= 0:
+ raise ValueError("Cannot read partial file of unknown size")
+ return self._fetch_all()
+ return self.cache.read(self.loc, length)
async def async_fetch_all(self):
"""Read whole file in one shot, without caching
@@ -194,12 +279,26 @@ class HTTPFile(AbstractBufferedFile):
This is only called when position is still at zero,
and read() is called without a byte-count.
"""
- pass
+ if self.size is not None:
+ length = self.size
+ else:
+ length = None
+ r = await self.session.get(self.fs.encode_url(self.url), **self.kwargs)
+ self.fs._raise_not_found_for_status(r, self.url)
+ out = await r.read()
+ self.size = len(out)
+ return out
_fetch_all = sync_wrapper(async_fetch_all)
def _parse_content_range(self, headers):
"""Parse the Content-Range header"""
- pass
+ if 'Content-Range' in headers:
+ content_range = headers['Content-Range']
+ match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
+ if match:
+ start, end, total = match.groups()
+ return int(start), int(end), int(total) if total != '*' else None
+ return None, None, None
async def async_fetch_range(self, start, end):
"""Download a block of data
@@ -209,7 +308,29 @@ class HTTPFile(AbstractBufferedFile):
and then stream the output - if the data size is bigger than we
requested, an exception is raised.
"""
- pass
+ kwargs = self.kwargs.copy()
+ headers = kwargs.pop('headers', {}).copy()
+ headers['Range'] = f'bytes={start}-{end-1}'
+ r = await self.session.get(self.fs.encode_url(self.url), headers=headers, **kwargs)
+ self.fs._raise_not_found_for_status(r, self.url)
+ if r.status == 206:
+ # Partial content response, as expected
+ cr_start, cr_end, cr_total = self._parse_content_range(r.headers)
+ if cr_start is not None:
+ assert start == cr_start, "Requested range does not match Content-Range"
+ if cr_end is not None:
+ assert end == cr_end + 1, "Requested range does not match Content-Range"
+ if cr_total is not None:
+ self.size = cr_total
+ elif self.size is not None:
+ raise ValueError("Server ignored the range request")
+ else:
+ # No range information, have to stream all data
+ cl = int(r.headers.get('Content-Length', 0))
+ if cl and cl > end - start:
+ raise ValueError("Got more bytes than requested")
+ out = await r.read()
+ return out[: end - start]
_fetch_range = sync_wrapper(async_fetch_range)
def __reduce__(self):
@@ -268,7 +389,39 @@ async def _file_info(url, session, size_policy='head', **kwargs):
Default operation is to explicitly allow redirects and use encoding
'identity' (no compression) to get the true size of the target.
"""
- pass
+ kwargs = kwargs.copy()
+ kwargs.pop('method', None)
+ head_method = kwargs.pop('head_method', 'head')
+ kwargs['allow_redirects'] = True
+ kwargs['headers'] = {'Accept-Encoding': 'identity'}
+
+ info = {}
+ try:
+ if head_method == 'head':
+ r = await session.head(url, **kwargs)
+ elif head_method == 'get':
+ r = await session.get(url, **kwargs)
+ else:
+ raise ValueError(f"head_method must be 'head' or 'get', got {head_method}")
+ r.raise_for_status()
+
+ info['name'] = url.split('/')[-1]
+ info['size'] = int(r.headers.get('Content-Length', 0)) or None
+ if 'Content-Type' in r.headers:
+ info['type'] = r.headers['Content-Type']
+ if 'Last-Modified' in r.headers:
+ info['mtime'] = r.headers['Last-Modified']
+ if 'ETag' in r.headers:
+ info['etag'] = r.headers['ETag']
+
+ except Exception as exc:
+ if size_policy == 'head':
+ # Try GET request if HEAD fails
+ return await _file_info(url, session, size_policy='get', **kwargs)
+ else:
+ raise exc
+
+ return info['size'], info['name']
file_size = sync_wrapper(_file_size)
diff --git a/fsspec/implementations/jupyter.py b/fsspec/implementations/jupyter.py
index 7da1be6..03f6b38 100644
--- a/fsspec/implementations/jupyter.py
+++ b/fsspec/implementations/jupyter.py
@@ -34,12 +34,87 @@ class JupyterFileSystem(fsspec.AbstractFileSystem):
self.session.headers['Authorization'] = f'token {tok}'
super().__init__(**kwargs)
+ def ls(self, path, detail=True, **kwargs):
+ """List contents of path."""
+ url = f"{self.url}/{path.strip('/')}"
+ response = self.session.get(url)
+ response.raise_for_status()
+ data = response.json()
+
+ if data['type'] == 'directory':
+ contents = data['content']
+ else:
+ contents = [data]
+
+ if detail:
+ return [{
+ 'name': item['name'],
+ 'size': item['size'],
+ 'type': 'directory' if item['type'] == 'directory' else 'file',
+ 'mtime': item['last_modified']
+ } for item in contents]
+ else:
+ return [item['name'] for item in contents]
+
+ def _open(self, path, mode='rb', **kwargs):
+ """Open a file."""
+ if mode in ('rb', 'r'):
+ url = f"{self.url}/{path.strip('/')}"
+ response = self.session.get(url)
+ response.raise_for_status()
+ data = response.json()
+ content = base64.b64decode(data['content']) if data['format'] == 'base64' else data['content'].encode()
+ return io.BytesIO(content)
+ elif mode in ('wb', 'w'):
+ return SimpleFileWriter(self, path, mode, **kwargs)
+ else:
+ raise ValueError(f"Unsupported mode: {mode}")
+
+ def _rm(self, path):
+ """Remove a file."""
+ url = f"{self.url}/{path.strip('/')}"
+ response = self.session.delete(url)
+ response.raise_for_status()
+
+ def _mkdir(self, path, create_parents=True, **kwargs):
+ """Create a directory."""
+ url = f"{self.url}/{path.strip('/')}"
+ data = {'type': 'directory'}
+ response = self.session.put(url, json=data)
+ response.raise_for_status()
+
class SimpleFileWriter(fsspec.spec.AbstractBufferedFile):
+    def __init__(self, fs, path, mode='wb', **kwargs):
+        # AbstractBufferedFile already sets up self.buffer, self.fs and self.path
+        super().__init__(fs=fs, path=path, mode=mode, **kwargs)
+
     def _upload_chunk(self, final=False):
         """Never uploads a chunk until file is done
         Not suitable for large files
         """
-        pass
+        if final is False:
+            # returning False tells AbstractBufferedFile to keep buffering
+            return False
+        content = self.buffer.getvalue()
+        url = f"{self.fs.url}/{self.path.strip('/')}"
+        data = {
+            'type': 'file',
+            'format': 'base64',
+            'content': base64.b64encode(content).decode(),
+        }
+        response = self.fs.session.put(url, json=data)
+        response.raise_for_status()
+
+    def write(self, data):
+        """Write data to the in-memory buffer."""
+        self.loc += len(data)
+        return self.buffer.write(data)
+
+    def close(self):
+        """Upload any buffered content (via the final flush) and close the file."""
+        if not self.closed:
+            super().close()
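For orientation, a minimal usage sketch of the JupyterFileSystem as filled in above (the server URL and token are placeholders; assumes a running Jupyter server and that the `jupyter` protocol is registered as in upstream fsspec):

    import fsspec

    # assumed: a Jupyter server reachable at this URL with a valid API token
    fs = fsspec.filesystem("jupyter", url="http://localhost:8888", tok="<token>")
    print(fs.ls(""))                              # list the server root via the Contents API
    with fs.open("work/data.bin", "wb") as f:     # buffered in memory by SimpleFileWriter
        f.write(b"payload")                       # uploaded base64-encoded when closed
    print(fs.cat("work/data.bin"))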
diff --git a/fsspec/implementations/libarchive.py b/fsspec/implementations/libarchive.py
index c2101dc..96813ee 100644
--- a/fsspec/implementations/libarchive.py
+++ b/fsspec/implementations/libarchive.py
@@ -19,7 +19,28 @@ def custom_reader(file, format_name='all', filter_name='all', block_size=
The `file` object must support the standard `readinto` and 'seek' methods.
"""
- pass
+ def read_cb(archive, context, buffer):
+ size = len(buffer)
+ data = file.read(size)
+ if data:
+ buffer[:len(data)] = data
+ return len(data)
+ return 0
+
+ def seek_cb(archive, context, offset, whence):
+ return file.seek(offset, whence)
+
+ read_func = CFUNCTYPE(c_size_t, c_void_p, c_void_p, POINTER(c_char))(read_cb)
+ seek_func = SEEK_CALLBACK(seek_cb)
+
+ with libarchive.read_memory(
+ None, block_size, format_name, filter_name
+ ) as archive:
+ libarchive.ffi.read_set_seek_callback(archive._pointer, seek_func)
+ libarchive.ffi.read_set_read_callback(archive._pointer, read_func)
+ libarchive.ffi.read_set_callback_data(archive._pointer, None)
+ libarchive.ffi.read_open1(archive._pointer)
+ yield archive
class LibArchiveFileSystem(AbstractArchiveFileSystem):
@@ -60,7 +81,7 @@ class LibArchiveFileSystem(AbstractArchiveFileSystem):
Kwargs passed when instantiating the target FS, if ``fo`` is
a string.
"""
- super().__init__(self, **kwargs)
+ super().__init__(**kwargs)
if mode != 'r':
raise ValueError('Only read from archive files accepted')
if isinstance(fo, str):
@@ -75,3 +96,41 @@ class LibArchiveFileSystem(AbstractArchiveFileSystem):
self.fo = fo.__enter__()
self.block_size = block_size
self.dir_cache = None
+ self._archive = None
+
+ def __exit__(self, *args, **kwargs):
+ self.fo.__exit__(*args, **kwargs)
+ self.of.__exit__(*args, **kwargs)
+
+ def _get_dirs(self):
+ if self.dir_cache is None:
+ self.dir_cache = {}
+ with custom_reader(self.fo) as archive:
+ for entry in archive:
+ path = entry.pathname.lstrip('/')
+ self.dir_cache[path] = dict(
+ size=entry.size,
+ mtime=entry.mtime
+ )
+ return self.dir_cache
+
+    def _open(self, path, mode="rb", **kwargs):
+        if mode != "rb":
+            raise NotImplementedError("Only read-binary mode is supported")
+
+        path = path.lstrip('/')
+        with custom_reader(self.fo, block_size=self.block_size) as archive:
+            for entry in archive:
+                if entry.pathname.lstrip('/') == path:
+                    data = b"".join(entry.get_blocks())
+                    return MemoryFile(fs=self, path=path, data=data)
+        raise FileNotFoundError(path)
+
+    def info(self, path, **kwargs):
+        path = path.lstrip('/')
+        dirs = self._get_dirs()
+        if path in dirs:
+            return {'name': path, 'type': 'file', **dirs[path]}
+        elif any(k.startswith(path + '/') for k in dirs):
+            return {'name': path, 'type': 'directory', 'size': 0}
+        raise FileNotFoundError(path)
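A quick, hedged sketch of how the LibArchiveFileSystem above is typically driven (the archive name is illustrative; requires python-libarchive-c and relies on custom_reader behaving as intended):

    import fsspec

    fs = fsspec.filesystem("libarchive", fo="archive.7z")   # read-only
    print(fs.ls("/"))                      # listings come from the cached entry metadata
    with fs.open("folder/file.txt") as f:  # the whole member is materialised as a MemoryFile
        print(f.read())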
diff --git a/fsspec/implementations/local.py b/fsspec/implementations/local.py
index af01bea..f8d5c55 100644
--- a/fsspec/implementations/local.py
+++ b/fsspec/implementations/local.py
@@ -34,7 +34,10 @@ class LocalFileSystem(AbstractFileSystem):
def make_path_posix(path):
"""Make path generic and absolute for current OS"""
- pass
+ path = os.path.abspath(os.path.expanduser(path))
+ if os.sep == '\\':
+ path = path.replace('\\', '/')
+ return path
def trailing_sep(path):
@@ -43,7 +46,7 @@ def trailing_sep(path):
A forward slash is always considered a path separator, even on Operating
Systems that normally use a backslash.
"""
- pass
+ return path.endswith('/') or (os.sep != '/' and path.endswith(os.sep))
class LocalFileOpener(io.IOBase):
diff --git a/fsspec/implementations/memory.py b/fsspec/implementations/memory.py
index e1fdbd3..cef0294 100644
--- a/fsspec/implementations/memory.py
+++ b/fsspec/implementations/memory.py
@@ -22,12 +22,118 @@ class MemoryFileSystem(AbstractFileSystem):
protocol = 'memory'
root_marker = '/'
+ def __init__(self, *args, **storage_options):
+ super().__init__(*args, **storage_options)
+
def pipe_file(self, path, value, **kwargs):
"""Set the bytes of given file
Avoids copies of the data if possible
"""
- pass
+ path = self._strip_protocol(stringify_path(path))
+ if isinstance(value, bytes):
+ data = value
+ else:
+ data = value.read()
+ self.store[path] = MemoryFile(self, path, data)
+ self._add_pseudo_dirs(path)
+
+ def _add_pseudo_dirs(self, path):
+ parts = path.split('/')
+ for i in range(1, len(parts)):
+ parent = '/'.join(parts[:i])
+ if parent not in self.pseudo_dirs:
+ self.pseudo_dirs.append(parent)
+
+    def mkdir(self, path, create_parents=True, **kwargs):
+        path = self._strip_protocol(stringify_path(path))
+        if path in self.store or path in self.pseudo_dirs:
+            raise FileExistsError(f"File or directory {path} already exists")
+        if create_parents:
+            self._add_pseudo_dirs(path)
+        if path:
+            # record the directory itself, not only its parents
+            self.pseudo_dirs.append(path)
+
+ def rmdir(self, path):
+ path = self._strip_protocol(stringify_path(path))
+ if path not in self.pseudo_dirs:
+ raise FileNotFoundError(f"Directory {path} does not exist")
+ if any(k.startswith(path + '/') for k in self.store):
+ raise OSError(ENOTEMPTY, "Directory not empty", path)
+ self.pseudo_dirs.remove(path)
+
+    def ls(self, path, detail=False, **kwargs):
+        path = self._strip_protocol(stringify_path(path)).rstrip('/')
+        paths = set()
+        for p in list(self.store) + self.pseudo_dirs:
+            if not p:
+                continue
+            if p == path:
+                # listing a file gives just that file
+                paths.add(p)
+            elif not path or p.startswith(path + '/'):
+                # keep only the first path component below ``path``
+                child = (p if not path else p[len(path) + 1:]).split('/', 1)[0]
+                paths.add(child if not path else f"{path}/{child}")
+        if not paths and path not in self.pseudo_dirs:
+            raise FileNotFoundError(path)
+        if detail:
+            return [self.info(p) for p in sorted(paths)]
+        return sorted(paths)
+
+ def info(self, path, **kwargs):
+ path = self._strip_protocol(stringify_path(path))
+ if path in self.store:
+ return {
+ 'name': path,
+ 'size': len(self.store[path].getvalue()),
+ 'type': 'file',
+ 'created': self.store[path].created.isoformat(),
+ 'modified': self.store[path].modified.isoformat(),
+ }
+ elif path in self.pseudo_dirs:
+ return {
+ 'name': path,
+ 'size': 0,
+ 'type': 'directory',
+ }
+ else:
+ raise FileNotFoundError(path)
+
+ def _open(self, path, mode='rb', **kwargs):
+ path = self._strip_protocol(stringify_path(path))
+ if mode in ('rb', 'r'):
+ if path not in self.store:
+ raise FileNotFoundError(path)
+ return MemoryFile(self, path, self.store[path].getvalue())
+ elif mode in ('wb', 'w'):
+ self.store[path] = MemoryFile(self, path)
+ self._add_pseudo_dirs(path)
+ return self.store[path]
+ else:
+ raise ValueError("Unsupported file mode")
+
+    def rm(self, path, recursive=False, maxdepth=None):
+        path = self._strip_protocol(stringify_path(path))
+        if path in self.store:
+            del self.store[path]
+        elif recursive and (path in self.pseudo_dirs or
+                            any(p.startswith(path + '/') for p in self.store)):
+            for p in list(self.store):
+                if p.startswith(path + '/'):
+                    del self.store[p]
+            self.pseudo_dirs = [p for p in self.pseudo_dirs
+                                if p != path and not p.startswith(path + '/')]
+        else:
+            raise FileNotFoundError(path)
+
+ def mv(self, path1, path2, **kwargs):
+ path1 = self._strip_protocol(stringify_path(path1))
+ path2 = self._strip_protocol(stringify_path(path2))
+ if path1 not in self.store:
+ raise FileNotFoundError(path1)
+ self.store[path2] = self.store.pop(path1)
+ self.store[path2].path = path2
+ self._add_pseudo_dirs(path2)
+
+ def cp(self, path1, path2, **kwargs):
+ path1 = self._strip_protocol(stringify_path(path1))
+ path2 = self._strip_protocol(stringify_path(path2))
+ if path1 not in self.store:
+ raise FileNotFoundError(path1)
+ self.store[path2] = MemoryFile(self, path2, self.store[path1].getvalue())
+ self._add_pseudo_dirs(path2)
class MemoryFile(BytesIO):
@@ -47,6 +153,23 @@ class MemoryFile(BytesIO):
if data:
super().__init__(data)
self.seek(0)
+ else:
+ super().__init__()
def __enter__(self):
return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.flush()
+
+ def close(self):
+ pass
+
+ def flush(self):
+ self.modified = datetime.now(tz=timezone.utc)
+ if self.fs is not None and self.path is not None:
+ self.fs.store[self.path] = self
+
+ def write(self, data):
+ self.modified = datetime.now(tz=timezone.utc)
+ return super().write(data)
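As a sanity check, the in-memory filesystem above can be exercised like this (bucket/key names are illustrative), given the methods as filled in:

    import fsspec

    fs = fsspec.filesystem("memory")
    fs.pipe_file("/bucket/data/part-0.bin", b"abc")   # also registers parent pseudo-directories
    print(fs.ls("/bucket"))                           # ['/bucket/data']
    print(fs.cat("/bucket/data/part-0.bin"))          # b'abc'
    fs.rm("/bucket", recursive=True)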
diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py
index 608bb67..a92f711 100644
--- a/fsspec/implementations/reference.py
+++ b/fsspec/implementations/reference.py
@@ -122,39 +122,106 @@ class LazyReferenceMapper(collections.abc.MutableMapping):
-------
LazyReferenceMapper instance
"""
- pass
+ if fs is None:
+ fs = fsspec.filesystem('file', **(storage_options or {}))
+
+ if fs.exists(root):
+ fs.rm(root, recursive=True)
+ fs.mkdir(root)
+
+ mapper = LazyReferenceMapper(root, fs=fs, record_size=record_size, **kwargs)
+ mapper.zmetadata = {}
+ mapper._items = {}
+ mapper.record_size = record_size
+ return mapper
def listdir(self, basename=True):
"""List top-level directories"""
- pass
+ if self.dirs is None:
+ self.dirs = [d for d in self.fs.ls(self.root) if self.fs.isdir(d)]
+ if basename:
+ return [os.path.basename(d) for d in self.dirs]
+ return self.dirs
def ls(self, path='', detail=True):
"""Shortcut file listings"""
- pass
+ if path:
+ out = self.fs.ls(os.path.join(self.root, path), detail=detail)
+ else:
+ out = self.fs.ls(self.root, detail=detail)
+
+ if detail:
+ return [{**o, 'name': o['name'].replace(self.root + '/', '')} for o in out]
+ return [o.replace(self.root + '/', '') for o in out]
def _load_one_key(self, key):
"""Get the reference for one key
Returns bytes, one-element list or three-element list.
"""
- pass
+ if key in self._items:
+ return self._items[key]
+ if key in self.zmetadata:
+ return json.dumps(self.zmetadata[key]).encode()
+ if '/' not in key or self._is_meta(key):
+ raise KeyError(key)
+
+ field, chunk = key.rsplit('/', 1)
+ record, i, _ = self._key_to_record(key)
+ df = self._generate_record(field, record)
+ if df is None or i >= len(df):
+ raise KeyError(key)
+ row = df.iloc[i]
+ if isinstance(row['url'], bytes):
+ return [row['url'], row['offset'], row['size']]
+ return row.tolist()
@lru_cache(4096)
def _key_to_record(self, key):
"""Details needed to construct a reference for one key"""
- pass
+ field, chunk = key.rsplit('/', 1)
+ chunk_sizes = self._get_chunk_sizes(field)
+ if not chunk_sizes:
+ return 0, int(chunk), None
+
+ parts = [int(p) for p in chunk.split('.')]
+ record = sum(p * s for p, s in zip(parts[:-1], chunk_sizes[1:]))
+ i = parts[-1]
+ return record // self.record_size, i % self.record_size, parts
def _get_chunk_sizes(self, field):
"""The number of chunks along each axis for a given field"""
- pass
+ if field not in self.chunk_sizes:
+ meta = self.zmetadata.get(f"{field}/.zarray")
+ if meta:
+ self.chunk_sizes[field] = meta['chunks']
+ else:
+ self.chunk_sizes[field] = []
+ return self.chunk_sizes[field]
def _generate_record(self, field, record):
"""The references for a given parquet file of a given field"""
- pass
+ import pandas as pd
+ url = self.url.format(field=field, record=record)
+ if not self.fs.exists(url):
+ return None
+ df = pd.read_parquet(self.fs.open(url, 'rb'), engine='pyarrow')
+ if self.cat_thresh:
+ n = len(df)
+ for col in ['url', 'offset', 'size']:
+ if df[col].nunique() / n >= 1 / self.cat_thresh:
+ df[col] = df[col].astype('category')
+ return df
def _generate_all_records(self, field):
"""Load all the references within a field by iterating over the parquet files"""
- pass
+ i = 0
+ while True:
+ df = self._generate_record(field, i)
+ if df is None:
+ break
+ yield from df.itertuples(index=False, name=None)
+ i += 1
def __hash__(self):
return id(self)
@@ -235,7 +302,14 @@ class LazyReferenceMapper(collections.abc.MutableMapping):
Produces strings like "field/x.y" appropriate from the chunking of the array
"""
- pass
+ chunk_sizes = self._get_chunk_sizes(field)
+ if not chunk_sizes:
+ yield f"{field}/0"
+ return
+
+ ranges = [range(0, s) for s in chunk_sizes]
+ for parts in itertools.product(*ranges):
+ yield f"{field}/{'.'.join(str(p) for p in parts)}"
class ReferenceFileSystem(AsyncFileSystem):
diff --git a/fsspec/implementations/sftp.py b/fsspec/implementations/sftp.py
index 95b7f25..7e02fda 100644
--- a/fsspec/implementations/sftp.py
+++ b/fsspec/implementations/sftp.py
@@ -43,10 +43,24 @@ class SFTPFileSystem(AbstractFileSystem):
self.ssh_kwargs = ssh_kwargs
self._connect()
+ def _connect(self):
+ self.client = paramiko.SSHClient()
+ self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ self.client.connect(self.host, **self.ssh_kwargs)
+ self.sftp = self.client.open_sftp()
+
def _open(self, path, mode='rb', block_size=None, **kwargs):
"""
block_size: int or None
If 0, no buffering, if 1, line buffering, if >1, buffer that many
bytes, if None use default from paramiko.
"""
- pass
+        if mode not in ('rb', 'wb', 'ab', 'r', 'w', 'a'):
+            raise NotImplementedError("Mode {} not supported".format(mode))
+        # paramiko exposes exactly these buffering semantics via ``bufsize``
+        bufsize = block_size if block_size is not None else -1
+        return self.sftp.open(path, mode=mode, bufsize=bufsize)
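For context, a small usage sketch of the SFTP implementation above (host and credentials are placeholders; paramiko must be installed, and the ssh_kwargs go straight to SSHClient.connect):

    import fsspec

    fs = fsspec.filesystem("sftp", host="example.com", username="user", password="secret")
    with fs.open("/remote/path/file.txt", "rb", block_size=32768) as f:
        data = f.read()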
diff --git a/fsspec/implementations/smb.py b/fsspec/implementations/smb.py
index a4da1d4..f1ea120 100644
--- a/fsspec/implementations/smb.py
+++ b/fsspec/implementations/smb.py
@@ -106,11 +106,13 @@ class SMBFileSystem(AbstractFileSystem):
def created(self, path):
"""Return the created timestamp of a file as a datetime.datetime"""
- pass
+        stats = smbclient.stat(f"//{self.host}/{path.lstrip('/')}", username=self.username, password=self.password)
+        return datetime.datetime.fromtimestamp(stats.st_ctime)
def modified(self, path):
"""Return the modified timestamp of a file as a datetime.datetime"""
- pass
+        stats = smbclient.stat(f"//{self.host}/{path.lstrip('/')}", username=self.username, password=self.password)
+        return datetime.datetime.fromtimestamp(stats.st_mtime)
def _open(self, path, mode='rb', block_size=-1, autocommit=True,
cache_options=None, **kwargs):
@@ -123,11 +125,29 @@ class SMBFileSystem(AbstractFileSystem):
By specifying 'share_access' in 'kwargs' it is possible to override the
default shared access setting applied in the constructor of this object.
"""
- pass
+ share_access = kwargs.get('share_access', self.share_access)
+ full_path = f"//{self.host}/{path}"
+
+ if self.auto_mkdir and 'w' in mode:
+ self.makedirs(self._parent(path), exist_ok=True)
+
+ return SMBFileOpener(
+ full_path,
+ self.temppath,
+ mode,
+ port=self.port,
+ block_size=block_size,
+ username=self.username,
+ password=self.password,
+ share_access=share_access,
+ **kwargs
+ )
def copy(self, path1, path2, **kwargs):
"""Copy within two locations in the same filesystem"""
- pass
+ with self._open(path1, 'rb') as source:
+ with self._open(path2, 'wb') as destination:
+ destination.write(source.read())
class SMBFileOpener:
@@ -146,11 +166,15 @@ class SMBFileOpener:
def commit(self):
"""Move temp file to definitive on success."""
- pass
+ if self.temp:
+ smbclient.rename(self.temp, self.path, username=self.kwargs.get('username'), password=self.kwargs.get('password'))
+ self._open()
def discard(self):
"""Remove the temp file on failure."""
- pass
+ if self.temp:
+ smbclient.remove(self.temp, username=self.kwargs.get('username'), password=self.kwargs.get('password'))
+ self._open()
def __fspath__(self):
return self.path
diff --git a/fsspec/implementations/tests/local/local_fixtures.py b/fsspec/implementations/tests/local/local_fixtures.py
index a549f6d..969c50d 100644
--- a/fsspec/implementations/tests/local/local_fixtures.py
+++ b/fsspec/implementations/tests/local/local_fixtures.py
@@ -1,7 +1,94 @@
import pytest
+import os
+import tempfile
from fsspec.implementations.local import LocalFileSystem, make_path_posix
from fsspec.tests.abstract import AbstractFixtures
class LocalFixtures(AbstractFixtures):
- pass
+ @pytest.fixture
+ def fs(self):
+ return LocalFileSystem()
+
+ @pytest.fixture
+ def root(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ yield make_path_posix(tmpdir)
+
+ @pytest.fixture
+ def tempdir(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ yield make_path_posix(tmpdir)
+
+ def make_path(self, path):
+ return make_path_posix(os.path.join(self.root, path))
+
+ def test_simple(self, fs, root):
+ # Test basic file operations
+ test_file = os.path.join(root, 'test.txt')
+ fs.touch(test_file)
+ assert fs.exists(test_file)
+
+ fs.write_text(test_file, 'Hello, World!')
+ assert fs.read_text(test_file) == 'Hello, World!'
+
+ fs.rm(test_file)
+ assert not fs.exists(test_file)
+
+ def test_mkdir(self, fs, root):
+ # Test directory creation
+ test_dir = os.path.join(root, 'test_dir')
+ fs.mkdir(test_dir)
+ assert fs.isdir(test_dir)
+
+ nested_dir = os.path.join(test_dir, 'nested')
+ fs.mkdir(nested_dir, create_parents=True)
+ assert fs.isdir(nested_dir)
+
+ def test_ls(self, fs, root):
+ # Test listing directory contents
+ fs.touch(os.path.join(root, 'file1.txt'))
+ fs.touch(os.path.join(root, 'file2.txt'))
+ fs.mkdir(os.path.join(root, 'subdir'))
+
+ contents = fs.ls(root)
+ assert len(contents) == 3
+ assert set(os.path.basename(item) for item in contents) == {'file1.txt', 'file2.txt', 'subdir'}
+
+ def test_cp(self, fs, root):
+ # Test file copying
+ src_file = os.path.join(root, 'source.txt')
+ dst_file = os.path.join(root, 'destination.txt')
+
+ fs.write_text(src_file, 'Test content')
+ fs.cp(src_file, dst_file)
+
+ assert fs.exists(dst_file)
+ assert fs.read_text(dst_file) == 'Test content'
+
+ def test_rm(self, fs, root):
+ # Test file and directory removal
+ test_file = os.path.join(root, 'test.txt')
+ test_dir = os.path.join(root, 'test_dir')
+
+ fs.touch(test_file)
+ fs.mkdir(test_dir)
+ fs.touch(os.path.join(test_dir, 'nested.txt'))
+
+ fs.rm(test_file)
+ assert not fs.exists(test_file)
+
+ fs.rm(test_dir, recursive=True)
+ assert not fs.exists(test_dir)
+
+ def test_move(self, fs, root):
+ # Test moving files and directories
+ src_file = os.path.join(root, 'source.txt')
+ dst_file = os.path.join(root, 'moved.txt')
+
+ fs.write_text(src_file, 'Move me')
+ fs.move(src_file, dst_file)
+
+ assert not fs.exists(src_file)
+ assert fs.exists(dst_file)
+ assert fs.read_text(dst_file) == 'Move me'
diff --git a/fsspec/implementations/tests/local/local_test.py b/fsspec/implementations/tests/local/local_test.py
index 6f23e7a..a1acacb 100644
--- a/fsspec/implementations/tests/local/local_test.py
+++ b/fsspec/implementations/tests/local/local_test.py
@@ -3,12 +3,13 @@ from fsspec.implementations.tests.local.local_fixtures import LocalFixtures
class TestLocalCopy(abstract.AbstractCopyTests, LocalFixtures):
- pass
-
+ def setUp(self):
+ super().setUp()
class TestLocalGet(abstract.AbstractGetTests, LocalFixtures):
- pass
-
+ def setUp(self):
+ super().setUp()
class TestLocalPut(abstract.AbstractPutTests, LocalFixtures):
- pass
+ def setUp(self):
+ super().setUp()
diff --git a/fsspec/implementations/tests/memory/memory_fixtures.py b/fsspec/implementations/tests/memory/memory_fixtures.py
index 26f59cd..0823d2b 100644
--- a/fsspec/implementations/tests/memory/memory_fixtures.py
+++ b/fsspec/implementations/tests/memory/memory_fixtures.py
@@ -3,5 +3,45 @@ from fsspec import filesystem
from fsspec.tests.abstract import AbstractFixtures
+import pytest
+from fsspec.implementations.memory import MemoryFileSystem
+
class MemoryFixtures(AbstractFixtures):
- pass
+ @pytest.fixture
+ def fs(self):
+ return MemoryFileSystem()
+
+ @pytest.fixture
+ def fs_path(self):
+ return ""
+
+ @pytest.fixture
+ def fs_join(self):
+ return lambda *args: "/".join(args)
+
+ @pytest.fixture
+ def fs_bulk(self):
+ return False
+
+ @pytest.fixture
+ def fs_supports_empty_directories(self):
+ return True
+
+ @pytest.fixture
+ def fs_cleanup(self):
+ yield
+ MemoryFileSystem.store.clear()
+
+ @pytest.fixture
+ def fs_create_file(self, fs):
+ def _create_file(path, contents=b""):
+ with fs.open(path, "wb") as f:
+ f.write(contents)
+ return _create_file
+
+ @pytest.fixture
+ def fs_create_dir(self, fs):
+ def _create_dir(path):
+ fs.mkdir(path)
+ return _create_dir
diff --git a/fsspec/implementations/tests/memory/memory_test.py b/fsspec/implementations/tests/memory/memory_test.py
index fd0ebaa..c77c4e8 100644
--- a/fsspec/implementations/tests/memory/memory_test.py
+++ b/fsspec/implementations/tests/memory/memory_test.py
@@ -1,4 +1,4 @@
-import fsspec.tests.abstract as abstract
+from fsspec.tests.abstract import AbstractCopyTests, AbstractGetTests, AbstractPutTests
from fsspec.implementations.tests.memory.memory_fixtures import MemoryFixtures
diff --git a/fsspec/implementations/tests/test_archive.py b/fsspec/implementations/tests/test_archive.py
index 0c8d230..6a9fd86 100644
--- a/fsspec/implementations/tests/test_archive.py
+++ b/fsspec/implementations/tests/test_archive.py
@@ -18,7 +18,16 @@ def tempzip(data=None):
"""
Provide test cases with temporary synthesized Zip archives.
"""
- pass
+ if data is None:
+ data = archive_data
+ with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
+ with zipfile.ZipFile(tmp, mode='w') as zf:
+ for k, v in data.items():
+ zf.writestr(k, v)
+ try:
+ yield tmp.name
+ finally:
+ os.remove(tmp.name)
@contextmanager
@@ -26,7 +35,16 @@ def temparchive(data=None):
"""
Provide test cases with temporary synthesized 7-Zip archives.
"""
- pass
+ if data is None:
+ data = archive_data
+ with tempfile.NamedTemporaryFile(suffix='.7z', delete=False) as tmp:
+ with py7zr.SevenZipFile(tmp.name, mode='w') as archive:
+ for k, v in data.items():
+ archive.writestr(v, k)
+ try:
+ yield tmp.name
+ finally:
+ os.remove(tmp.name)
@contextmanager
@@ -34,7 +52,18 @@ def temptar(data=None, mode='w', suffix='.tar'):
"""
Provide test cases with temporary synthesized .tar archives.
"""
- pass
+ if data is None:
+ data = archive_data
+ with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
+ with tarfile.open(tmp.name, mode=mode) as tf:
+ for k, v in data.items():
+ info = tarfile.TarInfo(name=k)
+ info.size = len(v)
+ tf.addfile(info, BytesIO(v))
+ try:
+ yield tmp.name
+ finally:
+ os.remove(tmp.name)
@contextmanager
@@ -42,7 +71,8 @@ def temptargz(data=None, mode='w', suffix='.tar.gz'):
"""
Provide test cases with temporary synthesized .tar.gz archives.
"""
- pass
+ with temptar(data, mode=mode + ':gz', suffix=suffix) as fn:
+ yield fn
@contextmanager
@@ -50,7 +80,8 @@ def temptarbz2(data=None, mode='w', suffix='.tar.bz2'):
"""
Provide test cases with temporary synthesized .tar.bz2 archives.
"""
- pass
+ with temptar(data, mode=mode + ':bz2', suffix=suffix) as fn:
+ yield fn
@contextmanager
@@ -58,7 +89,8 @@ def temptarxz(data=None, mode='w', suffix='.tar.xz'):
"""
Provide test cases with temporary synthesized .tar.xz archives.
"""
- pass
+ with temptar(data, mode=mode + ':xz', suffix=suffix) as fn:
+ yield fn
class ArchiveTestScenario:
@@ -86,7 +118,14 @@ def pytest_generate_tests(metafunc):
https://docs.pytest.org/en/latest/example/parametrize.html#a-quick-port-of-testscenarios
"""
- pass
+ idlist = []
+ argvalues = []
+ for scenario in metafunc.cls.scenarios:
+ idlist.append(f"{scenario.protocol}")
+ if scenario.variant:
+ idlist[-1] += f"-{scenario.variant}"
+ argvalues.append(scenario)
+ metafunc.parametrize("scenario", argvalues, ids=idlist, scope="class")
scenario_zip = ArchiveTestScenario(protocol='zip', provider=tempzip)
diff --git a/fsspec/implementations/tests/test_cached.py b/fsspec/implementations/tests/test_cached.py
index 3baa269..f0e4121 100644
--- a/fsspec/implementations/tests/test_cached.py
+++ b/fsspec/implementations/tests/test_cached.py
@@ -25,9 +25,34 @@ def test_equality(tmpdir):
Related: GitHub#577, GitHub#578
"""
- pass
+ fs1 = fsspec.filesystem("file")
+ fs2 = fsspec.filesystem("file")
+
+ cache1 = str(tmpdir.mkdir("cache1"))
+ cache2 = str(tmpdir.mkdir("cache2"))
+
+ cfs1 = CachingFileSystem(fs=fs1, cache_storage=cache1)
+ cfs2 = CachingFileSystem(fs=fs2, cache_storage=cache1)
+ cfs3 = CachingFileSystem(fs=fs1, cache_storage=cache2)
+
+ # Test equality
+ assert cfs1 == cfs2, "CachingFileSystems with same fs and cache should be equal"
+ assert cfs1 != cfs3, "CachingFileSystems with different caches should not be equal"
+ assert cfs1 != fs1, "CachingFileSystem should not equal its underlying filesystem"
+
+ # Test hashing
+ assert hash(cfs1) == hash(cfs2), "Equal CachingFileSystems should have the same hash"
+ assert hash(cfs1) != hash(cfs3), "Different CachingFileSystems should have different hashes"
+ assert hash(cfs1) != hash(fs1), "CachingFileSystem should have a different hash from its underlying filesystem"
def test_str():
"""Test that the str representation refers to correct class."""
- pass
+ fs = fsspec.filesystem("file")
+ cfs = CachingFileSystem(fs=fs, cache_storage="/tmp/cache")
+
+ str_repr = str(cfs)
+
+ assert "CachingFileSystem" in str_repr, "String representation should mention CachingFileSystem"
+ assert "LocalFileSystem" in str_repr, "String representation should mention the underlying filesystem"
+ assert "/tmp/cache" in str_repr, "String representation should include the cache storage path"
diff --git a/fsspec/implementations/tests/test_dbfs.py b/fsspec/implementations/tests/test_dbfs.py
index 18a8bb2..43c6394 100644
--- a/fsspec/implementations/tests/test_dbfs.py
+++ b/fsspec/implementations/tests/test_dbfs.py
@@ -45,4 +45,21 @@ def vcr_config():
If the DBFS_TOKEN env variable is set, we record with VCR.
If not, we only replay (to not accidentally record with a wrong URL).
"""
- pass
+ def before_record_request(request):
+ parsed = urlparse(request.uri)
+ request.uri = parsed._replace(netloc=DUMMY_INSTANCE).geturl()
+ if 'Authorization' in request.headers:
+ request.headers['Authorization'] = 'Bearer <TOKEN>'
+ return request
+
+ def before_record_response(response):
+ if 'Date' in response['headers']:
+ del response['headers']['Date']
+ return response
+
+ return {
+ 'filter_headers': ['Authorization'],
+ 'before_record_request': before_record_request,
+ 'before_record_response': before_record_response,
+ 'record_mode': 'once' if TOKEN else 'none'
+ }
diff --git a/fsspec/implementations/tests/test_local.py b/fsspec/implementations/tests/test_local.py
index ccabb9c..c398ff1 100644
--- a/fsspec/implementations/tests/test_local.py
+++ b/fsspec/implementations/tests/test_local.py
@@ -37,14 +37,33 @@ def filetexts(d, open=open, mode='t'):
automatically switch to a temporary current directory, to avoid
race conditions when running tests in parallel.
"""
- pass
+    original_dir = os.getcwd()
+    with tempfile.TemporaryDirectory() as dirname:
+        try:
+            os.chdir(dirname)
+            for filename, text in d.items():
+                with open(filename, 'w' + mode) as f:
+                    f.write(text)
+            yield
+        finally:
+            # always restore the working directory, even if the test fails
+            os.chdir(original_dir)
def test_urlpath_expand_read():
"""Make sure * is expanded in file paths when reading."""
- pass
+    with filetexts(csv_files):
+        fn = sorted(csv_files)[0]
+        # OpenFile instances are used as context managers to get a real file object
+        with open_files('.test.fakedata.*.csv')[0] as f:
+            data = f.read()
+        assert data == csv_files[fn]
+
+        fs = fsspec.filesystem('file')
+        files = fs.glob('.test.fakedata.*.csv')
+        assert len(files) == 2
def test_urlpath_expand_write():
"""Make sure * is expanded in file paths when writing."""
- pass
+    with tempfile.TemporaryDirectory() as dirname:
+        # in write mode, open_files expands "*" into ``num`` numbered paths
+        files = open_files(os.path.join(dirname, 'test_file_*.txt'), mode='wt', num=2)
+        for of in files:
+            with of as f:
+                f.write('test')
+
+        fs = fsspec.filesystem('file')
+        expanded = fs.glob(os.path.join(dirname, 'test_file_*.txt'))
+        assert len(expanded) == 2
+        assert all(fs.read_text(p) == 'test' for p in expanded)
diff --git a/fsspec/implementations/tests/test_sftp.py b/fsspec/implementations/tests/test_sftp.py
index c50f763..1caa7f8 100644
--- a/fsspec/implementations/tests/test_sftp.py
+++ b/fsspec/implementations/tests/test_sftp.py
@@ -10,4 +10,11 @@ pytest.importorskip('paramiko')
def make_tarfile(files_to_pack, tmp_path):
"""Create a tarfile with some files."""
- pass
+ tar_filename = tmp_path / "test.tar"
+ with TarFile.open(tar_filename, "w") as tar:
+ for file_name, content in files_to_pack.items():
+ file_path = tmp_path / file_name
+ with open(file_path, "w") as f:
+ f.write(content)
+ tar.add(file_path, arcname=file_name)
+ return tar_filename
diff --git a/fsspec/implementations/tests/test_tar.py b/fsspec/implementations/tests/test_tar.py
index 754de3b..b84ac72 100644
--- a/fsspec/implementations/tests/test_tar.py
+++ b/fsspec/implementations/tests/test_tar.py
@@ -22,7 +22,16 @@ def test_compressions(recipe):
"""
Run tests on all available tar file compression variants.
"""
- pass
+    with temptar(mode=recipe['mode'], suffix=recipe['suffix']) as fn:
+ with open(fn, 'rb') as f:
+ assert f.read(len(recipe['magic'])) == recipe['magic']
+
+ fs = TarFileSystem(fn)
+ assert fs.ls('/') == ['a', 'b']
+ assert fs.ls('/a') == ['a/1', 'a/2']
+ assert fs.cat('/a/1') == b'one'
+ assert fs.cat('/a/2') == b'two'
+ assert fs.cat('/b') == b'three'
@pytest.mark.parametrize('recipe', [{'mode': 'w', 'suffix': '.tar', 'magic':
@@ -35,7 +44,23 @@ def test_filesystem_direct(recipe, tmpdir):
Run tests through a real fsspec filesystem implementation.
Here: `LocalFileSystem`.
"""
- pass
+ fn = os.path.join(tmpdir, 'test' + recipe['suffix'])
+ with tarfile.open(fn, recipe['mode']) as tf:
+ for name, data in archive_data.items():
+ info = tarfile.TarInfo(name=name)
+ info.size = len(data)
+ tf.addfile(info, BytesIO(data))
+
+ fs = fsspec.filesystem('file')
+ with fs.open(fn, 'rb') as f:
+ assert f.read(len(recipe['magic'])) == recipe['magic']
+
+ tfs = TarFileSystem(fn)
+ assert tfs.ls('/') == ['a', 'b']
+ assert tfs.ls('/a') == ['a/1', 'a/2']
+ assert tfs.cat('/a/1') == b'one'
+ assert tfs.cat('/a/2') == b'two'
+ assert tfs.cat('/b') == b'three'
@pytest.mark.parametrize('recipe', [{'mode': 'w', 'suffix': '.tar', 'magic':
@@ -48,7 +73,31 @@ def test_filesystem_cached(recipe, tmpdir):
Run tests through a real, cached, fsspec filesystem implementation.
Here: `TarFileSystem` over `WholeFileCacheFileSystem` over `LocalFileSystem`.
"""
- pass
+ fn = os.path.join(tmpdir, 'test' + recipe['suffix'])
+ with tarfile.open(fn, recipe['mode']) as tf:
+ for name, data in archive_data.items():
+ info = tarfile.TarInfo(name=name)
+ info.size = len(data)
+ tf.addfile(info, BytesIO(data))
+
+ fs = fsspec.filesystem('file')
+ with fs.open(fn, 'rb') as f:
+ assert f.read(len(recipe['magic'])) == recipe['magic']
+
+ cache_dir = os.path.join(tmpdir, 'cache')
+ os.mkdir(cache_dir)
+
+ cached_fs = WholeFileCacheFileSystem(fs=fs, cache_storage=cache_dir)
+ tfs = TarFileSystem(OpenFile(cached_fs, fn))
+
+ assert tfs.ls('/') == ['a', 'b']
+ assert tfs.ls('/a') == ['a/1', 'a/2']
+ assert tfs.cat('/a/1') == b'one'
+ assert tfs.cat('/a/2') == b'two'
+ assert tfs.cat('/b') == b'three'
+
+ # Check if the file is cached
+ assert os.path.exists(os.path.join(cache_dir, os.path.basename(fn)))
@pytest.mark.parametrize('compression', ['', 'gz', 'bz2', 'xz'], ids=['tar',
@@ -59,4 +108,24 @@ def test_ls_with_folders(compression: str, tmp_path: Path):
but make sure that the reading filesystem is still able to resolve the
intermediate folders, like the ZipFileSystem.
"""
- pass
+ tar_path = tmp_path / f"test.tar{'.'+compression if compression else ''}"
+ mode = f"w:{compression}" if compression else "w"
+
+    with tarfile.open(tar_path, mode) as tar:
+        info = tarfile.TarInfo(name="a/b/c/file.txt")
+        info.size = len(b"content")
+        tar.addfile(info, BytesIO(b"content"))  # tar.add() needs a real path; addfile takes a fileobj
+
+ fs = TarFileSystem(str(tar_path))
+
+ assert fs.ls("/") == ["a"]
+ assert fs.ls("/a") == ["a/b"]
+ assert fs.ls("/a/b") == ["a/b/c"]
+ assert fs.ls("/a/b/c") == ["a/b/c/file.txt"]
+ assert fs.cat("/a/b/c/file.txt") == b"content"
+
+ # Test glob
+ assert fs.glob("/a/*/c/*.txt") == ["a/b/c/file.txt"]
+
+ # Test info
+ info = fs.info("a/b/c/file.txt")
+ assert info["type"] == "file"
+ assert info["size"] == 7
diff --git a/fsspec/implementations/tests/test_zip.py b/fsspec/implementations/tests/test_zip.py
index c554e22..4c9bb90 100644
--- a/fsspec/implementations/tests/test_zip.py
+++ b/fsspec/implementations/tests/test_zip.py
@@ -7,4 +7,17 @@ from fsspec.implementations.tests.test_archive import archive_data, tempzip
def test_fsspec_get_mapper():
"""Added for #788"""
- pass
+ with tempzip(archive_data) as fn:
+ # Test with explicit protocol
+ m = fsspec.get_mapper(f"zip://{fn}")
+ assert isinstance(m, collections.abc.MutableMapping)
+ assert set(m) == set(archive_data)
+ assert m["a"] == b"data1"
+ assert m["b"] == b"data2"
+
+ # Test with inferred protocol
+ m2 = fsspec.get_mapper(fn)
+ assert isinstance(m2, collections.abc.MutableMapping)
+ assert set(m2) == set(archive_data)
+ assert m2["a"] == b"data1"
+ assert m2["b"] == b"data2"
diff --git a/fsspec/implementations/webhdfs.py b/fsspec/implementations/webhdfs.py
index bc3c00b..99d5d52 100644
--- a/fsspec/implementations/webhdfs.py
+++ b/fsspec/implementations/webhdfs.py
@@ -141,19 +141,27 @@ class WebHDFS(AbstractFileSystem):
-------
WebHDFile instance
"""
- pass
+ return WebHDFile(self, path, mode=mode, block_size=block_size,
+ autocommit=autocommit, replication=replication,
+ permissions=permissions, **kwargs)
def content_summary(self, path):
"""Total numbers of files, directories and bytes under path"""
- pass
+ url = f"{self.url}{path}?op=GETCONTENTSUMMARY"
+ resp = self._call('get', url)
+ return resp['ContentSummary']
def ukey(self, path):
"""Checksum info of file, giving method and result"""
- pass
+ url = f"{self.url}{path}?op=GETFILECHECKSUM"
+ resp = self._call('get', url)
+ return resp['FileChecksum']
def home_directory(self):
"""Get user's home directory"""
- pass
+ url = f"{self.url}?op=GETHOMEDIRECTORY"
+ resp = self._call('get', url)
+ return resp['Path']
def get_delegation_token(self, renewer=None):
"""Retrieve token which can give the same authority to other uses
@@ -163,15 +171,22 @@ class WebHDFS(AbstractFileSystem):
renewer: str or None
User who may use this token; if None, will be current user
"""
- pass
+ url = f"{self.url}?op=GETDELEGATIONTOKEN"
+ if renewer:
+ url += f"&renewer={renewer}"
+ resp = self._call('get', url)
+ return resp['Token']['urlString']
def renew_delegation_token(self, token):
"""Make token live longer. Returns new expiry time"""
- pass
+ url = f"{self.url}?op=RENEWDELEGATIONTOKEN&token={token}"
+ resp = self._call('put', url)
+ return resp['long']
def cancel_delegation_token(self, token):
"""Stop the token from being useful"""
- pass
+ url = f"{self.url}?op=CANCELDELEGATIONTOKEN&token={token}"
+ self._call('put', url)
def chmod(self, path, mod):
"""Set the permission at path
@@ -184,11 +199,19 @@ class WebHDFS(AbstractFileSystem):
posix epresentation or permission, give as oct string, e.g, '777'
or 0o777
"""
- pass
+ if isinstance(mod, int):
+ mod = oct(mod)[2:]
+ url = f"{self.url}{path}?op=SETPERMISSION&permission={mod}"
+ self._call('put', url)
def chown(self, path, owner=None, group=None):
"""Change owning user and/or group"""
- pass
+ url = f"{self.url}{path}?op=SETOWNER"
+ if owner:
+ url += f"&owner={owner}"
+ if group:
+ url += f"&group={group}"
+ self._call('put', url)
def set_replication(self, path, replication):
"""
@@ -202,7 +225,8 @@ class WebHDFS(AbstractFileSystem):
Number of copies of file on the cluster. Should be smaller than
number of data nodes; normally 3 on most systems.
"""
- pass
+ url = f"{self.url}{path}?op=SETREPLICATION&replication={replication}"
+ self._call('put', url)
class WebHDFile(AbstractBufferedFile):
@@ -230,8 +254,30 @@ class WebHDFile(AbstractBufferedFile):
This is the last block, so should complete file, if
self.autocommit is True.
"""
- pass
+ data = self.buffer.getvalue()
+ self.buffer.seek(0)
+ self.buffer.truncate()
+
+ if not data:
+ return
+
+ if self.offset == 0:
+ self._initiate_upload()
+
+ url = f"{self.fs.url}{self.path}?op=APPEND&namenoderpcaddress={self.fs.host}:{self.fs.port}&offset={self.offset}"
+ self.fs._call('post', url, data=data)
+ self.offset += len(data)
+
+ if final and self.autocommit:
+ self.commit()
def _initiate_upload(self):
"""Create remote file/upload"""
- pass
+ url = f"{self.fs.url}{self.path}?op=CREATE"
+ if self.replication:
+ url += f"&replication={self.replication}"
+ if self.permissions:
+ url += f"&permission={self.permissions}"
+
+ redirect_url = self.fs._call('put', url, allow_redirects=False).headers['Location']
+ self.fs._call('put', redirect_url, data="")
diff --git a/fsspec/implementations/zip.py b/fsspec/implementations/zip.py
index b37820c..bcf9c58 100644
--- a/fsspec/implementations/zip.py
+++ b/fsspec/implementations/zip.py
@@ -59,4 +59,10 @@ class ZipFileSystem(AbstractArchiveFileSystem):
def close(self):
"""Commits any write changes to the file. Done on ``del`` too."""
- pass
+ if self.zip is not None:
+ self.zip.close()
+ self.zip = None
+ if self.of is not None:
+ self.of.__exit__(None, None, None)
+ self.of = None
+ self.fo = None
diff --git a/fsspec/json.py b/fsspec/json.py
index 54f0af3..1af47e3 100644
--- a/fsspec/json.py
+++ b/fsspec/json.py
@@ -9,13 +9,38 @@ from .spec import AbstractFileSystem
class FilesystemJSONEncoder(json.JSONEncoder):
include_password: ClassVar[bool] = True
- def make_serializable(self, obj: Any) ->Any:
+ def make_serializable(self, obj: Any) -> Any:
"""
Recursively converts an object so that it can be JSON serialized via
:func:`json.dumps` and :func:`json.dump`, without actually calling
said functions.
"""
- pass
+ if isinstance(obj, AbstractFileSystem):
+ return {
+ "__fsspec_type__": "filesystem",
+ "protocol": obj.protocol,
+ "args": obj.storage_args,
+ "kwargs": obj.storage_options
+ }
+ elif isinstance(obj, PurePath):
+ return {
+ "__fsspec_type__": "path",
+ "path": str(obj)
+ }
+ elif isinstance(obj, (list, tuple)):
+ return [self.make_serializable(item) for item in obj]
+ elif isinstance(obj, dict):
+ return {key: self.make_serializable(value) for key, value in obj.items()}
+ elif hasattr(obj, "__dict__"):
+ return {
+ "__fsspec_type__": "object",
+ "class": f"{obj.__class__.__module__}.{obj.__class__.__name__}",
+ "attributes": self.make_serializable(obj.__dict__)
+ }
+ return obj
+
+    def default(self, obj):
+        serializable = self.make_serializable(obj)
+        # defer to json's TypeError for objects we cannot convert
+        return super().default(obj) if serializable is obj else serializable
class FilesystemJSONDecoder(json.JSONDecoder):
@@ -31,8 +56,29 @@ class FilesystemJSONDecoder(json.JSONDecoder):
parse_float, parse_int=parse_int, parse_constant=parse_constant,
strict=strict, object_pairs_hook=object_pairs_hook)
- def unmake_serializable(self, obj: Any) ->Any:
+ def unmake_serializable(self, obj: Any) -> Any:
"""
Inverse function of :meth:`FilesystemJSONEncoder.make_serializable`.
"""
- pass
+ if isinstance(obj, dict):
+ if "__fsspec_type__" in obj:
+ if obj["__fsspec_type__"] == "filesystem":
+ fs_class = get_filesystem_class(obj["protocol"])
+ return fs_class(*obj.get("args", []), **obj.get("kwargs", {}))
+ elif obj["__fsspec_type__"] == "path":
+ return PurePath(obj["path"])
+ elif obj["__fsspec_type__"] == "object":
+ cls = _import_class(obj["class"])
+ instance = cls.__new__(cls)
+ instance.__dict__.update(self.unmake_serializable(obj["attributes"]))
+ return instance
+ return {key: self.unmake_serializable(value) for key, value in obj.items()}
+ elif isinstance(obj, list):
+ return [self.unmake_serializable(item) for item in obj]
+ return obj
+
+ def custom_object_hook(self, obj):
+ result = self.unmake_serializable(obj)
+ if self.original_object_hook:
+ result = self.original_object_hook(result)
+ return result
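A short round-trip sketch for the encoder/decoder pair above, assuming the decoder's constructor wires custom_object_hook in as the JSON object hook:

    import json
    import fsspec
    from fsspec.json import FilesystemJSONEncoder, FilesystemJSONDecoder

    fs = fsspec.filesystem("memory")
    payload = json.dumps({"fs": fs}, cls=FilesystemJSONEncoder)
    restored = json.loads(payload, cls=FilesystemJSONDecoder)
    # restored["fs"] should be a MemoryFileSystem rebuilt from protocol + storage options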
diff --git a/fsspec/mapping.py b/fsspec/mapping.py
index 05bf237..6972c83 100644
--- a/fsspec/mapping.py
+++ b/fsspec/mapping.py
@@ -62,11 +62,13 @@ class FSMap(MutableMapping):
@cached_property
def dirfs(self):
"""dirfs instance that can be used with the same keys as the mapper"""
- pass
+ from fsspec.implementations.dirfs import DirFileSystem
+ return DirFileSystem(self.root, fs=self.fs)
def clear(self):
"""Remove all keys below root - empties out mapping"""
- pass
+ self.fs.rm(self.root, recursive=True)
+ self.fs.mkdir(self.root)
def getitems(self, keys, on_error='raise'):
"""Fetch multiple items from the store
@@ -88,7 +90,22 @@ class FSMap(MutableMapping):
-------
dict(key, bytes|exception)
"""
- pass
+ out = {}
+ for key in keys:
+ try:
+ out[key] = self[key]
+ except Exception as e:
+ if on_error == 'raise':
+ if isinstance(e, self.missing_exceptions):
+ raise KeyError(key) from e
+ raise
+ elif on_error == 'omit':
+ continue
+ elif on_error == 'return':
+ out[key] = e
+ else:
+ raise ValueError(f"on_error must be 'raise', 'omit', or 'return', got {on_error}")
+ return out
def setitems(self, values_dict):
"""Set the values of multiple items in the store
@@ -97,19 +114,24 @@ class FSMap(MutableMapping):
----------
values_dict: dict(str, bytes)
"""
- pass
+ for key, value in values_dict.items():
+ self[key] = value
def delitems(self, keys):
"""Remove multiple keys from the store"""
- pass
+ for key in keys:
+ try:
+ del self[key]
+ except KeyError:
+ pass
def _key_to_str(self, key):
"""Generate full path for the key"""
- pass
+ return posixpath.join(self._root_key_to_str, key)
def _str_to_key(self, s):
"""Strip path of to leave key name"""
- pass
+        return s[len(self._root_key_to_str):].lstrip('/')
def __getitem__(self, key, default=None):
"""Retrieve data"""
@@ -124,7 +146,14 @@ class FSMap(MutableMapping):
def pop(self, key, default=None):
"""Pop data"""
- pass
+ try:
+ value = self[key]
+ del self[key]
+ return value
+ except KeyError:
+ if default is not None:
+ return default
+ raise
def __setitem__(self, key, value):
"""Store value in key"""
@@ -187,4 +216,8 @@ def get_mapper(url='', check=False, create=False, missing_exceptions=None,
-------
``FSMap`` instance, the dict-like key-value store.
"""
- pass
+ fs, root = url_to_fs(url, **kwargs)
+ if alternate_root is not None:
+ root = alternate_root
+
+ return FSMap(root, fs, check=check, create=create, missing_exceptions=missing_exceptions)
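The get_mapper helper above is the usual entry point for key-value access; a brief sketch (the URL is illustrative):

    import fsspec

    m = fsspec.get_mapper("memory://mybucket")
    m["key/0"] = b"hello"              # __setitem__ writes the bytes to the backing filesystem
    print(list(m))                     # keys are reported relative to the root
    print(m.getitems(["key/0"]))       # {'key/0': b'hello'}
    m.clear()                          # removes everything below the root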
diff --git a/fsspec/parquet.py b/fsspec/parquet.py
index be64f8a..e85d6e5 100644
--- a/fsspec/parquet.py
+++ b/fsspec/parquet.py
@@ -71,7 +71,23 @@ def open_parquet_file(path, mode='rb', fs=None, metadata=None, columns=None,
**kwargs :
Optional key-word arguments to pass to `fs.open`
"""
- pass
+ if fs is None:
+ fs, path = url_to_fs(path, **(storage_options or {}))
+
+ byte_ranges = _get_parquet_byte_ranges(
+ [path], fs, metadata=metadata, columns=columns,
+ row_groups=row_groups, max_gap=max_gap, max_block=max_block,
+ footer_sample_size=footer_sample_size, engine=engine
+ )
+
+ return fs.open(
+ path,
+ mode=mode,
+ cache_type="parts",
+ cache_options={"data": byte_ranges[path]},
+ strict=strict,
+ **kwargs
+ )
def _get_parquet_byte_ranges(paths, fs, metadata=None, columns=None,
@@ -83,7 +99,43 @@ def _get_parquet_byte_ranges(paths, fs, metadata=None, columns=None,
is intended for use as the `data` argument for the
`KnownPartsOfAFile` caching strategy of a single path.
"""
- pass
+ if engine == 'auto':
+ try:
+ engine = FastparquetEngine()
+ except ImportError:
+ try:
+ engine = PyarrowEngine()
+ except ImportError:
+ raise ImportError("Neither 'fastparquet' nor 'pyarrow' is installed")
+ elif engine == 'fastparquet':
+ engine = FastparquetEngine()
+ elif engine == 'pyarrow':
+ engine = PyarrowEngine()
+ else:
+ raise ValueError(f"Unknown engine: {engine}")
+
+ if metadata is not None:
+ return _get_parquet_byte_ranges_from_metadata(
+ metadata, fs, engine, columns, row_groups, max_gap, max_block
+ )
+
+ byte_ranges = {}
+ for path in paths:
+ with fs.open(path, 'rb') as f:
+ f.seek(-footer_sample_size, 2)
+ footer = f.read()
+
+ try:
+ metadata = engine.read_metadata(footer)
+ except Exception:
+ # If footer is not in the sample, read the entire file
+ with fs.open(path, 'rb') as f:
+ metadata = engine.read_metadata(f.read())
+
+ path_ranges = engine.get_row_group_byte_ranges(metadata, columns, row_groups)
+ byte_ranges[path] = merge_offset_ranges(path_ranges, max_gap, max_block)
+
+ return byte_ranges
def _get_parquet_byte_ranges_from_metadata(metadata, fs, engine, columns=
@@ -93,7 +145,11 @@ def _get_parquet_byte_ranges_from_metadata(metadata, fs, engine, columns=
provided, and the remote footer metadata does not need to
be transferred before calculating the required byte ranges.
"""
- pass
+    byte_ranges = {}
+    # collect the distinct data files referenced by the footer metadata
+    paths = {col.file_path for rg in metadata.row_groups
+             for col in rg.columns if getattr(col, 'file_path', None)}
+    for path in paths:
+        path_ranges = engine.get_row_group_byte_ranges(metadata, columns, row_groups)
+        byte_ranges[path] = merge_offset_ranges(path_ranges, max_gap, max_block)
+    return byte_ranges
class FastparquetEngine:
@@ -102,9 +158,46 @@ class FastparquetEngine:
import fastparquet as fp
self.fp = fp
+ def read_metadata(self, data):
+ return self.fp.ParquetFile(io.BytesIO(data))
+
+ def get_row_group_byte_ranges(self, metadata, columns=None, row_groups=None):
+ if columns is None:
+ columns = metadata.columns
+
+ if row_groups is None:
+ row_groups = range(len(metadata.row_groups))
+
+ byte_ranges = []
+ for rg in row_groups:
+ for col in columns:
+ chunk = metadata.row_groups[rg].columns[col]
+ byte_ranges.append((chunk.file_offset, chunk.file_offset + chunk.total_compressed_size))
+
+ return byte_ranges
+
class PyarrowEngine:
def __init__(self):
import pyarrow.parquet as pq
self.pq = pq
+
+ def read_metadata(self, data):
+ return self.pq.read_metadata(io.BytesIO(data))
+
+ def get_row_group_byte_ranges(self, metadata, columns=None, row_groups=None):
+ if columns is None:
+ columns = metadata.schema.names
+
+ if row_groups is None:
+ row_groups = range(metadata.num_row_groups)
+
+ byte_ranges = []
+ for rg in row_groups:
+ for col in columns:
+ column_index = metadata.schema.names.index(col)
+ chunk = metadata.row_group(rg).column(column_index)
+ byte_ranges.append((chunk.data_page_offset, chunk.total_compressed_size + chunk.data_page_offset))
+
+ return byte_ranges
diff --git a/fsspec/registry.py b/fsspec/registry.py
index e2de702..4579925 100644
--- a/fsspec/registry.py
+++ b/fsspec/registry.py
@@ -28,7 +28,15 @@ def register_implementation(name, cls, clobber=False, errtxt=None):
If given, then a failure to import the given class will result in this
text being given.
"""
- pass
+    if isinstance(cls, str):
+        if name in known_implementations and not clobber:
+            raise ValueError(f"Name {name} already in known_implementations and clobber is False")
+        known_implementations[name] = {
+            "class": cls,
+            "err": errtxt or f"Install package to use {name} protocol"
+        }
+    else:
+        if name in _registry and not clobber:
+            raise ValueError(f"Name {name} already in registry and clobber is False")
+        _registry[name] = cls
known_implementations = {'abfs': {'class': 'adlfs.AzureBlobFileSystem',
@@ -130,7 +138,19 @@ def get_filesystem_class(protocol):
import may fail. In this case, the string in the "err" field of the
``known_implementations`` will be given as the error message.
"""
- pass
+ if protocol in registry:
+ return registry[protocol]
+
+ if protocol not in known_implementations:
+ raise ValueError(f"Protocol {protocol} is not known")
+
+ impl = known_implementations[protocol]
+ try:
+ cls = _import_class(impl['class'])
+ register_implementation(protocol, cls)
+ return cls
+    except ImportError as e:
+        raise ImportError(impl.get('err', str(e))) from e
s3_msg = """Your installed version of s3fs is very old and known to cause
@@ -152,7 +172,17 @@ def _import_class(fqp: str):
This can import arbitrary modules. Make sure you haven't installed any modules
that may execute malicious code at import time.
"""
- pass
+    if ':' in fqp:
+        module, klass = fqp.split(':', 1)
+    else:
+        module, klass = fqp.rsplit('.', 1)
+
+    mod = importlib.import_module(module)
+
+    # walk the attribute path (handles nested classes) instead of using eval
+    obj = mod
+    for part in klass.split('.'):
+        obj = getattr(obj, part)
+    return obj
def filesystem(protocol, **storage_options):
@@ -161,7 +191,8 @@ def filesystem(protocol, **storage_options):
``storage_options`` are specific to the protocol being chosen, and are
passed directly to the class.
"""
- pass
+ cls = get_filesystem_class(protocol)
+ return cls(**storage_options)
def available_protocols():
@@ -169,4 +200,4 @@ def available_protocols():
Note that any given protocol may require extra packages to be importable.
"""
- pass
+ return list(set(list(registry) + list(known_implementations)))
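A brief example of the registry functions above (the protocol name and class path are hypothetical):

    from fsspec.registry import register_implementation, available_protocols

    # register by import path; the class is only imported on first actual use
    register_implementation("myproto", "mypackage.MyFileSystem",
                            errtxt="install mypackage to use the myproto protocol")
    print("myproto" in available_protocols())   # True
    # fsspec.filesystem("myproto") would then import and instantiate mypackage.MyFileSystem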
diff --git a/fsspec/spec.py b/fsspec/spec.py
index 106214a..dd3eefb 100644
--- a/fsspec/spec.py
+++ b/fsspec/spec.py
@@ -133,7 +133,7 @@ class AbstractFileSystem(metaclass=_Cached):
"""Persistent filesystem id that can be used to compare filesystems
across sessions.
"""
- pass
+ return hash(self.__class__.__name__ + str(self.storage_args) + str(self.storage_options))
def __dask_tokenize__(self):
return self._fs_token
@@ -155,11 +155,21 @@ class AbstractFileSystem(metaclass=_Cached):
May require FS-specific handling, e.g., for relative paths or links.
"""
- pass
+ if isinstance(cls.protocol, str):
+ return path[len(cls.protocol) + 3:] if path.startswith(cls.protocol + '://') else path
+ elif isinstance(cls.protocol, tuple):
+ for protocol in cls.protocol:
+ if path.startswith(protocol + '://'):
+ return path[len(protocol) + 3:]
+ return path
def unstrip_protocol(self, name: str) ->str:
"""Format FS-specific path to generic, including protocol"""
- pass
+ if isinstance(self.protocol, str):
+ return f"{self.protocol}://{name}"
+ elif isinstance(self.protocol, tuple):
+ return f"{self.protocol[0]}://{name}"
+ return name
@staticmethod
def _get_kwargs_from_urls(path):
@@ -179,7 +189,9 @@ class AbstractFileSystem(metaclass=_Cached):
If no instance has been created, then create one with defaults
"""
- pass
+ if cls._latest is None:
+ return cls()
+ return cls._cache[cls._latest]
@property
def transaction(self):
@@ -208,7 +220,10 @@ class AbstractFileSystem(metaclass=_Cached):
If None, clear all listings cached else listings at or under given
path.
"""
- pass
+        if path is None:
+            self.dircache.clear()
+        else:
+            path = self._strip_protocol(path)
+            # dict.clear() takes no arguments; drop this entry and its parent listing
+            self.dircache.pop(path, None)
+            self.dircache.pop(self._parent(path), None)
def mkdir(self, path, create_parents=True, **kwargs):
"""
@@ -226,7 +241,12 @@ class AbstractFileSystem(metaclass=_Cached):
kwargs:
may be permissions, etc.
"""
- pass
+ if create_parents:
+ self.makedirs(path, exist_ok=True)
+ else:
+ if self.exists(path):
+ raise FileExistsError(f"Directory {path} already exists")
+ self._mkdir(path, **kwargs)
def makedirs(self, path, exist_ok=False):
"""Recursively make directories
@@ -242,7 +262,13 @@ class AbstractFileSystem(metaclass=_Cached):
exist_ok: bool (False)
If False, will error if the target already exists
"""
- pass
+ parts = path.rstrip('/').split('/')
+ for i in range(1, len(parts) + 1):
+ part = '/'.join(parts[:i])
+ if part and not self.exists(part):
+ self._mkdir(part)
+ elif not exist_ok and i == len(parts) and self.exists(part):
+ raise FileExistsError(f"Directory {path} already exists")
def rmdir(self, path):
"""Remove a directory, if empty"""
diff --git a/fsspec/transaction.py b/fsspec/transaction.py
index 9a060ac..e7d27be 100644
--- a/fsspec/transaction.py
+++ b/fsspec/transaction.py
@@ -32,11 +32,18 @@ class Transaction:
def start(self):
"""Start a transaction on this FileSystem"""
- pass
+ if self.fs:
+ self.fs._intrans = True
+ self.fs._transaction = self
def complete(self, commit=True):
"""Finish transaction: commit or discard all deferred files"""
- pass
+ while self.files:
+ f = self.files.popleft()
+ if commit:
+ f.commit()
+ else:
+ f.discard()
class FileActor:
@@ -60,4 +67,18 @@ class DaskTransaction(Transaction):
def complete(self, commit=True):
"""Finish transaction: commit or discard all deferred files"""
- pass
+ import distributed
+ client = distributed.default_client()
+
+ def process_files(files, commit):
+ for f in files.files:
+ if commit:
+ f.commit()
+ else:
+ f.discard()
+ return len(files.files)
+
+ future = client.submit(process_files, self.files, commit)
+ processed_count = future.result()
+ self.files = client.submit(FileActor, actor=True).result()
+ return processed_count
diff --git a/fsspec/utils.py b/fsspec/utils.py
index 7257878..e7a7289 100644
--- a/fsspec/utils.py
+++ b/fsspec/utils.py
@@ -48,7 +48,37 @@ def infer_storage_options(urlpath: str, inherit_storage_options: (dict[str,
"host": "node", "port": 123, "path": "/mnt/datasets/test.csv",
"url_query": "q=1", "extra": "value"}
"""
- pass
+ result = {}
+ if inherit_storage_options:
+ result.update(inherit_storage_options)
+
+ parsed_url = urlsplit(urlpath)
+ protocol = parsed_url.scheme or 'file'
+
+ if protocol == 'file':
+ result['protocol'] = protocol
+ result['path'] = urlpath
+ else:
+ result['protocol'] = protocol
+ result['path'] = parsed_url.path
+ if parsed_url.netloc:
+ if '@' in parsed_url.netloc:
+ userinfo, hostinfo = parsed_url.netloc.split('@', 1)
+ if ':' in userinfo:
+ result['username'], result['password'] = userinfo.split(':', 1)
+ else:
+ result['username'] = userinfo
+ else:
+ hostinfo = parsed_url.netloc
+ if ':' in hostinfo:
+ result['host'], port = hostinfo.split(':', 1)
+ result['port'] = int(port)
+ else:
+ result['host'] = hostinfo
+ if parsed_url.query:
+ result['url_query'] = parsed_url.query
+
+ return result
compressions: dict[str, str] = {}
@@ -61,7 +91,10 @@ def infer_compression(filename: str) ->(str | None):
extension. This includes builtin (gz, bz2, zip) compressions, as well as
optional compressions. See fsspec.compression.register_compression.
"""
- pass
+ extension = os.path.splitext(filename)[-1].lower()
+ if extension.startswith('.'):
+ extension = extension[1:]
+ return compressions.get(extension)
def build_name_function(max_int: float) ->Callable[[int], str]:
@@ -82,7 +115,15 @@ def build_name_function(max_int: float) ->Callable[[int], str]:
>>> build_name_function(0)(0)
'0'
"""
- pass
+ if max_int == 0:
+ return lambda x: '0'
+
+ width = int(math.log10(max_int)) + 1
+
+ def name_function(x: int) -> str:
+ return f'{x:0{width}d}'
+
+ return name_function
def seek_delimiter(file: IO[bytes], delimiter: bytes, blocksize: int) ->bool:
@@ -107,7 +148,22 @@ def seek_delimiter(file: IO[bytes], delimiter: bytes, blocksize: int) ->bool:
Returns True if a delimiter was found, False if at file start or end.
"""
- pass
+ if file.tell() == 0:
+ return False
+
+ last = b''
+ while True:
+ current = file.read(blocksize)
+ if not current:
+ return False
+ full = last + current
+ try:
+ i = full.index(delimiter)
+ file.seek(file.tell() - (len(full) - i) + len(delimiter))
+ return True
+ except ValueError:
+ pass
+ last = full[-len(delimiter):]
def read_block(f: IO[bytes], offset: int, length: (int | None), delimiter: