back to Claude Sonnet 3.5 - Fill-in summary
Claude Sonnet 3.5 - Fill-in: click
Failed to run pytests for test tests
ImportError while loading conftest '/testbed/tests/conftest.py'.
tests/conftest.py:3: in <module>
from click.testing import CliRunner
src/click/__init__.py:7: in <module>
from .core import Argument as Argument
src/click/core.py:15: in <module>
from . import types
src/click/types.py:8: in <module>
from ._compat import _get_argv_encoding
src/click/_compat.py:126: in <module>
_default_text_stdin = _make_cached_stream_func(lambda : sys.stdin,
E NameError: name '_make_cached_stream_func' is not defined
Patch diff
diff --git a/src/click/_compat.py b/src/click/_compat.py
index 7f5b9af..0cb3f81 100644
--- a/src/click/_compat.py
+++ b/src/click/_compat.py
@@ -13,12 +13,13 @@ _ansi_re = re.compile('\\033\\[[;?0-9]*[a-zA-Z]')
def is_ascii_encoding(encoding: str) ->bool:
"""Checks if a given encoding is ascii."""
- pass
+ return encoding.replace('-', '').lower() in {'ascii', 'us_ascii'}
def get_best_encoding(stream: t.IO[t.Any]) ->str:
"""Returns the default stream encoding if not found."""
- pass
+ rv = getattr(stream, 'encoding', None) or sys.getdefaultencoding()
+ return rv if rv != 'ascii' else 'utf-8'
class _NonClosingTextIOWrapper(io.TextIOWrapper):
@@ -59,7 +60,7 @@ class _FixupStream:
def _stream_is_misconfigured(stream: t.TextIO) ->bool:
"""A stream is misconfigured if its encoding is ASCII."""
- pass
+ return is_ascii_encoding(getattr(stream, 'encoding', None) or '')
def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: t.Optional[str]
@@ -68,7 +69,8 @@ def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: t.Optional[str]
desired value or the desired value is unset and the attribute
has a value.
"""
- pass
+ stream_value = getattr(stream, attr, None)
+ return stream_value == value or (value is None and stream_value is not None)
def _is_compatible_text_stream(stream: t.TextIO, encoding: t.Optional[str],
@@ -76,13 +78,15 @@ def _is_compatible_text_stream(stream: t.TextIO, encoding: t.Optional[str],
"""Check if a stream's encoding and errors attributes are
compatible with the desired values.
"""
- pass
+ return _is_compat_stream_attr(stream, 'encoding', encoding) and _is_compat_stream_attr(stream, 'errors', errors)
def _wrap_io_open(file: t.Union[str, 'os.PathLike[str]', int], mode: str,
encoding: t.Optional[str], errors: t.Optional[str]) ->t.IO[t.Any]:
"""Handles not passing ``encoding`` and ``errors`` in binary mode."""
- pass
+ if 'b' in mode:
+ return open(file, mode)
+ return open(file, mode, encoding=encoding, errors=errors)
class _AtomicFile:
diff --git a/src/click/_termui_impl.py b/src/click/_termui_impl.py
index 4ed4097..3419b62 100644
--- a/src/click/_termui_impl.py
+++ b/src/click/_termui_impl.py
@@ -120,19 +120,40 @@ class ProgressBar(t.Generic[V]):
Only render when the number of steps meets the
``update_min_steps`` threshold.
"""
- pass
+ self.pos += n_steps
+ if current_item is not None:
+ self.current_item = current_item
+
+ self._completed_intervals += n_steps
+ if self._completed_intervals >= self.update_min_steps:
+ self._completed_intervals = 0
+ self.render_progress()
def generator(self) ->t.Iterator[V]:
"""Return a generator which yields the items added to the bar
during construction, and updates the progress bar *after* the
yielded block returns.
"""
- pass
+ for item in self.iter:
+ yield item
+ self.update(1, item)
+ self.finish()
def pager(generator: t.Iterable[str], color: t.Optional[bool]=None) ->None:
"""Decide what method to use for paging through text."""
- pass
+ stdout = _default_text_stdout()
+ if not isatty(stdout):
+ return _nullpager(stdout, generator, color)
+
+ pager_cmd = os.environ.get('PAGER', 'more')
+ if pager_cmd == 'more' and WIN:
+ return _tempfilepager(generator, 'more <', color)
+
+ if 'less' in pager_cmd and not WIN:
+ return _pipepager(generator, 'less -R', color)
+
+ return _tempfilepager(generator, pager_cmd, color)
def _pipepager(generator: t.Iterable[str], cmd: str, color: t.Optional[bool]
@@ -140,19 +161,50 @@ def _pipepager(generator: t.Iterable[str], cmd: str, color: t.Optional[bool]
"""Page through text by feeding it to another program. Invoking a
pager through this might support colors.
"""
- pass
+ import subprocess
+ env = dict(os.environ)
+ env['LESS'] = 'FRSX'
+
+ c = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, env=env)
+ encoding = get_best_encoding(c.stdin)
+
+ try:
+ for text in generator:
+ if not color:
+ text = strip_ansi(text)
+ c.stdin.write(text.encode(encoding, 'replace'))
+ except (IOError, KeyboardInterrupt):
+ pass
+ else:
+ c.stdin.close()
+ finally:
+ c.wait()
def _tempfilepager(generator: t.Iterable[str], cmd: str, color: t.Optional[
bool]) ->None:
"""Page through text by invoking a program on a temporary file."""
- pass
+ import tempfile
+ filename = tempfile.mktemp()
+ with open_stream(filename, 'wb')[0] as f:
+ for text in generator:
+ if not color:
+ text = strip_ansi(text)
+ f.write(text.encode('utf-8'))
+
+ try:
+ os.system(cmd + ' "' + filename + '"')
+ finally:
+ os.unlink(filename)
def _nullpager(stream: t.TextIO, generator: t.Iterable[str], color: t.
Optional[bool]) ->None:
"""Simply print unformatted text. This is the ultimate fallback."""
- pass
+ for text in generator:
+ if not color:
+ text = strip_ansi(text)
+ stream.write(text)
class Editor:
diff --git a/src/click/_textwrap.py b/src/click/_textwrap.py
index 681ea25..5fad1c0 100644
--- a/src/click/_textwrap.py
+++ b/src/click/_textwrap.py
@@ -4,4 +4,39 @@ from contextlib import contextmanager
class TextWrapper(textwrap.TextWrapper):
- pass
+ def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
+ super().__init__(*args, **kwargs)
+ self._current_indent = 0
+
+ def wrap(self, text: str) -> t.List[str]:
+ """Wraps the single paragraph in text and returns a list of wrapped lines.
+
+ This method overrides the base class method to apply the current indent.
+ """
+ lines = super().wrap(text)
+ return [self._current_indent * ' ' + line for line in lines]
+
+ @contextmanager
+ def extra_indent(self, indent: int) -> t.Generator[None, None, None]:
+ """A context manager for temporarily increasing the indentation."""
+ self._current_indent += indent
+ try:
+ yield
+ finally:
+ self._current_indent -= indent
+
+ def dedent(self) -> None:
+ """Decrease the current indentation by one level."""
+ if self._current_indent > 0:
+ self._current_indent -= 1
+
+ def indent(self) -> None:
+ """Increase the current indentation by one level."""
+ self._current_indent += 1
+
+ def fill(self, text: str) -> str:
+ """Fill a single paragraph of text, returning a new string.
+
+ This method overrides the base class method to apply the current indent.
+ """
+ return '\n'.join(self.wrap(text))
diff --git a/src/click/_winconsole.py b/src/click/_winconsole.py
index 514d97e..0b5802c 100644
--- a/src/click/_winconsole.py
+++ b/src/click/_winconsole.py
@@ -69,11 +69,48 @@ class _WindowsConsoleRawIOBase(io.RawIOBase):
class _WindowsConsoleReader(_WindowsConsoleRawIOBase):
- pass
+ def readable(self):
+ return True
+
+ def readinto(self, b):
+ bytes_to_be_read = len(b)
+ if not bytes_to_be_read:
+ return 0
+
+ buffer = (c_char * bytes_to_be_read)()
+ bytes_read = c_ulong(0)
+ success = ReadConsoleW(self.handle, buffer, bytes_to_be_read, byref(bytes_read), None)
+
+ if not success:
+ raise IOError(f"Windows error: {GetLastError()}")
+
+ if bytes_read.value:
+ b[:bytes_read.value] = buffer.raw[:bytes_read.value]
+ return bytes_read.value
class _WindowsConsoleWriter(_WindowsConsoleRawIOBase):
- pass
+ def writable(self):
+ return True
+
+ def write(self, b):
+ bytes_to_be_written = len(b)
+ if not bytes_to_be_written:
+ return 0
+
+ buffer = c_char_p(b)
+ bytes_written = c_ulong(0)
+
+ while bytes_written.value < bytes_to_be_written:
+ chunk = min(bytes_to_be_written - bytes_written.value, MAX_BYTES_WRITTEN)
+ success = WriteConsoleW(self.handle, buffer, chunk, byref(bytes_written), None)
+
+ if not success:
+ raise IOError(f"Windows error: {GetLastError()}")
+
+ buffer = c_char_p(b[bytes_written.value:])
+
+ return bytes_written.value
class ConsoleStream:
diff --git a/src/click/core.py b/src/click/core.py
index 354452a..8f8ec25 100644
--- a/src/click/core.py
+++ b/src/click/core.py
@@ -50,14 +50,27 @@ def _complete_visible_commands(ctx: 'Context', incomplete: str) ->t.Iterator[t
:param ctx: Invocation context for the group.
:param incomplete: Value being completed. May be empty.
"""
- pass
+ for name, command in ctx.command.commands.items():
+ if (
+ not command.hidden
+ and name.startswith(incomplete)
+ and not ctx.resilient_parsing
+ ):
+ yield name, command
@contextmanager
def augment_usage_errors(ctx: 'Context', param: t.Optional['Parameter']=None
) ->t.Iterator[None]:
"""Context manager that attaches extra information to exceptions."""
- pass
+ try:
+ yield
+ except UsageError as e:
+ if e.ctx is None:
+ e.ctx = ctx
+ if param is not None and e.param is None:
+ e.param = param
+ raise
def iter_params_for_processing(invocation_order: t.Sequence['Parameter'],
@@ -66,7 +79,13 @@ def iter_params_for_processing(invocation_order: t.Sequence['Parameter'],
for processing and an iterable of parameters that exist, this returns
a list in the correct order as they should be processed.
"""
- pass
+ def sort_key(param: 'Parameter') -> t.Tuple[bool, int]:
+ if param not in declaration_order:
+ return (True, -1)
+ idx = declaration_order.index(param)
+ return (False, idx)
+
+ return sorted(invocation_order, key=sort_key)
class ParameterSource(enum.Enum):
@@ -784,7 +803,9 @@ class Command(BaseCommand):
Calls :meth:`format_usage` internally.
"""
- pass
+ formatter = ctx.make_formatter()
+ self.format_usage(ctx, formatter)
+ return formatter.getvalue().rstrip('\n')
def format_usage(self, ctx: Context, formatter: HelpFormatter) ->None:
"""Writes the usage line into the formatter.
@@ -797,7 +818,15 @@ class Command(BaseCommand):
"""Returns all the pieces that go into the usage line and returns
it as a list of strings.
"""
- pass
+ pieces = []
+ for param in self.get_params(ctx):
+ if param.nargs == -1:
+ pieces.append(f'[{param.make_metavar()}]...')
+ elif param.nargs > 1:
+ pieces.append(f'{param.make_metavar()}...')
+ else:
+ pieces.append(param.make_metavar())
+ return pieces
def get_help_option_names(self, ctx: Context) ->t.List[str]:
"""Returns the names for the help option."""
diff --git a/src/click/decorators.py b/src/click/decorators.py
index ec1bfc9..9e17579 100644
--- a/src/click/decorators.py
+++ b/src/click/decorators.py
@@ -25,7 +25,10 @@ def pass_context(f: 't.Callable[te.Concatenate[Context, P], R]'
"""Marks a callback as wanting to receive the current context
object as first argument.
"""
- pass
+ @t.wraps(f)
+ def new_func(*args: P.args, **kwargs: P.kwargs) -> R:
+ return f(get_current_context(), *args, **kwargs)
+ return t.cast('t.Callable[P, R]', new_func)
def pass_obj(f: 't.Callable[te.Concatenate[t.Any, P], R]'
@@ -34,7 +37,10 @@ def pass_obj(f: 't.Callable[te.Concatenate[t.Any, P], R]'
context onwards (:attr:`Context.obj`). This is useful if that object
represents the state of a nested system.
"""
- pass
+ @t.wraps(f)
+ def new_func(*args: P.args, **kwargs: P.kwargs) -> R:
+ return f(get_current_context().obj, *args, **kwargs)
+ return t.cast('t.Callable[P, R]', new_func)
def make_pass_decorator(object_type: t.Type[T], ensure: bool=False
@@ -61,7 +67,19 @@ def make_pass_decorator(object_type: t.Type[T], ensure: bool=False
:param ensure: if set to `True`, a new object will be created and
remembered on the context if it's not there yet.
"""
- pass
+ def decorator(f: 't.Callable[te.Concatenate[T, P], R]') -> 't.Callable[P, R]':
+ @t.wraps(f)
+ def new_func(*args: P.args, **kwargs: P.kwargs) -> R:
+ ctx = get_current_context()
+ obj = ctx.find_object(object_type)
+ if obj is None and ensure:
+ obj = object_type()
+ ctx.ensure_object(object_type)
+ if obj is None:
+ raise RuntimeError(f'Managed to invoke callback without a context object of type {object_type.__name__} existing')
+ return ctx.invoke(f, obj, *args, **kwargs)
+ return t.cast('t.Callable[P, R]', new_func)
+ return decorator
def pass_meta_key(key: str, *, doc_description: t.Optional[str]=None
@@ -77,7 +95,21 @@ def pass_meta_key(key: str, *, doc_description: t.Optional[str]=None
.. versionadded:: 8.0
"""
- pass
+ if doc_description is None:
+ doc_description = f"the '{key}' key from Context.meta"
+
+ def decorator(f: t.Callable[te.Concatenate[t.Any, P], R]) -> t.Callable[P, R]:
+ @t.wraps(f)
+ def new_func(*args: P.args, **kwargs: P.kwargs) -> R:
+ ctx = get_current_context()
+ if key not in ctx.meta:
+ raise RuntimeError(f"'{key}' not found in Context.meta")
+ return f(ctx.meta[key], *args, **kwargs)
+
+ new_func.__doc__ = f"Decorator that passes {doc_description} as the first argument to the decorated function.\n\n{new_func.__doc__}"
+ return t.cast(t.Callable[P, R], new_func)
+
+ return decorator
CmdType = t.TypeVar('CmdType', bound=Command)
@@ -114,7 +146,31 @@ def command(name: t.Union[t.Optional[str], _AnyCallable]=None, cls: t.
The ``params`` argument can be used. Decorated params are
appended to the end of the list.
"""
- pass
+ def decorator(f: _AnyCallable) -> t.Union[Command, CmdType]:
+ if isinstance(name, _AnyCallable):
+ # The decorator was applied without parentheses
+ cmd_name = name.__name__.replace('_', '-')
+ else:
+ cmd_name = name or f.__name__.replace('_', '-')
+
+ cmd_cls = cls or Command
+ cmd_attrs = attrs.copy()
+
+ # If 'params' is in attrs, we need to append any decorated params
+ if 'params' in cmd_attrs:
+ cmd_attrs['params'] = list(cmd_attrs['params'])
+ else:
+ cmd_attrs['params'] = []
+ cmd_attrs['params'].extend(getattr(f, '__click_params__', []))
+
+ cmd = cmd_cls(name=cmd_name, callback=f, **cmd_attrs)
+ cmd.__doc__ = f.__doc__
+ return cmd
+
+ if callable(name):
+ # The decorator was applied without parentheses
+ return decorator(name)
+ return decorator
GrpType = t.TypeVar('GrpType', bound=Group)
diff --git a/src/click/formatting.py b/src/click/formatting.py
index 2586652..ec937dd 100644
--- a/src/click/formatting.py
+++ b/src/click/formatting.py
@@ -26,7 +26,25 @@ def wrap_text(text: str, width: int=78, initial_indent: str='',
:param preserve_paragraphs: if this flag is set then the wrapping will
intelligently handle paragraphs.
"""
- pass
+ from textwrap import TextWrapper
+
+ if not preserve_paragraphs:
+ wrapper = TextWrapper(width=width, initial_indent=initial_indent,
+ subsequent_indent=subsequent_indent)
+ return wrapper.fill(text)
+
+ paragraphs = text.split('\n\n')
+ wrapped_paragraphs = []
+
+ for paragraph in paragraphs:
+ if paragraph.startswith('\b'):
+ wrapped_paragraphs.append(paragraph[1:])
+ else:
+ wrapper = TextWrapper(width=width, initial_indent=initial_indent,
+ subsequent_indent=subsequent_indent)
+ wrapped_paragraphs.append(wrapper.fill(paragraph))
+
+ return '\n\n'.join(wrapped_paragraphs)
class HelpFormatter:
@@ -58,15 +76,15 @@ class HelpFormatter:
def write(self, string: str) ->None:
"""Writes a unicode string into the internal buffer."""
- pass
+ self.buffer.append(string)
def indent(self) ->None:
"""Increases the indentation."""
- pass
+ self.current_indent += self.indent_increment
def dedent(self) ->None:
"""Decreases the indentation."""
- pass
+ self.current_indent = max(0, self.current_indent - self.indent_increment)
def write_usage(self, prog: str, args: str='', prefix: t.Optional[str]=None
) ->None:
@@ -77,21 +95,29 @@ class HelpFormatter:
:param prefix: The prefix for the first line. Defaults to
``"Usage: "``.
"""
- pass
+ if prefix is None:
+ prefix = _("Usage: ")
+ usage = f"{prefix}{prog} {args}".rstrip()
+ self.write(wrap_text(usage, width=self.width, initial_indent=self.current_indent * " ",
+ subsequent_indent=self.current_indent * " " + " " * len(prefix)))
+ self.write("\n")
def write_heading(self, heading: str) ->None:
"""Writes a heading into the buffer."""
- pass
+ self.write(f"{' ' * self.current_indent}{heading}:\n")
def write_paragraph(self) ->None:
"""Writes a paragraph into the buffer."""
- pass
+ self.write("\n")
def write_text(self, text: str) ->None:
"""Writes re-indented text into the buffer. This rewraps and
preserves paragraphs.
"""
- pass
+ indent = " " * self.current_indent
+ self.write(wrap_text(text, width=self.width, initial_indent=indent,
+ subsequent_indent=indent, preserve_paragraphs=True))
+ self.write("\n")
def write_dl(self, rows: t.Sequence[t.Tuple[str, str]], col_max: int=30,
col_spacing: int=2) ->None:
@@ -103,7 +129,20 @@ class HelpFormatter:
:param col_spacing: the number of spaces between the first and
second column.
"""
- pass
+ indent = " " * self.current_indent
+ for first, second in rows:
+ first_width = term_len(first)
+ if first_width <= col_max:
+ self.write(f"{indent}{first}")
+ self.write(" " * (col_max - first_width))
+ self.write(" " * col_spacing)
+ else:
+ self.write(f"{indent}{first}\n")
+ self.write(" " * (col_max + col_spacing))
+
+ wrapped_text = wrap_text(second, width=self.width - col_max - col_spacing,
+ initial_indent="", subsequent_indent=" " * (col_max + col_spacing))
+ self.write(f"{wrapped_text}\n")
@contextmanager
def section(self, name: str) ->t.Iterator[None]:
@@ -112,16 +151,26 @@ class HelpFormatter:
:param name: the section name that is written as heading.
"""
- pass
+ self.write_paragraph()
+ self.write_heading(name)
+ self.indent()
+ try:
+ yield
+ finally:
+ self.dedent()
@contextmanager
def indentation(self) ->t.Iterator[None]:
"""A context manager that increases the indentation."""
- pass
+ self.indent()
+ try:
+ yield
+ finally:
+ self.dedent()
def getvalue(self) ->str:
"""Returns the buffer contents."""
- pass
+ return "".join(self.buffer)
def join_options(options: t.Sequence[str]) ->t.Tuple[str, bool]:
@@ -130,4 +179,17 @@ def join_options(options: t.Sequence[str]) ->t.Tuple[str, bool]:
any_prefix_is_slash)`` where the second item in the tuple is a flag that
indicates if any of the option prefixes was a slash.
"""
- pass
+ rv = []
+ any_prefix_is_slash = False
+ for opt in options:
+ prefix = split_opt(opt)[0]
+ if prefix == '/':
+ any_prefix_is_slash = True
+ rv.append((len(prefix), opt))
+
+ rv.sort(key=lambda x: x[0])
+ rv = [x[1] for x in rv]
+
+ if len(rv) == 1:
+ return rv[0], any_prefix_is_slash
+ return f"{' / '.join(rv)}", any_prefix_is_slash
diff --git a/src/click/globals.py b/src/click/globals.py
index ca86d44..efbf733 100644
--- a/src/click/globals.py
+++ b/src/click/globals.py
@@ -21,17 +21,27 @@ def get_current_context(silent: bool=False) ->t.Optional['Context']:
is available. The default behavior is to raise a
:exc:`RuntimeError`.
"""
- pass
+ try:
+ return t.cast('Context', _local.stack[-1])
+ except (AttributeError, IndexError):
+ if silent:
+ return None
+ raise RuntimeError('There is no active click context.') from None
def push_context(ctx: 'Context') ->None:
"""Pushes a new context to the current stack."""
- pass
+ if not hasattr(_local, 'stack'):
+ _local.stack = []
+ _local.stack.append(ctx)
def pop_context() ->None:
"""Removes the top level from the stack."""
- pass
+ try:
+ _local.stack.pop()
+ except (AttributeError, IndexError):
+ raise RuntimeError('Cannot pop context. There is no active click context.') from None
def resolve_color_default(color: t.Optional[bool]=None) ->t.Optional[bool]:
@@ -39,4 +49,9 @@ def resolve_color_default(color: t.Optional[bool]=None) ->t.Optional[bool]:
value is passed it's returned unchanged, otherwise it's looked up from
the current context.
"""
- pass
+ if color is not None:
+ return color
+ ctx = get_current_context(silent=True)
+ if ctx is not None:
+ return ctx.color
+ return None
diff --git a/src/click/parser.py b/src/click/parser.py
index 5baffed..f836ae3 100644
--- a/src/click/parser.py
+++ b/src/click/parser.py
@@ -46,7 +46,25 @@ def _unpack_args(args: t.Sequence[str], nargs_spec: t.Sequence[int]) ->t.Tuple[
Missing items are filled with `None`.
"""
- pass
+ args = list(args)
+ nargs_spec = list(nargs_spec)
+ unpacked = []
+ remaining = []
+
+ for nargs in nargs_spec:
+ if nargs == -1:
+ unpacked.append(args)
+ break
+ if len(args) < nargs:
+ unpacked.append(args + [None] * (nargs - len(args)))
+ break
+ unpacked.append(args[:nargs])
+ args = args[nargs:]
+
+ if args:
+ remaining = args
+
+ return unpacked, remaining
def split_arg_string(string: str) ->t.List[str]:
@@ -64,7 +82,14 @@ def split_arg_string(string: str) ->t.List[str]:
:param string: String to split.
"""
- pass
+ import shlex
+ lex = shlex.shlex(string, posix=True)
+ lex.whitespace_split = True
+ lex.commenters = ''
+ try:
+ return list(lex)
+ except ValueError:
+ return string.split()
class Option:
@@ -149,7 +174,12 @@ class OptionParser:
The `obj` can be used to identify the option in the order list
that is returned from the parser.
"""
- pass
+ option = Option(obj, opts, dest, action, nargs, const)
+ self._opt_prefixes.update(option.prefixes)
+ for opt in option._short_opts:
+ self._short_opt[opt] = option
+ for opt in option._long_opts:
+ self._long_opt[opt] = option
def add_argument(self, obj: 'CoreArgument', dest: t.Optional[str],
nargs: int=1) ->None:
@@ -158,7 +188,7 @@ class OptionParser:
The `obj` can be used to identify the option in the order list
that is returned from the parser.
"""
- pass
+ self._args.append(Argument(obj, dest, nargs))
def parse_args(self, args: t.List[str]) ->t.Tuple[t.Dict[str, t.Any], t
.List[str], t.List['CoreParameter']]:
@@ -168,4 +198,119 @@ class OptionParser:
appear on the command line. If arguments appear multiple times they
will be memorized multiple times as well.
"""
- pass
+ state = ParsingState(args)
+ try:
+ self._process_args_for_options(state)
+ self._process_args_for_args(state)
+ except UsageError:
+ if self.ctx is None or not self.ctx.resilient_parsing:
+ raise
+ return state.opts, state.largs, state.order
+
+ def _process_args_for_options(self, state: ParsingState) ->None:
+ while state.rargs:
+ arg = state.rargs.pop(0)
+ arglen = len(arg)
+ # Double dashes
+ if arg == '--':
+ return
+ # Long option
+ elif arg.startswith('--'):
+ self._process_long_opt(arg[2:], state)
+ # Short option
+ elif arg[:1] in self._opt_prefixes and arglen > 1:
+ self._process_short_opts(arg, state)
+ elif self.allow_interspersed_args:
+ state.largs.append(arg)
+ else:
+ state.rargs.insert(0, arg)
+ return
+
+ def _process_args_for_args(self, state: ParsingState) ->None:
+ state.largs.extend(state.rargs)
+ state.rargs[:] = []
+ for idx, arg_spec in enumerate(self._args):
+ nargs = arg_spec.nargs
+ if len(state.largs) < nargs:
+ if nargs == -1:
+ break
+ raise BadArgumentUsage(f'Missing argument "{arg_spec.dest}"')
+ if nargs == -1:
+ state.opts[arg_spec.dest] = state.largs[idx:]
+ state.order.append(arg_spec.obj)
+ break
+ state.opts[arg_spec.dest] = state.largs[idx:idx + nargs]
+ state.order.append(arg_spec.obj)
+
+ def _process_long_opt(self, opt: str, state: ParsingState) ->None:
+ if '=' in opt:
+ opt, next_arg = opt.split('=', 1)
+ else:
+ next_arg = None
+
+ try:
+ option = self._long_opt[opt]
+ except KeyError:
+ if self.ignore_unknown_options:
+ return
+ raise NoSuchOption(opt)
+
+ if option.nargs == 0:
+ if next_arg:
+ raise BadOptionUsage(f'Option "--{opt}" does not take a value')
+ elif option.nargs == 1:
+ if next_arg is None:
+ if not state.rargs:
+ raise BadOptionUsage(f'Option "--{opt}" requires an argument')
+ next_arg = state.rargs.pop(0)
+ else:
+ if next_arg is None:
+ if not state.rargs:
+ raise BadOptionUsage(f'Option "--{opt}" requires {option.nargs} arguments')
+ next_arg = state.rargs[:option.nargs]
+ state.rargs[:] = state.rargs[option.nargs:]
+ else:
+ next_arg = [next_arg] + state.rargs[:option.nargs - 1]
+ state.rargs[:] = state.rargs[option.nargs - 1:]
+
+ self._process_option(option, opt, next_arg, state)
+
+ def _process_short_opts(self, arg: str, state: ParsingState) ->None:
+ i = 1
+ prefix = arg[0]
+ while i < len(arg):
+ opt = arg[i]
+ i += 1
+ try:
+ option = self._short_opt[f'{prefix}{opt}']
+ except KeyError:
+ if self.ignore_unknown_options:
+ continue
+ raise NoSuchOption(opt)
+
+ if option.nargs == 0:
+ self._process_option(option, opt, None, state)
+ else:
+ if i < len(arg):
+ next_arg = arg[i:]
+ i = len(arg)
+ else:
+ if not state.rargs:
+ raise BadOptionUsage(f'Option "-{opt}" requires an argument')
+ next_arg = state.rargs.pop(0)
+ self._process_option(option, opt, next_arg, state)
+
+ def _process_option(self, option: Option, opt: str, value: t.Any, state: ParsingState) ->None:
+ state.order.append(option.obj)
+ if option.action == 'store':
+ state.opts[option.dest] = value
+ elif option.action == 'store_const':
+ state.opts[option.dest] = option.const
+ elif option.action == 'append':
+ state.opts.setdefault(option.dest, []).append(value)
+ elif option.action == 'append_const':
+ state.opts.setdefault(option.dest, []).append(option.const)
+ elif option.action == 'count':
+ state.opts[option.dest] = state.opts.get(option.dest, 0) + 1
+ else:
+ raise ValueError(f'Unknown action "{option.action}"')
diff --git a/src/click/shell_completion.py b/src/click/shell_completion.py
index 6956829..055be50 100644
--- a/src/click/shell_completion.py
+++ b/src/click/shell_completion.py
@@ -27,7 +27,15 @@ def shell_complete(cli: BaseCommand, ctx_args: t.MutableMapping[str, t.Any],
instruction and shell, in the form ``instruction_shell``.
:return: Status code to exit with.
"""
- pass
+ shell, _, _ = instruction.partition("_")
+ comp_cls = get_completion_class(shell)
+ if comp_cls is None:
+ return 1
+
+ comp = comp_cls(cli, ctx_args, prog_name, complete_var)
+ completion = comp.complete()
+ echo(completion)
+ return 0
class CompletionItem:
@@ -185,7 +193,7 @@ class ShellComplete:
"""The name of the shell function defined by the completion
script.
"""
- pass
+ return f"_{self.prog_name}_completion"
def source_vars(self) ->t.Dict[str, t.Any]:
"""Vars for formatting :attr:`source_template`.
@@ -193,7 +201,11 @@ class ShellComplete:
By default this provides ``complete_func``, ``complete_var``,
and ``prog_name``.
"""
- pass
+ return {
+ "complete_func": self.func_name,
+ "complete_var": self.complete_var,
+ "prog_name": self.prog_name,
+ }
def source(self) ->str:
"""Produce the shell script that defines the completion
@@ -201,7 +213,7 @@ class ShellComplete:
:attr:`source_template` with the dict returned by
:meth:`source_vars`.
"""
- pass
+ return self.source_template % self.source_vars()
def get_completion_args(self) ->t.Tuple[t.List[str], str]:
"""Use the env vars defined by the shell script to return a
@@ -273,7 +285,11 @@ def add_completion_class(cls: ShellCompleteType, name: t.Optional[str]=None
:param name: Name to register the class under. Defaults to the
class's ``name`` attribute.
"""
- pass
+ if name is None:
+ name = cls.name
+
+ _available_shells[name] = cls
+ return cls
def get_completion_class(shell: str) ->t.Optional[t.Type[ShellComplete]]:
@@ -283,7 +299,7 @@ def get_completion_class(shell: str) ->t.Optional[t.Type[ShellComplete]]:
:param shell: Name the class is registered under.
"""
- pass
+ return _available_shells.get(shell)
def _is_incomplete_argument(ctx: Context, param: Parameter) ->bool:
@@ -294,12 +310,20 @@ def _is_incomplete_argument(ctx: Context, param: Parameter) ->bool:
parsed complete args.
:param param: Argument object being checked.
"""
- pass
+ if not isinstance(param, Argument):
+ return False
+
+ value = ctx.params.get(param.name)
+ if value is None:
+ return True
+ if isinstance(value, (tuple, list)):
+ return param.nargs == -1 or len(value) < param.nargs
+ return False
def _start_of_option(ctx: Context, value: str) ->bool:
"""Check if the value looks like the start of an option."""
- pass
+ return value and value[:1] in ctx.command.params.get("prefix_chars", "-")
def _is_incomplete_option(ctx: Context, args: t.List[str], param: Parameter
@@ -309,7 +333,25 @@ def _is_incomplete_option(ctx: Context, args: t.List[str], param: Parameter
:param args: List of complete args before the incomplete value.
:param param: Option object being checked.
"""
- pass
+ if not isinstance(param, Option):
+ return False
+
+ if param.is_flag:
+ return False
+
+ last_option = None
+ for index, arg in enumerate(reversed(args)):
+ if index + 1 > param.nargs:
+ break
+ if _start_of_option(ctx, arg):
+ last_option = arg
+
+ if last_option is None:
+ return False
+
+ if param.opts:
+ return last_option in param.opts
+ return last_option in param.secondary_opts
def _resolve_context(cli: BaseCommand, ctx_args: t.MutableMapping[str, t.
@@ -322,7 +364,22 @@ def _resolve_context(cli: BaseCommand, ctx_args: t.MutableMapping[str, t.
:param prog_name: Name of the executable in the shell.
:param args: List of complete args before the incomplete value.
"""
- pass
+ ctx = cli.make_context(prog_name, args, **ctx_args)
+ args = list(args)
+
+ while args:
+ try:
+ if isinstance(ctx.command, MultiCommand):
+ cmd_name, cmd, args = ctx.command.resolve_command(ctx, args)
+ if cmd is None:
+ return ctx
+ ctx = cmd.make_context(cmd_name, args, parent=ctx)
+ else:
+ break
+ except Exception:
+ break
+
+ return ctx
def _resolve_incomplete(ctx: Context, args: t.List[str], incomplete: str
@@ -335,4 +392,17 @@ def _resolve_incomplete(ctx: Context, args: t.List[str], incomplete: str
:param args: List of complete args before the incomplete value.
:param incomplete: Value being completed. May be empty.
"""
- pass
+ params = ctx.command.get_params(ctx)
+
+ for param in params:
+ if _is_incomplete_argument(ctx, param):
+ return param, incomplete
+
+ if _start_of_option(ctx, incomplete):
+ return ctx.command, incomplete
+
+ for param in params:
+ if _is_incomplete_option(ctx, args, param):
+ return param, incomplete
+
+ return ctx.command, incomplete
diff --git a/src/click/termui.py b/src/click/termui.py
index 277721a..6582c5b 100644
--- a/src/click/termui.py
+++ b/src/click/termui.py
@@ -71,7 +71,46 @@ def prompt(text: str, default: t.Optional[t.Any]=None, hide_input: bool=
Added the `err` parameter.
"""
- pass
+ prompt_text = text
+ if show_choices and isinstance(type, Choice):
+ prompt_text += f" ({', '.join(type.choices)})"
+ if default is not None and show_default:
+ prompt_text += f" [{default}]"
+ prompt_text += prompt_suffix
+
+ while True:
+ try:
+ if hide_input:
+ value = visible_prompt_func(prompt_text)
+ else:
+ value = input(prompt_text)
+ except (KeyboardInterrupt, EOFError):
+ raise Abort()
+
+ if value == "" and default is not None:
+ value = default
+
+ if value_proc is not None:
+ try:
+ value = value_proc(value)
+ except UsageError as e:
+ echo(f"Error: {e.message}", err=err)
+ continue
+
+ if type is not None:
+ try:
+ value = type(value)
+ except ValueError:
+ echo(f"Error: Invalid input type. Expected {type.__name__}.", err=err)
+ continue
+
+ if confirmation_prompt:
+ confirm_text = "Repeat for confirmation: " if confirmation_prompt is True else confirmation_prompt
+ if value != prompt(confirm_text, type=type, value_proc=value_proc, hide_input=hide_input, err=err):
+ echo("Error: The two entered values do not match", err=err)
+ continue
+
+ return value
def confirm(text: str, default: t.Optional[bool]=False, abort: bool=False,
@@ -97,7 +136,35 @@ def confirm(text: str, default: t.Optional[bool]=False, abort: bool=False,
.. versionadded:: 4.0
Added the ``err`` parameter.
"""
- pass
+ prompt_text = text
+ if show_default:
+ if default is None:
+ prompt_text += " (y/n)"
+ elif default:
+ prompt_text += " [Y/n]"
+ else:
+ prompt_text += " [y/N]"
+ prompt_text += prompt_suffix
+
+ while True:
+ try:
+ value = prompt(prompt_text, err=err).lower().strip()
+ except Abort:
+ raise
+
+ if value in ('y', 'yes'):
+ rv = True
+ elif value in ('n', 'no'):
+ rv = False
+ elif value == '' and default is not None:
+ rv = default
+ else:
+ echo("Error: invalid input", err=err)
+ continue
+
+ if abort and not rv:
+ raise Abort()
+ return rv
def echo_via_pager(text_or_generator: t.Union[t.Iterable[str], t.Callable[[
@@ -113,7 +180,8 @@ def echo_via_pager(text_or_generator: t.Union[t.Iterable[str], t.Callable[[
:param color: controls if the pager supports ANSI colors or not. The
default is autodetection.
"""
- pass
+ from ._termui_impl import pager
+ pager(text_or_generator, color)
def progressbar(iterable: t.Optional[t.Iterable[V]]=None, length: t.
@@ -237,7 +305,24 @@ def progressbar(iterable: t.Optional[t.Iterable[V]]=None, length: t.
.. versionadded:: 2.0
"""
- pass
+ from ._termui_impl import ProgressBar
+ return ProgressBar(
+ iterable=iterable,
+ length=length,
+ show_eta=show_eta,
+ show_percent=show_percent,
+ show_pos=show_pos,
+ item_show_func=item_show_func,
+ fill_char=fill_char,
+ empty_char=empty_char,
+ bar_template=bar_template,
+ info_sep=info_sep,
+ file=file,
+ label=label,
+ width=width,
+ color=color,
+ update_min_steps=update_min_steps,
+ )
def clear() ->None:
@@ -247,7 +332,15 @@ def clear() ->None:
.. versionadded:: 2.0
"""
- pass
+ if not isatty(sys.stdout):
+ return
+ # If we're on Windows, use os.system()
+ if sys.platform.startswith('win'):
+ os.system('cls')
+ else:
+ # Otherwise, assume POSIX-like system
+ sys.stdout.write('\033[2J\033[1;1H')
+ sys.stdout.flush()
def style(text: t.Any, fg: t.Optional[t.Union[int, t.Tuple[int, int, int],
@@ -333,7 +426,47 @@ def style(text: t.Any, fg: t.Optional[t.Union[int, t.Tuple[int, int, int],
.. versionadded:: 2.0
"""
- pass
+ if not isinstance(text, str):
+ text = str(text)
+
+ bits = []
+
+ if fg:
+ try:
+ bits.append(f'\033[38;5;{_ansi_colors[fg]}m')
+ except KeyError:
+ if isinstance(fg, int):
+ bits.append(f'\033[38;5;{fg}m')
+ elif isinstance(fg, tuple):
+ bits.append(f'\033[38;2;{fg[0]};{fg[1]};{fg[2]}m')
+ if bg:
+ try:
+ bits.append(f'\033[48;5;{_ansi_colors[bg]}m')
+ except KeyError:
+ if isinstance(bg, int):
+ bits.append(f'\033[48;5;{bg}m')
+ elif isinstance(bg, tuple):
+ bits.append(f'\033[48;2;{bg[0]};{bg[1]};{bg[2]}m')
+ if bold is not None:
+ bits.append('\033[1m' if bold else '\033[22m')
+ if dim is not None:
+ bits.append('\033[2m' if dim else '\033[22m')
+ if underline is not None:
+ bits.append('\033[4m' if underline else '\033[24m')
+ if overline is not None:
+ bits.append('\033[53m' if overline else '\033[55m')
+ if italic is not None:
+ bits.append('\033[3m' if italic else '\033[23m')
+ if blink is not None:
+ bits.append('\033[5m' if blink else '\033[25m')
+ if reverse is not None:
+ bits.append('\033[7m' if reverse else '\033[27m')
+ if strikethrough is not None:
+ bits.append('\033[9m' if strikethrough else '\033[29m')
+ bits.append(text)
+ if reset:
+ bits.append(_ansi_reset_all)
+ return ''.join(bits)
def unstyle(text: str) ->str:
@@ -345,7 +478,7 @@ def unstyle(text: str) ->str:
:param text: the text to remove style information from.
"""
- pass
+ return strip_ansi(text)
def secho(message: t.Optional[t.Any]=None, file: t.Optional[t.IO[t.AnyStr]]
@@ -371,7 +504,14 @@ def secho(message: t.Optional[t.Any]=None, file: t.Optional[t.IO[t.AnyStr]]
.. versionadded:: 2.0
"""
- pass
+ if message is not None:
+ if isinstance(message, bytes):
+ echo(message, file, nl, err, color)
+ return
+ else:
+ message = style(message, **styles)
+
+ echo(message, file, nl, err, color)
def edit(text: t.Optional[t.AnyStr]=None, editor: t.Optional[str]=None, env:
@@ -405,7 +545,42 @@ def edit(text: t.Optional[t.AnyStr]=None, editor: t.Optional[str]=None, env:
provided text contents. It will not use a temporary
file as an indirection in that case.
"""
- pass
+ import tempfile
+ import subprocess
+
+ if filename is None:
+ if text is None:
+ raise UsageError("Either 'text' or 'filename' must be provided.")
+
+ with tempfile.NamedTemporaryFile(suffix=extension, delete=False) as f:
+ if isinstance(text, bytes):
+ f.write(text)
+ else:
+ f.write(text.encode('utf-8'))
+ filename = f.name
+
+ if editor is None:
+ editor = os.environ.get('EDITOR', 'vim')
+
+ if env is None:
+ env = {}
+
+ try:
+ c = subprocess.Popen([editor, filename], env=env)
+ exit_code = c.wait()
+ if exit_code != 0:
+ raise UsageError(f"{editor}: Editing failed!")
+
+ if require_save and os.path.getsize(filename) == 0:
+ return None
+
+ with open(filename, 'rb') as f:
+ rv = f.read()
+
+ return rv.decode('utf-8')
+ finally:
+ if filename and text is not None:
+ os.unlink(filename)
def launch(url: str, wait: bool=False, locate: bool=False) ->int:
@@ -432,7 +607,45 @@ def launch(url: str, wait: bool=False, locate: bool=False) ->int:
might have weird effects if the URL does not point to
the filesystem.
"""
- pass
+ import subprocess
+
+ def _launch_windows(url, wait=False, locate=False):
+ if locate:
+ cmd = 'explorer /select,"{}"'.format(os.path.normpath(url))
+ else:
+ cmd = 'start "" "{}"'.format(url)
+ exit_code = os.system(cmd)
+ if wait:
+ os.system('pause')
+ return exit_code
+
+ def _launch_macos(url, wait=False, locate=False):
+ cmd = ['open']
+ if locate:
+ cmd.append('-R')
+ cmd.append(url)
+ c = subprocess.Popen(cmd)
+ if wait:
+ return c.wait()
+ return 0
+
+ def _launch_linux(url, wait=False, locate=False):
+ cmd = ['xdg-open']
+ if locate:
+ cmd.append(os.path.dirname(url))
+ else:
+ cmd.append(url)
+ c = subprocess.Popen(cmd)
+ if wait:
+ return c.wait()
+ return 0
+
+ if sys.platform.startswith('win'):
+ return _launch_windows(url, wait, locate)
+ elif sys.platform == 'darwin':
+ return _launch_macos(url, wait, locate)
+ else:
+ return _launch_linux(url, wait, locate)
_getchar: t.Optional[t.Callable[[bool], str]] = None
@@ -458,7 +671,37 @@ def getchar(echo: bool=False) ->str:
:param echo: if set to `True`, the character read will also show up on
the terminal. The default is to not show it.
"""
- pass
+ global _getchar
+ if _getchar is None:
+ if sys.platform.startswith('win'):
+ import msvcrt
+
+ def _getchar_windows(echo):
+ char = msvcrt.getwch()
+ if echo:
+ msvcrt.putwch(char)
+ return char
+
+ _getchar = _getchar_windows
+ else:
+ import tty
+ import termios
+
+ def _getchar_unix(echo):
+ fd = sys.stdin.fileno()
+ old_settings = termios.tcgetattr(fd)
+ try:
+ tty.setraw(fd)
+ ch = sys.stdin.read(1)
+ finally:
+ termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
+ if echo:
+ sys.stdout.write(ch)
+ return ch
+
+ _getchar = _getchar_unix
+
+ return _getchar(echo)
def pause(info: t.Optional[str]=None, err: bool=False) ->None:
@@ -477,4 +720,19 @@ def pause(info: t.Optional[str]=None, err: bool=False) ->None:
:param err: if set to message goes to ``stderr`` instead of
``stdout``, the same as with echo.
"""
- pass
+ if info is None:
+ info = "Press any key to continue..."
+
+ if not isatty(sys.stdin):
+ return
+
+ try:
+ if info:
+ echo(info, nl=False, err=err)
+ try:
+ getchar()
+ except (KeyboardInterrupt, EOFError):
+ pass
+ finally:
+ if info:
+ echo(err=err)
diff --git a/src/click/testing.py b/src/click/testing.py
index 320d223..b588f84 100644
--- a/src/click/testing.py
+++ b/src/click/testing.py
@@ -59,17 +59,19 @@ class Result:
@property
def output(self) ->str:
"""The (standard) output as unicode string."""
- pass
+ return self.stdout
@property
def stdout(self) ->str:
"""The standard output as unicode string."""
- pass
+ return self.stdout_bytes.decode(self.runner.charset, 'replace')
@property
def stderr(self) ->str:
"""The standard error as unicode string."""
- pass
+ if self.stderr_bytes is None:
+ return None
+ return self.stderr_bytes.decode(self.runner.charset, 'replace')
def __repr__(self) ->str:
exc_str = repr(self.exception) if self.exception else 'okay'
@@ -108,12 +110,15 @@ class CliRunner:
for it. The default is the `name` attribute or ``"root"`` if not
set.
"""
- pass
+ return cli.name or "root"
def make_env(self, overrides: t.Optional[t.Mapping[str, t.Optional[str]
]]=None) ->t.Mapping[str, t.Optional[str]]:
"""Returns the environment overrides for invoking a script."""
- pass
+ env = dict(self.env)
+ if overrides:
+ env.update(overrides)
+ return env
@contextlib.contextmanager
def isolation(self, input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]]
@@ -139,7 +144,71 @@ class CliRunner:
.. versionchanged:: 4.0
Added the ``color`` parameter.
"""
- pass
+ old_env = {}
+ try:
+ for key, value in self.make_env(env).items():
+ old_env[key] = os.environ.get(key)
+ if value is None:
+ os.environ.pop(key, None)
+ else:
+ os.environ[key] = value
+
+ old_stdin = sys.stdin
+ old_stdout = sys.stdout
+ old_stderr = sys.stderr
+ old_testing = termui.isatty
+ old_color = utils.should_strip_ansi
+
+ bytes_input = None
+ if input is not None:
+ if isinstance(input, str):
+ bytes_input = input.encode(self.charset)
+ elif isinstance(input, bytes):
+ bytes_input = input
+ elif hasattr(input, "read"):
+ bytes_input = input.read()
+ if isinstance(bytes_input, str):
+ bytes_input = bytes_input.encode(self.charset)
+ else:
+ raise TypeError("Invalid input type")
+ sys.stdin = io.BytesIO(bytes_input)
+ else:
+ sys.stdin = io.BytesIO(b"")
+
+ stdout = io.BytesIO()
+ stderr = io.BytesIO() if not self.mix_stderr else None
+
+ if self.echo_stdin:
+ input = EchoingStdin(input, stdout)
+
+ sys.stdout = _NamedTextIOWrapper(
+ stdout, "stdout", "w", encoding=self.charset, errors="backslashreplace"
+ )
+ if stderr is not None:
+ sys.stderr = _NamedTextIOWrapper(
+ stderr, "stderr", "w", encoding=self.charset, errors="backslashreplace"
+ )
+ else:
+ sys.stderr = sys.stdout
+
+ def is_testing():
+ return True
+
+ termui.isatty = is_testing
+ utils.should_strip_ansi = lambda *args, **kwargs: not color
+
+ yield stdout, stderr
+ finally:
+ for key, value in old_env.items():
+ if value is None:
+ os.environ.pop(key, None)
+ else:
+ os.environ[key] = value
+ sys.stdin = old_stdin
+ sys.stdout = old_stdout
+ sys.stderr = old_stderr
+ termui.isatty = old_testing
+ utils.should_strip_ansi = old_color
def invoke(self, cli: 'BaseCommand', args: t.Optional[t.Union[str, t.
Sequence[str]]]=None, input: t.Optional[t.Union[str, bytes, t.IO[t.
@@ -180,7 +249,59 @@ class CliRunner:
The result object has the ``exc_info`` attribute with the
traceback if available.
"""
- pass
+ exc_info = None
+ with self.isolation(input=input, env=env, color=color) as outstreams:
+ return_value = None
+ exception = None
+ exit_code = 0
+
+ if isinstance(args, str):
+ args = shlex.split(args)
+
+ try:
+ prog_name = extra.pop("prog_name", None)
+ if prog_name is None:
+ prog_name = self.get_default_prog_name(cli)
+
+ return_value = cli.main(
+ args=args or (),
+ prog_name=prog_name,
+ standalone_mode=False,
+ **extra
+ )
+ except SystemExit as e:
+ exc_info = sys.exc_info()
+ exit_code = e.code
+ if exit_code is None:
+ exit_code = 0
+
+ if exit_code != 0:
+ exception = e
+
+ if not isinstance(exit_code, int):
+ sys.stdout.write(str(exit_code))
+ sys.stdout.write("\n")
+ exit_code = 1
+
+ except Exception as e:
+ if not catch_exceptions:
+ raise
+ exception = e
+ exit_code = 1
+ exc_info = sys.exc_info()
+
+ stdout_bytes = outstreams[0].getvalue()
+ stderr_bytes = outstreams[1].getvalue() if outstreams[1] else None
+
+ return Result(
+ runner=self,
+ stdout_bytes=stdout_bytes,
+ stderr_bytes=stderr_bytes,
+ return_value=return_value,
+ exit_code=exit_code,
+ exception=exception,
+ exc_info=exc_info,
+ )
@contextlib.contextmanager
def isolated_filesystem(self, temp_dir: t.Optional[t.Union[str,
@@ -197,4 +318,20 @@ class CliRunner:
.. versionchanged:: 8.0
Added the ``temp_dir`` parameter.
"""
- pass
+ cwd = os.getcwd()
+ if temp_dir is not None:
+ temp_dir = os.path.abspath(temp_dir)
+ fs = tempfile.mkdtemp(dir=temp_dir)
+ os.chdir(fs)
+ else:
+ fs = tempfile.mkdtemp()
+ os.chdir(fs)
+ try:
+ yield fs
+ finally:
+ os.chdir(cwd)
+ if temp_dir is None:
+ try:
+ shutil.rmtree(fs)
+ except OSError:
+ pass
diff --git a/src/click/types.py b/src/click/types.py
index 4527388..f33a2a0 100644
--- a/src/click/types.py
+++ b/src/click/types.py
@@ -49,7 +49,10 @@ class ParamType:
.. versionadded:: 8.0
"""
- pass
+ return {
+ "name": self.name,
+ "param_type": self.__class__.__name__,
+ }
def __call__(self, value: t.Any, param: t.Optional['Parameter']=None,
ctx: t.Optional['Context']=None) ->t.Any:
@@ -58,7 +61,7 @@ class ParamType:
def get_metavar(self, param: 'Parameter') ->t.Optional[str]:
"""Returns the metavar default for this param if it provides one."""
- pass
+ return self.name.upper()
def get_missing_message(self, param: 'Parameter') ->t.Optional[str]:
"""Optionally might return extra information about a missing
@@ -66,7 +69,7 @@ class ParamType:
.. versionadded:: 2.0
"""
- pass
+ return f"Missing {self.name} value."
def convert(self, value: t.Any, param: t.Optional['Parameter'], ctx: t.
Optional['Context']) ->t.Any:
@@ -89,7 +92,7 @@ class ParamType:
:param ctx: The current context that arrived at this value. May
be ``None``.
"""
- pass
+ return value # Default implementation: no conversion
def split_envvar_value(self, rv: str) ->t.Sequence[str]:
"""Given a value from an environment variable this splits it up
@@ -99,12 +102,14 @@ class ParamType:
then leading and trailing whitespace is ignored. Otherwise, leading
and trailing splitters usually lead to empty items being included.
"""
- pass
+ if self.envvar_list_splitter is None:
+ return [rv.strip()]
+ return [x for x in rv.split(self.envvar_list_splitter) if x]
def fail(self, message: str, param: t.Optional['Parameter']=None, ctx:
t.Optional['Context']=None) ->'t.NoReturn':
"""Helper method to fail with an invalid value message."""
- pass
+ raise BadParameter(message, ctx=ctx, param=param)
def shell_complete(self, ctx: 'Context', param: 'Parameter', incomplete:
str) ->t.List['CompletionItem']:
@@ -120,7 +125,7 @@ class ParamType:
.. versionadded:: 8.0
"""
- pass
+ return [] # Default implementation: no completions
class CompositeParamType(ParamType):
diff --git a/src/click/utils.py b/src/click/utils.py
index 0b2575c..7602f67 100644
--- a/src/click/utils.py
+++ b/src/click/utils.py
@@ -24,17 +24,35 @@ R = t.TypeVar('R')
def safecall(func: 't.Callable[P, R]') ->'t.Callable[P, t.Optional[R]]':
"""Wraps a function so that it swallows exceptions."""
- pass
+ def wrapper(*args: P.args, **kwargs: P.kwargs) -> t.Optional[R]:
+ try:
+ return func(*args, **kwargs)
+ except Exception:
+ return None
+ return wrapper
def make_str(value: t.Any) ->str:
"""Converts a value into a valid string."""
- pass
+ if isinstance(value, bytes):
+ try:
+ return value.decode('utf-8')
+ except UnicodeDecodeError:
+ return value.decode('utf-8', 'replace')
+ return str(value)
def make_default_short_help(help: str, max_length: int=45) ->str:
"""Returns a condensed version of help string."""
- pass
+ words = help.split()
+ total_length = 0
+ result = []
+ for word in words:
+ if total_length + len(word) + 1 > max_length:
+ break
+ result.append(word)
+ total_length += len(word) + 1
+ return ' '.join(result) + ('...' if len(result) < len(words) else '')
class LazyFile:
@@ -76,17 +94,35 @@ class LazyFile:
a :exc:`FileError`. Not handling this error will produce an error
that Click shows.
"""
- pass
+ if self._f is not None:
+ return self._f
+ try:
+ if self.atomic:
+ import tempfile
+ f = tempfile.NamedTemporaryFile(
+ mode=self.mode, encoding=self.encoding,
+ errors=self.errors, delete=False
+ )
+ self._f = f
+ else:
+ self._f = open(self.name, self.mode, encoding=self.encoding, errors=self.errors)
+ return self._f
+ except (IOError, OSError) as e:
+ from click import FileError
+ raise FileError(self.name, hint=str(e))
def close(self) ->None:
"""Closes the underlying file, no matter what."""
- pass
+ if self._f is not None:
+ self._f.close()
+ self._f = None
def close_intelligently(self) ->None:
"""This function only closes the file if it was opened by the lazy
file wrapper. For instance this will never close stdin.
"""
- pass
+ if self.should_close:
+ self.close()
def __enter__(self) ->'LazyFile':
return self
@@ -164,7 +200,30 @@ def echo(message: t.Optional[t.Any]=None, file: t.Optional[t.IO[t.Any]]=
.. versionchanged:: 2.0
Support colors on Windows if colorama is installed.
"""
- pass
+ if file is None:
+ file = _default_text_stderr() if err else _default_text_stdout()
+
+ if message is not None:
+ if not isinstance(message, (str, bytes, bytearray)):
+ message = str(message)
+
+ if isinstance(file, binary_streams):
+ if isinstance(message, str):
+ message = message.encode(file.encoding or 'utf-8', 'replace')
+ elif isinstance(message, (bytes, bytearray)):
+ message = message.decode('utf-8', 'replace')
+
+ if color is None:
+ color = resolve_color_default()
+
+ if should_strip_ansi(file, color):
+ message = strip_ansi(message)
+
+ if nl:
+ message = message + '\n'
+
+ file.write(message)
+ file.flush()
def get_binary_stream(name: "te.Literal['stdin', 'stdout', 'stderr']"