back to Claude Sonnet 3.5 - Fill-in summary
Claude Sonnet 3.5 - Fill-in: python-prompt-toolkit
Failed to run pytests for test tests
Pytest collection failure.
Patch diff
diff --git a/src/prompt_toolkit/application/application.py b/src/prompt_toolkit/application/application.py
index a630e551..88ef723e 100644
--- a/src/prompt_toolkit/application/application.py
+++ b/src/prompt_toolkit/application/application.py
@@ -223,7 +223,16 @@ class Application(Generic[_AppResult]):
Create a `Style` object that merges the default UI style, the default
pygments style, and the custom user style.
"""
- pass
+ styles = []
+ styles.append(default_ui_style())
+
+ if include_default_pygments_style():
+ styles.append(default_pygments_style())
+
+ if self.style:
+ styles.append(self.style)
+
+ return merge_styles(styles)
@property
def color_depth(self) ->ColorDepth:
@@ -239,7 +248,12 @@ class Application(Generic[_AppResult]):
created using `output.defaults.create_output`, then this value is
coming from the $PROMPT_TOOLKIT_COLOR_DEPTH environment variable.
"""
- pass
+ if callable(self._color_depth):
+ value = self._color_depth()
+ if value is not None:
+ return value
+
+ return self.output.get_default_color_depth()
@property
def current_buffer(self) ->Buffer:
@@ -250,7 +264,7 @@ class Application(Generic[_AppResult]):
has the focus. In this case, it's really not practical to check for
`None` values or catch exceptions every time.)
"""
- pass
+ return self.layout.current_control.buffer or Buffer()
@property
def current_search_state(self) ->SearchState:
@@ -258,19 +272,42 @@ class Application(Generic[_AppResult]):
Return the current :class:`.SearchState`. (The one for the focused
:class:`.BufferControl`.)
"""
- pass
+ control = self.layout.current_control
+ if isinstance(control, BufferControl):
+ return control.search_state
+ return SearchState()
def reset(self) ->None:
"""
Reset everything, for reading the next input.
"""
- pass
+ self.layout.reset()
+ self.vi_state.reset()
+ self.emacs_state.reset()
+
+ self.renderer.reset()
+
+ self.exit_style = ''
+
+ self.background_tasks = []
+
+ self.on_reset.fire()
def invalidate(self) ->None:
"""
Thread safe way of sending a repaint trigger to the input event loop.
"""
- pass
+ if self.loop is None:
+ return
+
+ if not self._invalidated:
+ self._invalidated = True
+
+ def redraw() ->None:
+ self._invalidated = False
+ self._redraw()
+
+ call_soon_threadsafe(redraw, loop=self.loop)
@property
def invalidated(self) ->bool:
diff --git a/src/prompt_toolkit/application/current.py b/src/prompt_toolkit/application/current.py
index 74edbaba..8c031f64 100644
--- a/src/prompt_toolkit/application/current.py
+++ b/src/prompt_toolkit/application/current.py
@@ -9,6 +9,9 @@ if TYPE_CHECKING:
__all__ = ['AppSession', 'get_app_session', 'get_app', 'get_app_or_none',
'set_app', 'create_app_session', 'create_app_session_from_tty']
+def get_app_session() -> AppSession:
+ return _current_app_session.get()
+
class AppSession:
"""
@@ -60,7 +63,11 @@ def get_app() ->Application[Any]:
(For applications like pymux where we can have more than one `Application`,
we'll use a work-around to handle that.)
"""
- pass
+ app = get_app_session().app
+ if app is None:
+ from .dummy import DummyApplication
+ return DummyApplication()
+ return app
def get_app_or_none() ->(Application[Any] | None):
@@ -68,7 +75,7 @@ def get_app_or_none() ->(Application[Any] | None):
Get the current active (running) Application, or return `None` if no
application is running.
"""
- pass
+ return get_app_session().app
@contextmanager
@@ -83,7 +90,13 @@ def set_app(app: Application[Any]) ->Generator[None, None, None]:
the case, use `contextvars.copy_context()`, or use `Application.context` to
run it in the appropriate context.
"""
- pass
+ session = get_app_session()
+ previous_app = session.app
+ session.app = app
+ try:
+ yield
+ finally:
+ session.app = previous_app
@contextmanager
@@ -95,7 +108,12 @@ def create_app_session(input: (Input | None)=None, output: (Output | None)=None
This is useful if there can be multiple individual `AppSession`s going on.
Like in the case of an Telnet/SSH server.
"""
- pass
+ session = AppSession(input=input, output=output)
+ token = _current_app_session.set(session)
+ try:
+ yield session
+ finally:
+ _current_app_session.reset(token)
@contextmanager
@@ -114,4 +132,11 @@ def create_app_session_from_tty() ->Generator[AppSession, None, None]:
with create_app_session_from_tty():
prompt('>')
"""
- pass
+ from prompt_toolkit.input.defaults import create_input
+ from prompt_toolkit.output.defaults import create_output
+
+ input = create_input(always_prefer_tty=True)
+ output = create_output(always_prefer_tty=True)
+
+ with create_app_session(input=input, output=output) as session:
+ yield session
diff --git a/src/prompt_toolkit/application/run_in_terminal.py b/src/prompt_toolkit/application/run_in_terminal.py
index 2d3682ca..e80ebc9f 100644
--- a/src/prompt_toolkit/application/run_in_terminal.py
+++ b/src/prompt_toolkit/application/run_in_terminal.py
@@ -34,12 +34,22 @@ def run_in_terminal(func: Callable[[], _T], render_cli_done: bool=False,
:returns: A `Future`.
"""
- pass
+ async def run():
+ app = get_app_or_none()
+ if app is None:
+ return await run_in_executor_with_context(func) if in_executor else func()
+
+ async with in_terminal(render_cli_done):
+ if in_executor:
+ return await run_in_executor_with_context(func)
+ else:
+ return func()
+
+ return ensure_future(run())
@asynccontextmanager
-async def in_terminal(render_cli_done: bool=False) ->AsyncGenerator[None, None
- ]:
+async def in_terminal(render_cli_done: bool=False) ->AsyncGenerator[None, None]:
"""
Asynchronous context manager that suspends the current application and runs
the body in the terminal.
@@ -51,4 +61,23 @@ async def in_terminal(render_cli_done: bool=False) ->AsyncGenerator[None, None
call_some_function()
await call_some_async_function()
"""
- pass
+ app = get_app_or_none()
+ if app is None:
+ yield
+ return
+
+ if render_cli_done:
+ app.pre_run_callables.append(app.renderer.erase)
+ else:
+ await app.run_system_command(lambda: None)
+
+ app.output.flush()
+ app._running_in_terminal = True
+
+ try:
+ yield
+ finally:
+ app._running_in_terminal = False
+ app.renderer.reset()
+ app._request_absolute_cursor_position()
+ app._redraw()
diff --git a/src/prompt_toolkit/auto_suggest.py b/src/prompt_toolkit/auto_suggest.py
index 2e73a69c..8efb440c 100644
--- a/src/prompt_toolkit/auto_suggest.py
+++ b/src/prompt_toolkit/auto_suggest.py
@@ -87,19 +87,32 @@ class ThreadedAutoSuggest(AutoSuggest):
"""
Run the `get_suggestion` function in a thread.
"""
- pass
+ return await run_in_executor_with_context(
+ lambda: self.auto_suggest.get_suggestion(buff, document)
+ )
class DummyAutoSuggest(AutoSuggest):
"""
AutoSuggest class that doesn't return any suggestion.
"""
+ def get_suggestion(self, buffer: Buffer, document: Document) -> None:
+ return None
class AutoSuggestFromHistory(AutoSuggest):
"""
Give suggestions based on the lines in the history.
"""
+ def get_suggestion(self, buffer: Buffer, document: Document) -> Suggestion | None:
+ history = buffer.history
+ text = document.text.lstrip()
+
+ for string in reversed(list(history.get_strings())):
+ for line in reversed(string.splitlines()):
+ if line.startswith(text) and line != text:
+ return Suggestion(line[len(text):])
+ return None
class ConditionalAutoSuggest(AutoSuggest):
@@ -112,6 +125,11 @@ class ConditionalAutoSuggest(AutoSuggest):
self.auto_suggest = auto_suggest
self.filter = to_filter(filter)
+ def get_suggestion(self, buffer: Buffer, document: Document) -> Suggestion | None:
+ if self.filter():
+ return self.auto_suggest.get_suggestion(buffer, document)
+ return None
+
class DynamicAutoSuggest(AutoSuggest):
"""
@@ -123,3 +141,9 @@ class DynamicAutoSuggest(AutoSuggest):
def __init__(self, get_auto_suggest: Callable[[], AutoSuggest | None]
) ->None:
self.get_auto_suggest = get_auto_suggest
+
+ def get_suggestion(self, buffer: Buffer, document: Document) -> Suggestion | None:
+ auto_suggest = self.get_auto_suggest()
+ if auto_suggest is not None:
+ return auto_suggest.get_suggestion(buffer, document)
+ return None
diff --git a/src/prompt_toolkit/buffer.py b/src/prompt_toolkit/buffer.py
index b2382429..9cf234d1 100644
--- a/src/prompt_toolkit/buffer.py
+++ b/src/prompt_toolkit/buffer.py
@@ -226,7 +226,15 @@ class Buffer:
"""
:param append_to_history: Append current input to history first.
"""
- pass
+ if append_to_history:
+ self.append_to_history()
+
+ if document is None:
+ document = Document()
+
+ self.document = document
+ self._undo_stack = []
+ self._redo_stack = []
def load_history_if_not_yet_loaded(self) ->None:
"""
@@ -249,15 +257,22 @@ class Buffer:
thread, but history loading is the only place where it matters, and
this solves it.
"""
- pass
+ if self._load_history_task is None:
+ async def load_history():
+ await self.history.load()
+ self._load_history_task = asyncio.create_task(load_history())
def _set_text(self, value: str) ->bool:
"""set text at current working_index. Return whether it changed."""
- pass
+ original_value = self.text
+ self.text = value
+ return original_value != value
def _set_cursor_position(self, value: int) ->bool:
"""Set cursor position. Return whether it changed."""
- pass
+ original_position = self.cursor_position
+ self.cursor_position = max(0, min(value, len(self.text)))
+ return original_position != self.cursor_position
@text.setter
def text(self, value: str) ->None:
@@ -266,14 +281,17 @@ class Buffer:
valid for this text. text/cursor_position should be consistent at any time,
otherwise set a Document instead.)
"""
- pass
+ if self._set_text(value):
+ self.cursor_position = min(self.cursor_position, len(value))
+ self.on_text_changed.fire()
@cursor_position.setter
def cursor_position(self, value: int) ->None:
"""
Setting cursor position.
"""
- pass
+ if self._set_cursor_position(value):
+ self.on_cursor_position_changed.fire()
@property
def document(self) ->Document:
@@ -281,7 +299,10 @@ class Buffer:
Return :class:`~prompt_toolkit.document.Document` instance from the
current text, cursor position and selection state.
"""
- pass
+ return self._document_cache.get(
+ (self.text, self.cursor_position, self.selection_state),
+ lambda: Document(self.text, self.cursor_position, self.selection_state)
+ )
@document.setter
def document(self, value: Document) ->None:
@@ -291,7 +312,7 @@ class Buffer:
This will set both the text and cursor position at the same time, but
atomically. (Change events will be triggered only after both have been set.)
"""
- pass
+ self.set_document(value)
def set_document(self, value: Document, bypass_readonly: bool=False
) ->None:
@@ -312,21 +333,32 @@ class Buffer:
you expect, and there won't be a stack trace. Use try/finally
around this function if you need some cleanup code.
"""
- pass
+ if not bypass_readonly and self.read_only():
+ raise EditReadOnlyBuffer()
+
+ text_changed = self._set_text(value.text)
+ cursor_position_changed = self._set_cursor_position(value.cursor_position)
+
+ if text_changed:
+ self.on_text_changed.fire()
+ if cursor_position_changed:
+ self.on_cursor_position_changed.fire()
@property
def is_returnable(self) ->bool:
"""
True when there is something handling accept.
"""
- pass
+ return self.accept_handler is not None
def save_to_undo_stack(self, clear_redo_stack: bool=True) ->None:
"""
Safe current state (input text and cursor position), so that we can
restore it by calling undo.
"""
- pass
+ self._undo_stack.append((self.text, self.cursor_position))
+ if clear_redo_stack:
+ self._redo_stack = []
def transform_lines(self, line_index_iterator: Iterable[int],
transform_callback: Callable[[str], str]) ->str:
@@ -345,7 +377,16 @@ class Buffer:
:returns: The new text.
"""
- pass
+ lines = self.text.splitlines(True)
+ new_lines = []
+
+ for index, original_line in enumerate(lines):
+ if index in line_index_iterator:
+ new_lines.append(transform_callback(original_line))
+ else:
+ new_lines.append(original_line)
+
+ return ''.join(new_lines)
def transform_current_line(self, transform_callback: Callable[[str], str]
) ->None:
diff --git a/src/prompt_toolkit/cache.py b/src/prompt_toolkit/cache.py
index bfff0829..03b8455a 100644
--- a/src/prompt_toolkit/cache.py
+++ b/src/prompt_toolkit/cache.py
@@ -27,11 +27,23 @@ class SimpleCache(Generic[_T, _U]):
If not found, call `getter_func` to resolve it, and put that on the top
of the cache instead.
"""
- pass
+ if key in self._data:
+ return self._data[key]
+
+ value = getter_func()
+ self._data[key] = value
+ self._keys.append(key)
+
+ if len(self._keys) > self.maxsize:
+ oldest_key = self._keys.popleft()
+ del self._data[oldest_key]
+
+ return value
def clear(self) ->None:
"""Clear cache."""
- pass
+ self._data.clear()
+ self._keys.clear()
_K = TypeVar('_K', bound=Tuple[Hashable, ...])
@@ -75,4 +87,14 @@ def memoized(maxsize: int=1024) ->Callable[[_F], _F]:
"""
Memoization decorator for immutable classes and pure functions.
"""
- pass
+ def decorator(func: _F) ->_F:
+ cache = SimpleCache(maxsize)
+
+ @wraps(func)
+ def wrapper(*args: Any, **kwargs: Any) ->Any:
+ key = (args, frozenset(kwargs.items()))
+ return cache.get(cast(_T, key), lambda: func(*args, **kwargs))
+
+ return cast(_F, wrapper)
+
+ return decorator
diff --git a/src/prompt_toolkit/clipboard/base.py b/src/prompt_toolkit/clipboard/base.py
index e09febd7..c92e9582 100644
--- a/src/prompt_toolkit/clipboard/base.py
+++ b/src/prompt_toolkit/clipboard/base.py
@@ -42,13 +42,14 @@ class Clipboard(metaclass=ABCMeta):
"""
Shortcut for setting plain text on clipboard.
"""
- pass
+ self.set_data(ClipboardData(text))
def rotate(self) ->None:
"""
For Emacs mode, rotate the kill ring.
"""
- pass
+ # This is a base implementation, which doesn't do anything.
+ # Subclasses can override this method if they support rotation.
@abstractmethod
def get_data(self) ->ClipboardData:
@@ -62,6 +63,19 @@ class DummyClipboard(Clipboard):
"""
Clipboard implementation that doesn't remember anything.
"""
+ def set_data(self, data: ClipboardData) ->None:
+ """
+ Set data to the clipboard.
+
+ :param data: :class:`~.ClipboardData` instance.
+ """
+ pass # Dummy implementation, doesn't store anything
+
+ def get_data(self) ->ClipboardData:
+ """
+ Return clipboard data.
+ """
+ return ClipboardData() # Always return empty clipboard data
class DynamicClipboard(Clipboard):
@@ -73,3 +87,38 @@ class DynamicClipboard(Clipboard):
def __init__(self, get_clipboard: Callable[[], Clipboard | None]) ->None:
self.get_clipboard = get_clipboard
+
+ def set_data(self, data: ClipboardData) ->None:
+ """
+ Set data to the clipboard.
+
+ :param data: :class:`~.ClipboardData` instance.
+ """
+ clipboard = self.get_clipboard()
+ if clipboard:
+ clipboard.set_data(data)
+
+ def get_data(self) ->ClipboardData:
+ """
+ Return clipboard data.
+ """
+ clipboard = self.get_clipboard()
+ if clipboard:
+ return clipboard.get_data()
+ return ClipboardData() # Return empty clipboard data if no clipboard is available
+
+ def set_text(self, text: str) ->None:
+ """
+ Shortcut for setting plain text on clipboard.
+ """
+ clipboard = self.get_clipboard()
+ if clipboard:
+ clipboard.set_text(text)
+
+ def rotate(self) ->None:
+ """
+ For Emacs mode, rotate the kill ring.
+ """
+ clipboard = self.get_clipboard()
+ if clipboard:
+ clipboard.rotate()
diff --git a/src/prompt_toolkit/completion/base.py b/src/prompt_toolkit/completion/base.py
index 62214619..66210e72 100644
--- a/src/prompt_toolkit/completion/base.py
+++ b/src/prompt_toolkit/completion/base.py
@@ -65,17 +65,20 @@ class Completion:
@property
def display_text(self) ->str:
"""The 'display' field as plain text."""
- pass
+ return ''.join(text for _, text in self.display)
@property
def display_meta(self) ->StyleAndTextTuples:
"""Return meta-text. (This is lazy when using a callable)."""
- pass
+ from prompt_toolkit.formatted_text import to_formatted_text
+ if callable(self._display_meta):
+ return to_formatted_text(self._display_meta())
+ return to_formatted_text(self._display_meta)
@property
def display_meta_text(self) ->str:
"""The 'meta' field as plain text."""
- pass
+ return ''.join(text for _, text in self.display_meta)
def new_completion_from_position(self, position: int) ->Completion:
"""
@@ -84,7 +87,14 @@ class Completion:
it needs to have a list of new completions after inserting the common
prefix.
"""
- pass
+ return Completion(
+ text=self.text[position:],
+ start_position=self.start_position + position,
+ display=self.display,
+ display_meta=self._display_meta,
+ style=self.style,
+ selected_style=self.selected_style
+ )
class CompleteEvent:
@@ -143,7 +153,8 @@ class Completer(metaclass=ABCMeta):
Asynchronous generator of :class:`.Completion` objects.
"""
- pass
+ async for completion in generator_to_async_generator(lambda: self.get_completions(document, complete_event)):
+ yield completion
class ThreadedCompleter(Completer):
@@ -165,7 +176,12 @@ class ThreadedCompleter(Completer):
"""
Asynchronous generator of completions.
"""
- pass
+ import asyncio
+ loop = asyncio.get_event_loop()
+ return await loop.run_in_executor(
+ None,
+ lambda: generator_to_async_generator(lambda: self.completer.get_completions(document, complete_event))
+ )
def __repr__(self) ->str:
return f'ThreadedCompleter({self.completer!r})'
@@ -233,7 +249,12 @@ def merge_completers(completers: Sequence[Completer], deduplicate: bool=False
so that completions that would result in the same text will be
deduplicated.
"""
- pass
+ from prompt_toolkit.completion.deduplicate import DeduplicateCompleter
+
+ result = _MergedCompleter(completers)
+ if deduplicate:
+ return DeduplicateCompleter(result)
+ return result
def get_common_complete_suffix(document: Document, completions: Sequence[
@@ -241,4 +262,19 @@ def get_common_complete_suffix(document: Document, completions: Sequence[
"""
Return the common prefix for all completions.
"""
- pass
+ if not completions:
+ return ''
+
+ # Get all suffixes.
+ suffixes = [c.text for c in completions]
+
+ # Compute common suffix.
+ common_suffix = suffixes[0]
+ for s in suffixes[1:]:
+ common_suffix = common_suffix[:len(s)]
+ for i in range(len(common_suffix)):
+ if common_suffix[i] != s[i]:
+ common_suffix = common_suffix[:i]
+ break
+
+ return common_suffix
diff --git a/src/prompt_toolkit/completion/fuzzy_completer.py b/src/prompt_toolkit/completion/fuzzy_completer.py
index fc4b7e20..d4505a56 100644
--- a/src/prompt_toolkit/completion/fuzzy_completer.py
+++ b/src/prompt_toolkit/completion/fuzzy_completer.py
@@ -52,7 +52,24 @@ class FuzzyCompleter(Completer):
"""
Generate formatted text for the display label.
"""
- pass
+ match_start = fuzzy_match.start_pos
+ match_end = match_start + fuzzy_match.match_length
+ word = fuzzy_match.completion.text
+
+ result: StyleAndTextTuples = []
+
+ # Add characters before match
+ if match_start > 0:
+ result.append(('', word[:match_start]))
+
+ # Add matched characters
+ result.append(('class:fuzzy-match', word[match_start:match_end]))
+
+ # Add characters after match
+ if match_end < len(word):
+ result.append(('', word[match_end:]))
+
+ return result
class FuzzyWordCompleter(Completer):
diff --git a/src/prompt_toolkit/completion/nested.py b/src/prompt_toolkit/completion/nested.py
index 130f3429..fed3c5b3 100644
--- a/src/prompt_toolkit/completion/nested.py
+++ b/src/prompt_toolkit/completion/nested.py
@@ -57,4 +57,16 @@ class NestedCompleter(Completer):
Values in this data structure can be a completers as well.
"""
- pass
+ options: dict[str, Completer | None] = {}
+
+ for key, value in data.items():
+ if isinstance(value, Mapping):
+ options[key] = cls.from_nested_dict(value)
+ elif isinstance(value, Set):
+ options[key] = WordCompleter(list(value))
+ elif isinstance(value, Completer):
+ options[key] = value
+ else:
+ options[key] = None
+
+ return cls(options)
diff --git a/src/prompt_toolkit/contrib/regular_languages/compiler.py b/src/prompt_toolkit/contrib/regular_languages/compiler.py
index b154d948..5e0d9208 100644
--- a/src/prompt_toolkit/contrib/regular_languages/compiler.py
+++ b/src/prompt_toolkit/contrib/regular_languages/compiler.py
@@ -87,13 +87,19 @@ class _CompiledGrammar:
"""
Escape `value` to fit in the place of this variable into the grammar.
"""
- pass
+ escape_func = self.escape_funcs.get(varname)
+ if escape_func:
+ return escape_func(value)
+ return re.escape(value)
def unescape(self, varname: str, value: str) ->str:
"""
Unescape `value`.
"""
- pass
+ unescape_func = self.unescape_funcs.get(varname)
+ if unescape_func:
+ return unescape_func(value)
+ return value
@classmethod
def _transform(cls, root_node: Node, create_group_func: Callable[[
@@ -105,7 +111,20 @@ class _CompiledGrammar:
:param create_group_func: A callable which takes a `Node` and returns the next
free name for this node.
"""
- pass
+ if isinstance(root_node, Variable):
+ return f'(?P<{create_group_func(root_node)}>.+?)'
+ elif isinstance(root_node, Regex):
+ return root_node.regex
+ elif isinstance(root_node, Repeat):
+ return f'({cls._transform(root_node.childnode, create_group_func)}){root_node.cardinality}'
+ elif isinstance(root_node, Lookahead):
+ return f'(?={cls._transform(root_node.childnode, create_group_func)})'
+ elif isinstance(root_node, NodeSequence):
+ return ''.join(cls._transform(c, create_group_func) for c in root_node.children)
+ elif isinstance(root_node, AnyNode):
+ return f"({'|'.join(cls._transform(c, create_group_func) for c in root_node.children)})"
+ else:
+ raise ValueError(f"Invalid node type: {type(root_node)}")
@classmethod
def _transform_prefix(cls, root_node: Node, create_group_func: Callable
@@ -131,7 +150,28 @@ class _CompiledGrammar:
:param create_group_func: A callable which takes a `Node` and returns the next
free name for this node.
"""
- pass
+ if isinstance(root_node, Variable):
+ yield f'^(?P<{create_group_func(root_node)}>.+?)$'
+ elif isinstance(root_node, Regex):
+ yield f'^{root_node.regex}$'
+ elif isinstance(root_node, Repeat):
+ child_patterns = list(cls._transform_prefix(root_node.childnode, create_group_func))
+ for i in range(1, root_node.max_repeat + 1):
+ yield from (f'^{p * i}$' for p in child_patterns)
+ elif isinstance(root_node, Lookahead):
+ yield from (f'^(?={p})$' for p in cls._transform_prefix(root_node.childnode, create_group_func))
+ elif isinstance(root_node, NodeSequence):
+ current = '^'
+ for child in root_node.children:
+ child_patterns = list(cls._transform_prefix(child, create_group_func))
+ for pattern in child_patterns:
+ yield f'{current}{pattern[1:-1]}$'
+ current += cls._transform(child, create_group_func)
+ elif isinstance(root_node, AnyNode):
+ for child in root_node.children:
+ yield from cls._transform_prefix(child, create_group_func)
+ else:
+ raise ValueError(f"Invalid node type: {type(root_node)}")
def match(self, string: str) ->(Match | None):
"""
@@ -140,7 +180,10 @@ class _CompiledGrammar:
:param string: The input string.
"""
- pass
+ m = self._re.match(string)
+ if m:
+ return Match(string, [(self._re, m)], self._group_names_to_nodes, self.unescape_funcs)
+ return None
def match_prefix(self, string: str) ->(Match | None):
"""
@@ -151,7 +194,19 @@ class _CompiledGrammar:
:param string: The input string.
"""
- pass
+ matches = []
+ for re in self._re_prefix_with_trailing_input:
+ m = re.match(string)
+ if m:
+ matches.append((re, m))
+
+ if not matches:
+ # If no match, consider all input as trailing
+ dummy_re = re.compile(f'^(?P<{_INVALID_TRAILING_INPUT}>.*)$')
+ m = dummy_re.match(string)
+ matches.append((dummy_re, m))
+
+ return Match(string, matches, self._group_names_to_nodes, self.unescape_funcs)
class Match:
@@ -173,19 +228,29 @@ class Match:
"""
Return a list of (varname, reg) tuples.
"""
- pass
+ result = []
+ for re, match in self._re_matches:
+ for group_name, node_name in self._group_names_to_nodes.items():
+ if group_name in match.groupdict():
+ start, end = match.span(group_name)
+ result.append((node_name, (start, end)))
+ return result
def _nodes_to_values(self) ->list[tuple[str, str, tuple[int, int]]]:
"""
Returns list of (Node, string_value) tuples.
"""
- pass
+ result = []
+ for varname, reg in self._nodes_to_regs():
+ value = self.string[reg[0]:reg[1]]
+ result.append((varname, value, reg))
+ return result
def variables(self) ->Variables:
"""
Returns :class:`Variables` instance.
"""
- pass
+ return Variables(self._nodes_to_values())
def trailing_input(self) ->(MatchVariable | None):
"""
@@ -193,14 +258,21 @@ class Match:
"Trailing input" is input at the end that does not match the grammar anymore, but
when this is removed from the end of the input, the input would be a valid string.
"""
- pass
+ for re, match in self._re_matches:
+ if _INVALID_TRAILING_INPUT in match.groupdict():
+ start, end = match.span(_INVALID_TRAILING_INPUT)
+ return MatchVariable(_INVALID_TRAILING_INPUT, self.string[start:end], (start, end))
+ return None
def end_nodes(self) ->Iterable[MatchVariable]:
"""
Yields `MatchVariable` instances for all the nodes having their end
position at the end of the input string.
"""
- pass
+ input_len = len(self.string)
+ for varname, value, (start, end) in self._nodes_to_values():
+ if end == input_len:
+ yield MatchVariable(varname, value, (start, end))
class Variables:
@@ -251,7 +323,9 @@ def compile(expression: str, escape_funcs: (EscapeFuncDict | None)=None,
Compile grammar (given as regex string), returning a `CompiledGrammar`
instance.
"""
- pass
+ tokens = list(tokenize_regex(expression))
+ root_node = parse_regex(tokens)
+ return _compile_from_parse_tree(root_node, escape_funcs, unescape_funcs)
def _compile_from_parse_tree(root_node: Node, escape_funcs: (EscapeFuncDict |
@@ -261,4 +335,4 @@ def _compile_from_parse_tree(root_node: Node, escape_funcs: (EscapeFuncDict |
Compile grammar (given as parse tree), returning a `CompiledGrammar`
instance.
"""
- pass
+ return _CompiledGrammar(root_node, escape_funcs, unescape_funcs)
diff --git a/src/prompt_toolkit/contrib/regular_languages/completion.py b/src/prompt_toolkit/contrib/regular_languages/completion.py
index cf287636..a8050055 100644
--- a/src/prompt_toolkit/contrib/regular_languages/completion.py
+++ b/src/prompt_toolkit/contrib/regular_languages/completion.py
@@ -31,7 +31,12 @@ class GrammarCompleter(Completer):
(The completer assumes that the cursor position was at the end of the
input string.)
"""
- pass
+ for var_name, value in match.variables():
+ if var_name in self.completers:
+ completer = self.completers[var_name]
+ # Create a new document for the completer
+ document = Document(value, cursor_position=len(value))
+ yield from completer.get_completions(document, complete_event)
def _remove_duplicates(self, items: Iterable[Completion]) ->list[Completion
]:
@@ -40,4 +45,10 @@ class GrammarCompleter(Completer):
(Sometimes we have duplicates, because the there several matches of the
same grammar, each yielding similar completions.)
"""
- pass
+ seen = set()
+ result = []
+ for item in items:
+ if item.text not in seen:
+ seen.add(item.text)
+ result.append(item)
+ return result
diff --git a/src/prompt_toolkit/contrib/regular_languages/regex_parser.py b/src/prompt_toolkit/contrib/regular_languages/regex_parser.py
index ba97c928..bad83eb1 100644
--- a/src/prompt_toolkit/contrib/regular_languages/regex_parser.py
+++ b/src/prompt_toolkit/contrib/regular_languages/regex_parser.py
@@ -131,11 +131,82 @@ def tokenize_regex(input: str) ->list[str]:
:param input: string, representing a regular expression.
:returns: List of tokens.
"""
- pass
+ tokens = []
+ i = 0
+ while i < len(input):
+ if input[i].isspace():
+ i += 1
+ continue
+ if input[i] == '#':
+ while i < len(input) and input[i] != '\n':
+ i += 1
+ continue
+ if input[i] in '()|*+?{}[]':
+ tokens.append(input[i])
+ i += 1
+ elif input[i] == '\\':
+ if i + 1 < len(input):
+ tokens.append(input[i:i+2])
+ i += 2
+ else:
+ tokens.append(input[i])
+ i += 1
+ else:
+ start = i
+ while i < len(input) and input[i] not in '()|*+?{}[]\\' and not input[i].isspace():
+ i += 1
+ tokens.append(input[start:i])
+ return tokens
def parse_regex(regex_tokens: list[str]) ->Node:
"""
Takes a list of tokens from the tokenizer, and returns a parse tree.
"""
- pass
+ def parse_sequence():
+ sequence = []
+ while tokens and tokens[0] not in ')|':
+ sequence.append(parse_atom())
+ return NodeSequence(sequence) if len(sequence) > 1 else sequence[0] if sequence else None
+
+ def parse_atom():
+ if not tokens:
+ return None
+ token = tokens.pop(0)
+ if token == '(':
+ node = parse_sequence()
+ if tokens and tokens[0] == ')':
+ tokens.pop(0)
+ return node
+ elif token == '[':
+ content = ''
+ while tokens and tokens[0] != ']':
+ content += tokens.pop(0)
+ if tokens and tokens[0] == ']':
+ tokens.pop(0)
+ return Regex(f'[{content}]')
+ elif token in '*+?':
+ return Repeat(parse_atom(), 0 if token in '*?' else 1, None if token in '*+' else 1)
+ elif token == '{':
+ min_repeat = max_repeat = ''
+ while tokens and tokens[0] not in ',}':
+ min_repeat += tokens.pop(0)
+ if tokens and tokens[0] == ',':
+ tokens.pop(0)
+ while tokens and tokens[0] != '}':
+ max_repeat += tokens.pop(0)
+ if tokens and tokens[0] == '}':
+ tokens.pop(0)
+ return Repeat(parse_atom(), int(min_repeat) if min_repeat else 0, int(max_repeat) if max_repeat else None)
+ else:
+ return Regex(token)
+
+ tokens = regex_tokens.copy()
+ result = parse_sequence()
+ while tokens:
+ if tokens[0] == '|':
+ tokens.pop(0)
+ result = AnyNode([result, parse_sequence()])
+ else:
+ break
+ return result
diff --git a/src/prompt_toolkit/contrib/ssh/server.py b/src/prompt_toolkit/contrib/ssh/server.py
index 73ec9401..68bc92d1 100644
--- a/src/prompt_toolkit/contrib/ssh/server.py
+++ b/src/prompt_toolkit/contrib/ssh/server.py
@@ -52,7 +52,10 @@ class PromptToolkitSSHSession(asyncssh.SSHServerSession):
"""
Callable that returns the current `Size`, required by Vt100_Output.
"""
- pass
+ if self._chan is None:
+ return Size(rows=24, columns=80) # Default size if channel is not available
+ width, height, _, _ = self._chan.get_terminal_size()
+ return Size(rows=height, columns=width)
class PromptToolkitSSHServer(asyncssh.SSHServer):
diff --git a/src/prompt_toolkit/contrib/telnet/protocol.py b/src/prompt_toolkit/contrib/telnet/protocol.py
index f58280d4..8c337a4d 100644
--- a/src/prompt_toolkit/contrib/telnet/protocol.py
+++ b/src/prompt_toolkit/contrib/telnet/protocol.py
@@ -63,47 +63,106 @@ class TelnetProtocolParser:
def do_received(self, data: bytes) ->None:
"""Received telnet DO command."""
- pass
+ logger.debug(f"Received DO command: {data}")
+ # Respond with WILL if we support the option, WONT otherwise
+ # This is a simplified implementation
+ self.feed(IAC + WILL + data)
def dont_received(self, data: bytes) ->None:
"""Received telnet DONT command."""
- pass
+ logger.debug(f"Received DONT command: {data}")
+ # Acknowledge the DONT command
+ self.feed(IAC + WONT + data)
def will_received(self, data: bytes) ->None:
"""Received telnet WILL command."""
- pass
+ logger.debug(f"Received WILL command: {data}")
+ # Respond with DO if we want to enable the option, DONT otherwise
+ # This is a simplified implementation
+ self.feed(IAC + DO + data)
def wont_received(self, data: bytes) ->None:
"""Received telnet WONT command."""
- pass
+ logger.debug(f"Received WONT command: {data}")
+ # Acknowledge the WONT command
+ self.feed(IAC + DONT + data)
def naws(self, data: bytes) ->None:
"""
Received NAWS. (Window dimensions.)
"""
- pass
+ if len(data) == 4:
+ columns, rows = struct.unpack('!HH', data)
+ logger.debug(f"Received NAWS: {columns}x{rows}")
+ self.size_received_callback(rows, columns)
+ else:
+ logger.warning(f"Invalid NAWS data received: {data}")
def ttype(self, data: bytes) ->None:
"""
Received terminal type.
"""
- pass
+ if data.startswith(IS):
+ terminal_type = data[1:].decode('ascii', errors='ignore')
+ logger.debug(f"Received terminal type: {terminal_type}")
+ self.ttype_received_callback(terminal_type)
+ else:
+ logger.warning(f"Invalid TTYPE data received: {data}")
def negotiate(self, data: bytes) ->None:
"""
Got negotiate data.
"""
- pass
+ logger.debug(f"Negotiating: {data}")
+ # This method can be expanded to handle specific negotiation scenarios
+ # For now, we'll just log the data
def _parse_coroutine(self) ->Generator[None, bytes, None]:
"""
Parser state machine.
Every 'yield' expression returns the next byte.
"""
- pass
+ while True:
+ d = yield
+ if d == IAC:
+ d = yield
+ if d == IAC:
+ self.data_received_callback(IAC)
+ elif d in (DO, DONT, WILL, WONT):
+ command = d
+ d = yield
+ if command == DO:
+ self.do_received(d)
+ elif command == DONT:
+ self.dont_received(d)
+ elif command == WILL:
+ self.will_received(d)
+ elif command == WONT:
+ self.wont_received(d)
+ elif d == SB:
+ buffer = []
+ while True:
+ d = yield
+ if d == IAC:
+ d = yield
+ if d == SE:
+ break
+ buffer.append(d)
+ buffer = b''.join(buffer)
+ if buffer.startswith(NAWS):
+ self.naws(buffer[1:])
+ elif buffer.startswith(TTYPE):
+ self.ttype(buffer[1:])
+ else:
+ self.negotiate(buffer)
+ else:
+ self.negotiate(d)
+ else:
+ self.data_received_callback(d)
def feed(self, data: bytes) ->None:
"""
Feed data to the parser.
"""
- pass
+ for b in data:
+ self._parser.send(bytes([b]))
diff --git a/src/prompt_toolkit/contrib/telnet/server.py b/src/prompt_toolkit/contrib/telnet/server.py
index 4feaaadb..88ed36c4 100644
--- a/src/prompt_toolkit/contrib/telnet/server.py
+++ b/src/prompt_toolkit/contrib/telnet/server.py
@@ -84,38 +84,54 @@ class TelnetConnection:
"""
Run application.
"""
- pass
+ await self._ready.wait()
+ self.context = contextvars.copy_context()
+ with create_app_session(input=self.vt100_input, output=self.vt100_output):
+ await self.interact(self)
def feed(self, data: bytes) ->None:
"""
Handler for incoming data. (Called by TelnetServer.)
"""
- pass
+ self.parser.feed(data)
def close(self) ->None:
"""
Closed by client.
"""
- pass
+ if not self._closed:
+ self._closed = True
+ self.vt100_input.close()
+ self.conn.close()
+ self.server.connections.remove(self)
def send(self, formatted_text: AnyFormattedText) ->None:
"""
Send text to the client.
"""
- pass
+ formatted_text = to_formatted_text(formatted_text)
+ print_formatted_text(self.vt100_output, formatted_text, self.style or DummyStyle())
def send_above_prompt(self, formatted_text: AnyFormattedText) ->None:
"""
Send text to the client.
This is asynchronous, returns a `Future`.
"""
- pass
+ async def send_above_prompt() ->None:
+ if self.context:
+ with create_app_session(input=self.vt100_input, output=self.vt100_output):
+ self.context.run(lambda: run_in_terminal(lambda: self.send(formatted_text)))
+
+ asyncio.create_task(send_above_prompt())
def erase_screen(self) ->None:
"""
Erase the screen and move the cursor to the top.
"""
- pass
+ if self.vt100_output:
+ self.vt100_output.erase_screen()
+ self.vt100_output.cursor_goto(0, 0)
+ self.vt100_output.flush()
class TelnetServer:
@@ -157,7 +173,21 @@ class TelnetServer:
:param ready_cb: Callback that will be called at the point that we're
actually listening.
"""
- pass
+ loop = get_running_loop()
+ server = await loop.create_server(
+ lambda: asyncio.Protocol(), # Placeholder protocol
+ self.host, self.port, reuse_address=True)
+
+ logger.info(f"Listening for telnet connections on {self.host}:{self.port}")
+
+ if ready_cb:
+ ready_cb()
+
+ async with server:
+ while True:
+ conn, addr = await server.accept()
+ logger.info(f"New connection from {addr}")
+ self._accept(conn)
def start(self) ->None:
"""
@@ -165,7 +195,8 @@ class TelnetServer:
Start the telnet server (stop by calling and awaiting `stop()`).
"""
- pass
+ if self._run_task is None:
+ self._run_task = asyncio.create_task(self.run())
async def stop(self) ->None:
"""
@@ -174,10 +205,37 @@ class TelnetServer:
Stop a telnet server that was started using `.start()` and wait for the
cancellation to complete.
"""
- pass
+ if self._run_task:
+ self._run_task.cancel()
+ await asyncio.gather(self._run_task, return_exceptions=True)
+ self._run_task = None
+
+ for task in self._application_tasks:
+ task.cancel()
+
+ await asyncio.gather(*self._application_tasks, return_exceptions=True)
+ self._application_tasks.clear()
+
+ for connection in list(self.connections):
+ connection.close()
- def _accept(self, listen_socket: socket.socket) ->None:
+ def _accept(self, conn: socket.socket) ->None:
"""
Accept new incoming connection.
"""
- pass
+ addr = conn.getpeername()
+ vt100_input = create_pipe_input()
+
+ connection = TelnetConnection(
+ conn=conn,
+ addr=addr,
+ interact=self.interact,
+ server=self,
+ encoding=self.encoding,
+ style=self.style,
+ vt100_input=vt100_input,
+ enable_cpr=self.enable_cpr,
+ )
+
+ self.connections.add(connection)
+ self._application_tasks.append(asyncio.create_task(connection.run_application()))
diff --git a/src/prompt_toolkit/cursor_shapes.py b/src/prompt_toolkit/cursor_shapes.py
index 076033d2..ee763816 100644
--- a/src/prompt_toolkit/cursor_shapes.py
+++ b/src/prompt_toolkit/cursor_shapes.py
@@ -43,12 +43,32 @@ class SimpleCursorShapeConfig(CursorShapeConfig):
) ->None:
self.cursor_shape = cursor_shape
+ def get_cursor_shape(self, application: Application[Any]) ->CursorShape:
+ return self.cursor_shape
+
class ModalCursorShapeConfig(CursorShapeConfig):
"""
Show cursor shape according to the current input mode.
"""
+ def __init__(self,
+ emacs: CursorShape = CursorShape.BEAM,
+ vi_insert: CursorShape = CursorShape.BEAM,
+ vi_navigation: CursorShape = CursorShape.BLOCK) -> None:
+ self.emacs = emacs
+ self.vi_insert = vi_insert
+ self.vi_navigation = vi_navigation
+
+ def get_cursor_shape(self, application: Application[Any]) -> CursorShape:
+ if application.editing_mode == EditingMode.VI:
+ if application.vi_state.input_mode == InputMode.INSERT:
+ return self.vi_insert
+ else:
+ return self.vi_navigation
+ else:
+ return self.emacs
+
class DynamicCursorShapeConfig(CursorShapeConfig):
@@ -56,10 +76,26 @@ class DynamicCursorShapeConfig(CursorShapeConfig):
AnyCursorShapeConfig]) ->None:
self.get_cursor_shape_config = get_cursor_shape_config
+ def get_cursor_shape(self, application: Application[Any]) -> CursorShape:
+ config = self.get_cursor_shape_config()
+ if isinstance(config, CursorShape):
+ return config
+ elif isinstance(config, CursorShapeConfig):
+ return config.get_cursor_shape(application)
+ else:
+ return CursorShape._NEVER_CHANGE
+
def to_cursor_shape_config(value: AnyCursorShapeConfig) ->CursorShapeConfig:
"""
Take a `CursorShape` instance or `CursorShapeConfig` and turn it into a
`CursorShapeConfig`.
"""
- pass
+ if isinstance(value, CursorShapeConfig):
+ return value
+ elif isinstance(value, CursorShape):
+ return SimpleCursorShapeConfig(value)
+ elif value is None:
+ return SimpleCursorShapeConfig(CursorShape._NEVER_CHANGE)
+ else:
+ raise TypeError(f"Invalid cursor shape config: {value}")
diff --git a/src/prompt_toolkit/document.py b/src/prompt_toolkit/document.py
index c246ba0d..0a5fbdbe 100644
--- a/src/prompt_toolkit/document.py
+++ b/src/prompt_toolkit/document.py
@@ -90,109 +90,119 @@ class Document:
@property
def text(self) ->str:
"""The document text."""
- pass
+ return self._text
@property
def cursor_position(self) ->int:
"""The document cursor position."""
- pass
+ return self._cursor_position
@property
def selection(self) ->(SelectionState | None):
""":class:`.SelectionState` object."""
- pass
+ return self._selection
@property
def current_char(self) ->str:
"""Return character under cursor or an empty string."""
- pass
+ return self._get_char_relative_to_cursor() if self.cursor_position < len(self._text) else ''
@property
def char_before_cursor(self) ->str:
"""Return character before the cursor or an empty string."""
- pass
+ return self._get_char_relative_to_cursor(-1) if self.cursor_position > 0 else ''
@property
def current_line_before_cursor(self) ->str:
"""Text from the start of the line until the cursor."""
- pass
+ return self.current_line[:self.cursor_position_col]
@property
def current_line_after_cursor(self) ->str:
"""Text from the cursor until the end of the line."""
- pass
+ return self.current_line[self.cursor_position_col:]
@property
def lines(self) ->list[str]:
"""
Array of all the lines.
"""
- pass
+ if self._cache.lines is None:
+ self._cache.lines = _ImmutableLineList(self._text.splitlines(keepends=True))
+ return self._cache.lines
@property
def _line_start_indexes(self) ->list[int]:
"""
Array pointing to the start indexes of all the lines.
"""
- pass
+ if self._cache.line_indexes is None:
+ indexes = [0]
+ for line in self.lines[:-1]:
+ indexes.append(indexes[-1] + len(line))
+ self._cache.line_indexes = indexes
+ return self._cache.line_indexes
@property
def lines_from_current(self) ->list[str]:
"""
Array of the lines starting from the current line, until the last line.
"""
- pass
+ return self.lines[self.cursor_position_row:]
@property
def line_count(self) ->int:
"""Return the number of lines in this document. If the document ends
with a trailing \\n, that counts as the beginning of a new line."""
- pass
+ return len(self.lines)
@property
def current_line(self) ->str:
"""Return the text on the line where the cursor is. (when the input
consists of just one line, it equals `text`."""
- pass
+ return self.lines[self.cursor_position_row]
@property
def leading_whitespace_in_current_line(self) ->str:
"""The leading whitespace in the left margin of the current line."""
- pass
+ return self.current_line[:len(self.current_line) - len(self.current_line.lstrip())]
def _get_char_relative_to_cursor(self, offset: int=0) ->str:
"""
Return character relative to cursor position, or empty string
"""
- pass
+ try:
+ return self._text[self._cursor_position + offset]
+ except IndexError:
+ return ''
@property
def on_first_line(self) ->bool:
"""
True when we are at the first line.
"""
- pass
+ return self.cursor_position_row == 0
@property
def on_last_line(self) ->bool:
"""
True when we are at the last line.
"""
- pass
+ return self.cursor_position_row == self.line_count - 1
@property
def cursor_position_row(self) ->int:
"""
Current row. (0-based.)
"""
- pass
+ return self._find_line_start_index(self._cursor_position)[0]
@property
def cursor_position_col(self) ->int:
"""
Current column. (0-based.)
"""
- pass
+ return self._cursor_position - self._find_line_start_index(self._cursor_position)[1]
def _find_line_start_index(self, index: int) ->tuple[int, int]:
"""
@@ -201,14 +211,18 @@ class Document:
Return (row, index) tuple.
"""
- pass
+ indexes = self._line_start_indexes
+ row = bisect.bisect_right(indexes, index) - 1
+ return row, indexes[row]
def translate_index_to_position(self, index: int) ->tuple[int, int]:
"""
Given an index for the text, return the corresponding (row, col) tuple.
(0-based. Returns (0, 0) for index=0.)
"""
- pass
+ row, row_index = self._find_line_start_index(index)
+ col = index - row_index
+ return row, col
def translate_row_col_to_index(self, row: int, col: int) ->int:
"""
@@ -217,7 +231,10 @@ class Document:
Negative row/col values are turned into zero.
"""
- pass
+ row = max(0, row)
+ col = max(0, col)
+ row = min(row, len(self._line_start_indexes) - 1)
+ return min(self._line_start_indexes[row] + col, len(self._text))
@property
def is_cursor_at_the_end(self) ->bool:
diff --git a/src/prompt_toolkit/eventloop/async_generator.py b/src/prompt_toolkit/eventloop/async_generator.py
index 9c1d7e5a..eab1bbf1 100644
--- a/src/prompt_toolkit/eventloop/async_generator.py
+++ b/src/prompt_toolkit/eventloop/async_generator.py
@@ -14,7 +14,10 @@ _T_Generator = TypeVar('_T_Generator', bound=AsyncGenerator[Any, None])
@asynccontextmanager
async def aclosing(thing: _T_Generator) ->AsyncGenerator[_T_Generator, None]:
"""Similar to `contextlib.aclosing`, in Python 3.10."""
- pass
+ try:
+ yield thing
+ finally:
+ await thing.aclose()
DEFAULT_BUFFER_SIZE: int = 1000
@@ -37,4 +40,27 @@ async def generator_to_async_generator(get_iterable: Callable[[], Iterable[
:param buffer_size: Size of the queue between the async consumer and the
synchronous generator that produces items.
"""
- pass
+ queue: Queue[_T | _Done] = Queue(maxsize=buffer_size)
+ loop = get_running_loop()
+
+ def producer():
+ try:
+ for item in get_iterable():
+ queue.put(item)
+ except Exception as e:
+ loop.call_soon_threadsafe(lambda: queue.put(e))
+ finally:
+ queue.put(_Done())
+
+ await run_in_executor_with_context(producer)
+
+ while True:
+ try:
+ item = await loop.run_in_executor(None, queue.get, True, 0.1)
+ if isinstance(item, _Done):
+ break
+ if isinstance(item, Exception):
+ raise item
+ yield item
+ except Empty:
+ pass
diff --git a/src/prompt_toolkit/eventloop/inputhook.py b/src/prompt_toolkit/eventloop/inputhook.py
index 33584d8e..436c545f 100644
--- a/src/prompt_toolkit/eventloop/inputhook.py
+++ b/src/prompt_toolkit/eventloop/inputhook.py
@@ -58,7 +58,9 @@ def new_eventloop_with_inputhook(inputhook: Callable[[InputHookContext], None]
"""
Create a new event loop with the given inputhook.
"""
- pass
+ selector = selectors.SelectSelector()
+ loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook))
+ return loop
def set_eventloop_with_inputhook(inputhook: Callable[[InputHookContext], None]
@@ -66,7 +68,9 @@ def set_eventloop_with_inputhook(inputhook: Callable[[InputHookContext], None]
"""
Create a new event loop with the given inputhook, and activate it.
"""
- pass
+ loop = new_eventloop_with_inputhook(inputhook)
+ asyncio.set_event_loop(loop)
+ return loop
class InputHookSelector(BaseSelector):
@@ -88,4 +92,31 @@ class InputHookSelector(BaseSelector):
"""
Clean up resources.
"""
- pass
+ self.selector.close()
+ os.close(self._r)
+ os.close(self._w)
+
+ def register(self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None) -> SelectorKey:
+ return self.selector.register(fileobj, events, data)
+
+ def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey:
+ return self.selector.unregister(fileobj)
+
+ def modify(self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None) -> SelectorKey:
+ return self.selector.modify(fileobj, events, data)
+
+ def select(self, timeout: float | None = None) -> list[tuple[SelectorKey, _EventMask]]:
+ ready = self.selector.select(timeout=0)
+ if ready:
+ return ready
+
+ def input_is_ready() -> bool:
+ return bool(self.selector.select(timeout=0))
+
+ context = InputHookContext(self._r, input_is_ready)
+ self.inputhook(context)
+
+ return self.selector.select(timeout=0)
+
+ def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]:
+ return self.selector.get_map()
diff --git a/src/prompt_toolkit/eventloop/utils.py b/src/prompt_toolkit/eventloop/utils.py
index 4b6cc6db..4a03827d 100644
--- a/src/prompt_toolkit/eventloop/utils.py
+++ b/src/prompt_toolkit/eventloop/utils.py
@@ -19,7 +19,9 @@ def run_in_executor_with_context(func: Callable[..., _T], *args: Any, loop:
See also: https://bugs.python.org/issue34014
"""
- pass
+ loop = loop or asyncio.get_event_loop()
+ ctx = contextvars.copy_context()
+ return loop.run_in_executor(None, lambda: ctx.run(func, *args))
def call_soon_threadsafe(func: Callable[[], None], max_postpone_time: (
@@ -40,7 +42,19 @@ def call_soon_threadsafe(func: Callable[[], None], max_postpone_time: (
However, we want to set a deadline value, for when the rendering should
happen. (The UI should stay responsive).
"""
- pass
+ loop = loop or asyncio.get_event_loop()
+
+ if max_postpone_time is None:
+ loop.call_soon_threadsafe(func)
+ else:
+ def wrapper():
+ if time.time() >= deadline:
+ func()
+ else:
+ loop.call_soon(wrapper)
+
+ deadline = time.time() + max_postpone_time
+ loop.call_soon_threadsafe(wrapper)
def get_traceback_from_context(context: dict[str, Any]) ->(TracebackType | None
@@ -48,4 +62,7 @@ def get_traceback_from_context(context: dict[str, Any]) ->(TracebackType | None
"""
Get the traceback object from the context.
"""
- pass
+ exception = context.get('exception')
+ if exception:
+ return exception.__traceback__
+ return None
diff --git a/src/prompt_toolkit/eventloop/win32.py b/src/prompt_toolkit/eventloop/win32.py
index 55033702..cc0a2173 100644
--- a/src/prompt_toolkit/eventloop/win32.py
+++ b/src/prompt_toolkit/eventloop/win32.py
@@ -33,7 +33,17 @@ def wait_for_handles(handles: list[HANDLE], timeout: int=INFINITE) ->(HANDLE |
This function returns either `None` or one of the given `HANDLE` objects.
(The return value can be tested with the `is` operator.)
"""
- pass
+ arr = (HANDLE * len(handles))(*handles)
+ ret = windll.kernel32.WaitForMultipleObjects(
+ len(handles),
+ arr,
+ BOOL(False),
+ DWORD(timeout)
+ )
+ if ret == WAIT_TIMEOUT:
+ return None
+ else:
+ return handles[ret]
def create_win32_event() ->HANDLE:
@@ -41,4 +51,19 @@ def create_win32_event() ->HANDLE:
Creates a Win32 unnamed Event .
http://msdn.microsoft.com/en-us/library/windows/desktop/ms682396(v=vs.85).aspx
"""
- pass
+ sa = SECURITY_ATTRIBUTES()
+ sa.nLength = DWORD(sizeof(SECURITY_ATTRIBUTES))
+ sa.bInheritHandle = BOOL(True)
+ sa.lpSecurityDescriptor = None
+
+ handle = windll.kernel32.CreateEventA(
+ pointer(sa),
+ BOOL(True), # Manual reset event
+ BOOL(False), # Initial state = 0
+ None # Unnamed event
+ )
+
+ if handle == 0:
+ raise WindowsError(windll.kernel32.GetLastError())
+
+ return handle
diff --git a/src/prompt_toolkit/filters/app.py b/src/prompt_toolkit/filters/app.py
index 1cf7bf99..21c2010b 100644
--- a/src/prompt_toolkit/filters/app.py
+++ b/src/prompt_toolkit/filters/app.py
@@ -25,7 +25,9 @@ def has_focus(value: FocusableElement) ->Condition:
"""
Enable when this buffer has the focus.
"""
- pass
+ def has_focus_filter() -> bool:
+ return get_app().layout.current_control == value
+ return Condition(has_focus_filter)
@Condition
@@ -33,7 +35,8 @@ def buffer_has_focus() ->bool:
"""
Enabled when the currently focused control is a `BufferControl`.
"""
- pass
+ from prompt_toolkit.layout.controls import BufferControl
+ return isinstance(get_app().layout.current_control, BufferControl)
@Condition
@@ -41,7 +44,7 @@ def has_selection() ->bool:
"""
Enable when the current buffer has a selection.
"""
- pass
+ return bool(get_app().current_buffer.selection_state)
@Condition
@@ -49,7 +52,7 @@ def has_suggestion() ->bool:
"""
Enable when the current buffer has a suggestion.
"""
- pass
+ return get_app().current_buffer.suggestion is not None
@Condition
@@ -57,7 +60,7 @@ def has_completions() ->bool:
"""
Enable when the current buffer has completions.
"""
- pass
+ return bool(get_app().current_buffer.completer)
@Condition
@@ -65,7 +68,8 @@ def completion_is_selected() ->bool:
"""
True when the user selected a completion.
"""
- pass
+ return get_app().current_buffer.complete_state is not None and \
+ get_app().current_buffer.complete_state.current_completion is not None
@Condition
@@ -73,7 +77,7 @@ def is_read_only() ->bool:
"""
True when the current buffer is read only.
"""
- pass
+ return get_app().current_buffer.read_only()
@Condition
@@ -81,19 +85,19 @@ def is_multiline() ->bool:
"""
True when the current buffer has been marked as multiline.
"""
- pass
+ return get_app().current_buffer.multiline
@Condition
def has_validation_error() ->bool:
"""Current buffer has validation error."""
- pass
+ return get_app().current_buffer.validation_error is not None
@Condition
def has_arg() ->bool:
"""Enable when the input processor has an 'arg'."""
- pass
+ return get_app().key_processor.arg is not None
@Condition
@@ -101,7 +105,7 @@ def is_done() ->bool:
"""
True when the CLI is returning, aborting or exiting.
"""
- pass
+ return get_app().is_done
@Condition
@@ -115,7 +119,7 @@ def renderer_height_is_known() ->bool:
until we receive the height, in order to avoid flickering -- first drawing
somewhere in the middle, and then again at the bottom.)
"""
- pass
+ return get_app().renderer.height_is_known
@memoized()
@@ -123,7 +127,9 @@ def in_editing_mode(editing_mode: EditingMode) ->Condition:
"""
Check whether a given editing mode is active. (Vi or Emacs.)
"""
- pass
+ def in_editing_mode_filter() -> bool:
+ return get_app().editing_mode == editing_mode
+ return Condition(in_editing_mode_filter)
@Condition
@@ -131,34 +137,37 @@ def vi_navigation_mode() ->bool:
"""
Active when the set for Vi navigation key bindings are active.
"""
- pass
+ app = get_app()
+ return app.editing_mode == EditingMode.VI and \
+ app.vi_state.input_mode == 'navigation'
@Condition
def vi_recording_macro() ->bool:
"""When recording a Vi macro."""
- pass
+ return get_app().vi_state.recording_macro
@Condition
def emacs_mode() ->bool:
"""When the Emacs bindings are active."""
- pass
+ return get_app().editing_mode == EditingMode.EMACS
@Condition
def is_searching() ->bool:
"""When we are searching."""
- pass
+ return get_app().layout.is_searching
@Condition
def control_is_searchable() ->bool:
"""When the current UIControl is searchable."""
- pass
+ from prompt_toolkit.layout.controls import SearchableControl
+ return isinstance(get_app().layout.current_control, SearchableControl)
@Condition
def vi_search_direction_reversed() ->bool:
"""When the '/' and '?' key bindings for Vi-style searching have been reversed."""
- pass
+ return get_app().vi_search_direction_reversed
diff --git a/src/prompt_toolkit/filters/base.py b/src/prompt_toolkit/filters/base.py
index 66696022..7ddb9aef 100644
--- a/src/prompt_toolkit/filters/base.py
+++ b/src/prompt_toolkit/filters/base.py
@@ -93,7 +93,10 @@ class _AndList(Filter):
If there's only one unique filter in the given iterable, it will return
that one filter instead of an `_AndList`.
"""
- pass
+ unique_filters = list(set(filters))
+ if len(unique_filters) == 1:
+ return unique_filters[0]
+ return cls(unique_filters)
def __call__(self) ->bool:
return all(f() for f in self.filters)
@@ -119,7 +122,10 @@ class _OrList(Filter):
If there's only one unique filter in the given iterable, it will return
that one filter instead of an `_OrList`.
"""
- pass
+ unique_filters = list(set(filters))
+ if len(unique_filters) == 1:
+ return unique_filters[0]
+ return cls(unique_filters)
def __call__(self) ->bool:
return any(f() for f in self.filters)
diff --git a/src/prompt_toolkit/filters/utils.py b/src/prompt_toolkit/filters/utils.py
index 4234ca63..005f13eb 100644
--- a/src/prompt_toolkit/filters/utils.py
+++ b/src/prompt_toolkit/filters/utils.py
@@ -11,7 +11,9 @@ def to_filter(bool_or_filter: FilterOrBool) ->Filter:
Accept both booleans and Filters as input and
turn it into a Filter.
"""
- pass
+ if isinstance(bool_or_filter, bool):
+ return _bool_to_filter[bool_or_filter]
+ return bool_or_filter
def is_true(value: FilterOrBool) ->bool:
@@ -20,4 +22,6 @@ def is_true(value: FilterOrBool) ->bool:
:param value: Boolean or `Filter` instance.
"""
- pass
+ if isinstance(value, bool):
+ return value
+ return bool(value())
diff --git a/src/prompt_toolkit/formatted_text/ansi.py b/src/prompt_toolkit/formatted_text/ansi.py
index 35ad0c63..9e8b809a 100644
--- a/src/prompt_toolkit/formatted_text/ansi.py
+++ b/src/prompt_toolkit/formatted_text/ansi.py
@@ -43,19 +43,99 @@ class ANSI:
"""
Coroutine that parses the ANSI escape sequences.
"""
- pass
+ formatted_text = self._formatted_text
+ style = ''
+ text = ''
+
+ while True:
+ c = yield
+ if c == '\x1b':
+ if text:
+ formatted_text.append((style, text))
+ text = ''
+ # Parse escape sequence
+ sequence = ''
+ while True:
+ c = yield
+ if c.isalpha() or c == '\\':
+ sequence += c
+ break
+ sequence += c
+
+ if sequence.startswith('['):
+ params = sequence[1:-1].split(';')
+ self._select_graphic_rendition([int(p) if p.isdigit() else 0 for p in params])
+ style = self._create_style_string()
+ elif c in ('\001', '\002'):
+ if text:
+ formatted_text.append((style, text))
+ text = ''
+ formatted_text.append(('[ZeroWidthEscape]', c))
+ else:
+ text += c
+
+ if text:
+ formatted_text.append((style, text))
def _select_graphic_rendition(self, attrs: list[int]) ->None:
"""
- Taken a list of graphics attributes and apply changes.
+ Take a list of graphics attributes and apply changes.
"""
- pass
+ for attr in attrs:
+ if attr == 0:
+ self._color = self._bgcolor = None
+ self._bold = self._underline = self._strike = self._italic = self._blink = self._reverse = self._hidden = False
+ elif attr == 1:
+ self._bold = True
+ elif attr == 3:
+ self._italic = True
+ elif attr == 4:
+ self._underline = True
+ elif attr == 5:
+ self._blink = True
+ elif attr == 7:
+ self._reverse = True
+ elif attr == 8:
+ self._hidden = True
+ elif attr == 9:
+ self._strike = True
+ elif 30 <= attr <= 37:
+ self._color = _fg_colors[attr - 30]
+ elif attr == 39:
+ self._color = None
+ elif 40 <= attr <= 47:
+ self._bgcolor = _bg_colors[attr - 40]
+ elif attr == 49:
+ self._bgcolor = None
+ elif 90 <= attr <= 97:
+ self._color = _fg_colors[attr - 90 + 8]
+ elif 100 <= attr <= 107:
+ self._bgcolor = _bg_colors[attr - 100 + 8]
def _create_style_string(self) ->str:
"""
Turn current style flags into a string for usage in a formatted text.
"""
- pass
+ parts = []
+ if self._color:
+ parts.append(f'fg:{self._color}')
+ if self._bgcolor:
+ parts.append(f'bg:{self._bgcolor}')
+ if self._bold:
+ parts.append('bold')
+ if self._underline:
+ parts.append('underline')
+ if self._strike:
+ parts.append('strike')
+ if self._italic:
+ parts.append('italic')
+ if self._blink:
+ parts.append('blink')
+ if self._reverse:
+ parts.append('reverse')
+ if self._hidden:
+ parts.append('hidden')
+ return ' '.join(parts)
def __repr__(self) ->str:
return f'ANSI({self.value!r})'
@@ -68,7 +148,9 @@ class ANSI:
Like `str.format`, but make sure that the arguments are properly
escaped. (No ANSI escapes can be injected.)
"""
- pass
+ escaped_args = tuple(ansi_escape(arg) for arg in args)
+ escaped_kwargs = {key: ansi_escape(value) for key, value in kwargs.items()}
+ return ANSI(FORMATTER.vformat(self.value, escaped_args, escaped_kwargs))
def __mod__(self, value: object) ->ANSI:
"""
@@ -91,11 +173,17 @@ def ansi_escape(text: object) ->str:
"""
Replace characters with a special meaning.
"""
- pass
+ if not isinstance(text, str):
+ text = str(text)
+ return text.replace('\x1b', '?').replace('\b', '?')
class ANSIFormatter(Formatter):
- pass
+ def format_field(self, value: object, format_spec: str) ->str:
+ """
+ This is used by the string formatting operator.
+ """
+ return ansi_escape(super().format_field(value, format_spec))
FORMATTER = ANSIFormatter()
diff --git a/src/prompt_toolkit/formatted_text/base.py b/src/prompt_toolkit/formatted_text/base.py
index 41ecbc3b..62b8ff85 100644
--- a/src/prompt_toolkit/formatted_text/base.py
+++ b/src/prompt_toolkit/formatted_text/base.py
@@ -42,7 +42,20 @@ def to_formatted_text(value: AnyFormattedText, style: str='', auto_convert:
:param auto_convert: If `True`, also accept other types, and convert them
to a string first.
"""
- pass
+ if callable(value):
+ value = value()
+
+ if isinstance(value, str):
+ return FormattedText([(style, value)])
+ elif isinstance(value, list):
+ return FormattedText([(style + ' ' + item_style if style else item_style, item_text)
+ for item_style, item_text in value])
+ elif hasattr(value, '__pt_formatted_text__'):
+ return to_formatted_text(value.__pt_formatted_text__(), style)
+ elif auto_convert:
+ return FormattedText([(style, str(value))])
+ else:
+ raise ValueError(f"Invalid formatted text: {value!r}")
def is_formatted_text(value: object) ->TypeGuard[AnyFormattedText]:
@@ -51,7 +64,16 @@ def is_formatted_text(value: object) ->TypeGuard[AnyFormattedText]:
statements).
In case of a callable, it doesn't check the return type.
"""
- pass
+ if callable(value):
+ return True
+ if isinstance(value, (str, FormattedText)):
+ return True
+ if isinstance(value, list):
+ return all(isinstance(item, tuple) and len(item) in (2, 3) and isinstance(item[0], str) and isinstance(item[1], str)
+ for item in value)
+ if hasattr(value, '__pt_formatted_text__'):
+ return True
+ return False
class FormattedText(StyleAndTextTuples):
@@ -89,4 +111,7 @@ def merge_formatted_text(items: Iterable[AnyFormattedText]) ->AnyFormattedText:
"""
Merge (Concatenate) several pieces of formatted text together.
"""
- pass
+ result: StyleAndTextTuples = []
+ for item in items:
+ result.extend(to_formatted_text(item))
+ return FormattedText(result)
diff --git a/src/prompt_toolkit/formatted_text/html.py b/src/prompt_toolkit/formatted_text/html.py
index 25f82c7d..6edb1443 100644
--- a/src/prompt_toolkit/formatted_text/html.py
+++ b/src/prompt_toolkit/formatted_text/html.py
@@ -3,8 +3,14 @@ import xml.dom.minidom as minidom
from string import Formatter
from typing import Any
from .base import FormattedText, StyleAndTextTuples
+import html
+
__all__ = ['HTML']
+def html_escape(value: object) -> str:
+ """Escape special characters to their HTML entities."""
+ return html.escape(str(value), quote=True)
+
class HTML:
"""
@@ -93,7 +99,10 @@ class HTML:
Like `str.format`, but make sure that the arguments are properly
escaped.
"""
- pass
+ escaped_args = tuple(html_escape(arg) for arg in args)
+ escaped_kwargs = {k: html_escape(v) for k, v in kwargs.items()}
+ formatted_value = FORMATTER.vformat(self.value, escaped_args, escaped_kwargs)
+ return HTML(formatted_value)
def __mod__(self, value: object) ->HTML:
"""
@@ -106,7 +115,11 @@ class HTML:
class HTMLFormatter(Formatter):
- pass
+ def get_value(self, key: int | str, args: tuple, kwargs: dict) -> str:
+ value = super().get_value(key, args, kwargs)
+ if isinstance(value, HTML):
+ return value.value
+ return html_escape(value)
FORMATTER = HTMLFormatter()
diff --git a/src/prompt_toolkit/formatted_text/utils.py b/src/prompt_toolkit/formatted_text/utils.py
index 1e4f49b4..ccb49f30 100644
--- a/src/prompt_toolkit/formatted_text/utils.py
+++ b/src/prompt_toolkit/formatted_text/utils.py
@@ -16,7 +16,7 @@ def to_plain_text(value: AnyFormattedText) ->str:
"""
Turn any kind of formatted text back into plain text.
"""
- pass
+ return fragment_list_to_text(to_formatted_text(value))
def fragment_list_len(fragments: StyleAndTextTuples) ->int:
@@ -26,7 +26,7 @@ def fragment_list_len(fragments: StyleAndTextTuples) ->int:
:param fragments: List of ``(style_str, text)`` or
``(style_str, text, mouse_handler)`` tuples.
"""
- pass
+ return sum(len(text) for _, text, *_ in fragments)
def fragment_list_width(fragments: StyleAndTextTuples) ->int:
@@ -37,7 +37,7 @@ def fragment_list_width(fragments: StyleAndTextTuples) ->int:
:param fragments: List of ``(style_str, text)`` or
``(style_str, text, mouse_handler)`` tuples.
"""
- pass
+ return sum(get_cwidth(text) for _, text, *_ in fragments)
def fragment_list_to_text(fragments: StyleAndTextTuples) ->str:
@@ -47,7 +47,7 @@ def fragment_list_to_text(fragments: StyleAndTextTuples) ->str:
:param fragments: List of ``(style_str, text)`` or
``(style_str, text, mouse_handler)`` tuples.
"""
- pass
+ return ''.join(text for _, text, *_ in fragments)
def split_lines(fragments: Iterable[OneStyleAndTextTuple]) ->Iterable[
@@ -59,4 +59,14 @@ def split_lines(fragments: Iterable[OneStyleAndTextTuple]) ->Iterable[
:param fragments: Iterable of ``(style_str, text)`` or
``(style_str, text, mouse_handler)`` tuples.
"""
- pass
+ line: StyleAndTextTuples = []
+ for style, text, *rest in fragments:
+ parts = text.split('\n')
+ for part in parts[:-1]:
+ line.append((style, part, *rest))
+ yield line
+ line = []
+ if parts[-1]:
+ line.append((style, parts[-1], *rest))
+ if line:
+ yield line
diff --git a/src/prompt_toolkit/history.py b/src/prompt_toolkit/history.py
index de643197..12b55d7f 100644
--- a/src/prompt_toolkit/history.py
+++ b/src/prompt_toolkit/history.py
@@ -40,18 +40,24 @@ class History(metaclass=ABCMeta):
were were appended to the history will be incorporated next time this
method is called.
"""
- pass
+ if not self._loaded:
+ self._loaded_strings = list(self.load_history_strings())
+ self._loaded = True
+
+ for item in reversed(self._loaded_strings):
+ yield item
def get_strings(self) ->list[str]:
"""
Get the strings from the history that are loaded so far.
(In order. Oldest item first.)
"""
- pass
+ return self._loaded_strings.copy()
def append_string(self, string: str) ->None:
"""Add string to the history."""
- pass
+ self._loaded_strings.append(string)
+ self.store_string(string)
@abstractmethod
def load_history_strings(self) ->Iterable[str]:
@@ -94,7 +100,33 @@ class ThreadedHistory(History):
Like `History.load(), but call `self.load_history_strings()` in a
background thread.
"""
- pass
+ def load_in_thread():
+ with self._lock:
+ strings = list(self.history.load_history_strings())
+ self._loaded_strings.extend(strings)
+ for event in self._string_load_events:
+ event.set()
+
+ if self._load_thread is None:
+ self._load_thread = threading.Thread(target=load_in_thread)
+ self._load_thread.daemon = True
+ self._load_thread.start()
+
+ loop = get_running_loop()
+ while True:
+ with self._lock:
+ if self._loaded_strings:
+ string = self._loaded_strings.pop()
+ yield string
+ elif self._load_thread.is_alive():
+ event = threading.Event()
+ self._string_load_events.append(event)
+ with self._lock:
+ if self._loaded_strings:
+ continue
+ await loop.run_in_executor(None, event.wait)
+ else:
+ break
def __repr__(self) ->str:
return f'ThreadedHistory({self.history!r})'
@@ -115,12 +147,24 @@ class InMemoryHistory(History):
else:
self._storage = list(history_strings)
+ def load_history_strings(self) ->Iterable[str]:
+ return reversed(self._storage)
+
+ def store_string(self, string: str) ->None:
+ self._storage.append(string)
+
class DummyHistory(History):
"""
:class:`.History` object that doesn't remember anything.
"""
+ def load_history_strings(self) ->Iterable[str]:
+ return []
+
+ def store_string(self, string: str) ->None:
+ pass
+
class FileHistory(History):
"""
@@ -130,3 +174,13 @@ class FileHistory(History):
def __init__(self, filename: str) ->None:
self.filename = filename
super().__init__()
+
+ def load_history_strings(self) ->Iterable[str]:
+ if os.path.exists(self.filename):
+ with open(self.filename, 'r', encoding='utf-8') as f:
+ for line in reversed(f.readlines()):
+ yield line.rstrip('\n')
+
+ def store_string(self, string: str) ->None:
+ with open(self.filename, 'a', encoding='utf-8') as f:
+ f.write(string + '\n')
diff --git a/src/prompt_toolkit/input/ansi_escape_sequences.py b/src/prompt_toolkit/input/ansi_escape_sequences.py
index e1df88bf..9e8bae56 100644
--- a/src/prompt_toolkit/input/ansi_escape_sequences.py
+++ b/src/prompt_toolkit/input/ansi_escape_sequences.py
@@ -133,7 +133,16 @@ def _get_reverse_ansi_sequences() ->dict[Keys, str]:
Create a dictionary that maps prompt_toolkit keys back to the VT100 escape
sequences.
"""
- pass
+ result = {}
+ for sequence, key in ANSI_SEQUENCES.items():
+ if isinstance(key, Keys):
+ if key not in result:
+ result[key] = sequence
+ elif isinstance(key, tuple):
+ # For tuples, we only consider the last key in the tuple
+ if key[-1] not in result:
+ result[key[-1]] = sequence
+ return result
REVERSE_ANSI_SEQUENCES = _get_reverse_ansi_sequences()
diff --git a/src/prompt_toolkit/input/base.py b/src/prompt_toolkit/input/base.py
index 208e9d1f..a01e1baf 100644
--- a/src/prompt_toolkit/input/base.py
+++ b/src/prompt_toolkit/input/base.py
@@ -23,28 +23,28 @@ class Input(metaclass=ABCMeta):
"""
Fileno for putting this in an event loop.
"""
- pass
+ raise NotImplementedError
@abstractmethod
def typeahead_hash(self) ->str:
"""
Identifier for storing type ahead key presses.
"""
- pass
+ raise NotImplementedError
@abstractmethod
def read_keys(self) ->list[KeyPress]:
"""
Return a list of Key objects which are read/parsed from the input.
"""
- pass
+ raise NotImplementedError
def flush_keys(self) ->list[KeyPress]:
"""
Flush the underlying parser. and return the pending keys.
(Used for vt100 input.)
"""
- pass
+ return []
def flush(self) ->None:
"""The event loop can call this when the input has to be flushed."""
@@ -53,21 +53,21 @@ class Input(metaclass=ABCMeta):
@abstractproperty
def closed(self) ->bool:
"""Should be true when the input stream is closed."""
- pass
+ raise NotImplementedError
@abstractmethod
def raw_mode(self) ->ContextManager[None]:
"""
Context manager that turns the input into raw mode.
"""
- pass
+ raise NotImplementedError
@abstractmethod
def cooked_mode(self) ->ContextManager[None]:
"""
Context manager that turns the input into cooked mode.
"""
- pass
+ raise NotImplementedError
@abstractmethod
def attach(self, input_ready_callback: Callable[[], None]
@@ -76,7 +76,7 @@ class Input(metaclass=ABCMeta):
Return a context manager that makes this input active in the current
event loop.
"""
- pass
+ raise NotImplementedError
@abstractmethod
def detach(self) ->ContextManager[None]:
@@ -84,7 +84,7 @@ class Input(metaclass=ABCMeta):
Return a context manager that makes sure that this input is not active
in the current event loop.
"""
- pass
+ raise NotImplementedError
def close(self) ->None:
"""Close input."""
@@ -99,12 +99,12 @@ class PipeInput(Input):
@abstractmethod
def send_bytes(self, data: bytes) ->None:
"""Feed byte string into the pipe"""
- pass
+ raise NotImplementedError
@abstractmethod
def send_text(self, data: str) ->None:
"""Feed a text string into the pipe"""
- pass
+ raise NotImplementedError
class DummyInput(Input):
@@ -114,3 +114,32 @@ class DummyInput(Input):
If used in an actual application, it will make the application render
itself once and exit immediately, due to an `EOFError`.
"""
+
+ def fileno(self) -> int:
+ return -1
+
+ def typeahead_hash(self) -> str:
+ return "dummy"
+
+ def read_keys(self) -> list[KeyPress]:
+ raise EOFError()
+
+ @property
+ def closed(self) -> bool:
+ return True
+
+ @contextmanager
+ def raw_mode(self) -> Generator[None, None, None]:
+ yield
+
+ @contextmanager
+ def cooked_mode(self) -> Generator[None, None, None]:
+ yield
+
+ @contextmanager
+ def attach(self, input_ready_callback: Callable[[], None]) -> Generator[None, None, None]:
+ yield
+
+ @contextmanager
+ def detach(self) -> Generator[None, None, None]:
+ yield
diff --git a/src/prompt_toolkit/input/defaults.py b/src/prompt_toolkit/input/defaults.py
index 43cd9c53..11e39c47 100644
--- a/src/prompt_toolkit/input/defaults.py
+++ b/src/prompt_toolkit/input/defaults.py
@@ -17,7 +17,19 @@ def create_input(stdin: (TextIO | None)=None, always_prefer_tty: bool=False
`sys.stdin`. (We can open `stdout` or `stderr` for reading, this is how
a `$PAGER` works.)
"""
- pass
+ if stdin is None:
+ stdin = sys.stdin
+
+ if sys.platform == 'win32':
+ from prompt_toolkit.input.win32 import Win32Input
+ return Win32Input(stdin)
+ else:
+ from prompt_toolkit.input.vt100 import Vt100Input
+ if always_prefer_tty and not stdin.isatty():
+ for file in (sys.stderr, sys.stdout):
+ if file.isatty():
+ return Vt100Input(open(file.fileno(), 'rb', buffering=0))
+ return Vt100Input(stdin)
def create_pipe_input() ->ContextManager[PipeInput]:
@@ -33,4 +45,12 @@ def create_pipe_input() ->ContextManager[PipeInput]:
Breaking change: In prompt_toolkit 3.0.28 and earlier, this was returning
the `PipeInput` directly, rather than through a context manager.
"""
- pass
+ return _PipeInputContextManager()
+
+class _PipeInputContextManager:
+ def __enter__(self) -> PipeInput:
+ self.pipe_input = PipeInput()
+ return self.pipe_input
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.pipe_input.close()
diff --git a/src/prompt_toolkit/input/posix_pipe.py b/src/prompt_toolkit/input/posix_pipe.py
index 33950c1e..0499dfae 100644
--- a/src/prompt_toolkit/input/posix_pipe.py
+++ b/src/prompt_toolkit/input/posix_pipe.py
@@ -20,15 +20,20 @@ class _Pipe:
def close_read(self) ->None:
"""Close read-end if not yet closed."""
- pass
+ if not self._read_closed:
+ os.close(self.read_fd)
+ self._read_closed = True
def close_write(self) ->None:
"""Close write-end if not yet closed."""
- pass
+ if not self._write_closed:
+ os.close(self.write_fd)
+ self._write_closed = True
def close(self) ->None:
"""Close both read and write ends."""
- pass
+ self.close_read()
+ self.close_write()
class PosixPipeInput(Vt100Input, PipeInput):
@@ -63,14 +68,14 @@ class PosixPipeInput(Vt100Input, PipeInput):
def send_text(self, data: str) ->None:
"""Send text to the input."""
- pass
+ os.write(self.pipe.write_fd, data.encode('utf-8'))
def close(self) ->None:
"""Close pipe fds."""
- pass
+ self.pipe.close()
def typeahead_hash(self) ->str:
"""
This needs to be unique for every `PipeInput`.
"""
- pass
+ return f'posix-pipe-input-{self._id}'
diff --git a/src/prompt_toolkit/input/posix_utils.py b/src/prompt_toolkit/input/posix_utils.py
index 3e1d3ca4..6e39d8ff 100644
--- a/src/prompt_toolkit/input/posix_utils.py
+++ b/src/prompt_toolkit/input/posix_utils.py
@@ -43,4 +43,27 @@ class PosixStdinReader:
the input stream was not yet closed. This means that something went
wrong during the decoding.
"""
- pass
+ # Check if the file is closed
+ if self.closed:
+ return ''
+
+ # Use select to check if there's data available to read
+ r, _, _ = select.select([self.stdin_fd], [], [], 0)
+ if not r:
+ return '' # No data available
+
+ # Read available data
+ try:
+ data = os.read(self.stdin_fd, count)
+ except OSError:
+ # File descriptor was closed
+ self.closed = True
+ return ''
+
+ if not data:
+ # End of file
+ self.closed = True
+ return ''
+
+ # Decode the data
+ return self._stdin_decoder.decode(data)
diff --git a/src/prompt_toolkit/input/typeahead.py b/src/prompt_toolkit/input/typeahead.py
index 1d5e4fca..c10ccc73 100644
--- a/src/prompt_toolkit/input/typeahead.py
+++ b/src/prompt_toolkit/input/typeahead.py
@@ -43,18 +43,20 @@ def store_typeahead(input_obj: Input, key_presses: list[KeyPress]) ->None:
"""
Insert typeahead key presses for the given input.
"""
- pass
+ _buffer[input_obj].extend(key_presses)
def get_typeahead(input_obj: Input) ->list[KeyPress]:
"""
Retrieve typeahead and reset the buffer for this input.
"""
- pass
+ typeahead = _buffer[input_obj]
+ _buffer[input_obj] = []
+ return typeahead
def clear_typeahead(input_obj: Input) ->None:
"""
Clear typeahead buffer.
"""
- pass
+ _buffer[input_obj].clear()
diff --git a/src/prompt_toolkit/input/vt100.py b/src/prompt_toolkit/input/vt100.py
index 2e980418..2ab345d7 100644
--- a/src/prompt_toolkit/input/vt100.py
+++ b/src/prompt_toolkit/input/vt100.py
@@ -53,25 +53,31 @@ class Vt100Input(Input):
Return a context manager that makes this input active in the current
event loop.
"""
- pass
+ return _attached_input(self, input_ready_callback)
def detach(self) ->ContextManager[None]:
"""
Return a context manager that makes sure that this input is not active
in the current event loop.
"""
- pass
+ return _attached_input(self, None)
def read_keys(self) ->list[KeyPress]:
"""Read list of KeyPress."""
- pass
+ data = self.stdin_reader.read()
+ self.vt100_parser.feed(data)
+ result = self._buffer
+ self._buffer = []
+ return result
def flush_keys(self) ->list[KeyPress]:
"""
Flush pending keys and return them.
(Used for flushing the 'escape' key.)
"""
- pass
+ result = self._buffer
+ self._buffer = []
+ return result
_current_callbacks: dict[tuple[AbstractEventLoop, int], Callable[[], None] |
@@ -79,7 +85,7 @@ _current_callbacks: dict[tuple[AbstractEventLoop, int], Callable[[], None] |
@contextlib.contextmanager
-def _attached_input(input: Vt100Input, callback: Callable[[], None]
+def _attached_input(input: Vt100Input, callback: Callable[[], None] | None
) ->Generator[None, None, None]:
"""
Context manager that makes this input active in the current event loop.
@@ -87,7 +93,30 @@ def _attached_input(input: Vt100Input, callback: Callable[[], None]
:param input: :class:`~prompt_toolkit.input.Input` object.
:param callback: Called when the input is ready to read.
"""
- pass
+ loop = get_running_loop()
+ key = (loop, input._fileno)
+
+ if callback is None:
+ # Detach
+ previous = _current_callbacks.get(key)
+ if previous:
+ loop.remove_reader(input._fileno)
+ del _current_callbacks[key]
+ else:
+ # Attach
+ def ready() -> None:
+ callback()
+
+ _current_callbacks[key] = ready
+ loop.add_reader(input._fileno, ready)
+
+ try:
+ yield
+ finally:
+ if callback is not None:
+ loop.remove_reader(input._fileno)
+ if key in _current_callbacks:
+ del _current_callbacks[key]
class raw_mode:
diff --git a/src/prompt_toolkit/input/vt100_parser.py b/src/prompt_toolkit/input/vt100_parser.py
index e2623b5d..253829de 100644
--- a/src/prompt_toolkit/input/vt100_parser.py
+++ b/src/prompt_toolkit/input/vt100_parser.py
@@ -65,26 +65,83 @@ class Vt100Parser:
"""
Start the parser coroutine.
"""
- pass
+ self._input_parser = self._input_parser_generator()
+ next(self._input_parser)
def _get_match(self, prefix: str) ->(None | Keys | tuple[Keys, ...]):
"""
Return the key (or keys) that maps to this prefix.
"""
- pass
+ if prefix in ANSI_SEQUENCES:
+ return ANSI_SEQUENCES[prefix]
+ elif _cpr_response_re.match(prefix):
+ return Keys.CPRResponse
+ elif _mouse_event_re.match(prefix):
+ return Keys.Vt100MouseEvent
+ return None
def _input_parser_generator(self) ->Generator[None, str | _Flush, None]:
"""
Coroutine (state machine) for the input parser.
"""
- pass
+ prefix = ''
+ retry = False
+ flush = False
+
+ while True:
+ flush = False
+ retry = False
+
+ if retry:
+ data = prefix
+ else:
+ data = yield
+
+ if isinstance(data, _Flush):
+ flush = True
+ data = ''
+
+ for c in data:
+ prefix += c
+ is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix]
+ match = self._get_match(prefix)
+
+ if is_prefix_of_longer_match and not flush:
+ # No exact match found, but it is a prefix of a longer match.
+ continue
+
+ elif match is None:
+ # No match and also not a prefix of a longer match.
+ # Reset prefix and retry with the next character.
+ self._call_handler(prefix[0], '')
+ prefix = prefix[1:]
+ retry = True
+ break
+
+ else:
+ # Match found.
+ self._call_handler(match, '')
+ prefix = ''
+ retry = False
+
+ if flush and prefix:
+ # Flush prefix.
+ self._call_handler(prefix, '')
+ prefix = ''
def _call_handler(self, key: (str | Keys | tuple[Keys, ...]),
insert_text: str) ->None:
"""
Callback to handler.
"""
- pass
+ if isinstance(key, tuple):
+ for k in key:
+ self._call_handler(k, insert_text)
+ else:
+ if insert_text:
+ self.feed_key_callback(KeyPress(key, insert_text))
+ else:
+ self.feed_key_callback(KeyPress(key))
def feed(self, data: str) ->None:
"""
@@ -92,7 +149,7 @@ class Vt100Parser:
:param data: Input string (unicode).
"""
- pass
+ self._input_parser.send(data)
def flush(self) ->None:
"""
@@ -106,10 +163,11 @@ class Vt100Parser:
timeout, and processes everything that's still in the buffer as-is, so
without assuming any characters will follow.
"""
- pass
+ self._input_parser.send(_Flush())
def feed_and_flush(self, data: str) ->None:
"""
Wrapper around ``feed`` and ``flush``.
"""
- pass
+ self.feed(data)
+ self.flush()
diff --git a/src/prompt_toolkit/input/win32.py b/src/prompt_toolkit/input/win32.py
index 8d89a53c..ee29b772 100644
--- a/src/prompt_toolkit/input/win32.py
+++ b/src/prompt_toolkit/input/win32.py
@@ -52,14 +52,14 @@ class Win32Input(_Win32InputBase):
Return a context manager that makes this input active in the current
event loop.
"""
- pass
+ return attach_win32_input(self, input_ready_callback)
def detach(self) ->ContextManager[None]:
"""
Return a context manager that makes sure that this input is not active
in the current event loop.
"""
- pass
+ return detach_win32_input(self)
class ConsoleInputReader:
@@ -106,7 +106,8 @@ class ConsoleInputReader:
def close(self) ->None:
"""Close fdcon."""
- pass
+ if self._fdcon is not None:
+ os.close(self._fdcon)
def read(self) ->Iterable[KeyPress]:
"""
@@ -115,20 +116,49 @@ class ConsoleInputReader:
http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx
"""
- pass
+ max_records = 1024
+ records = (INPUT_RECORD * max_records)()
+ read = DWORD()
+
+ if windll.kernel32.ReadConsoleInputW(self.handle, records, max_records, pointer(read)):
+ return list(self._get_keys(read.value, records))
+ return []
def _insert_key_data(self, key_press: KeyPress) ->KeyPress:
"""
Insert KeyPress data, for vt100 compatibility.
"""
- pass
+ if key_press.data:
+ return key_press
+
+ data = REVERSE_ANSI_SEQUENCES.get(key_press.key, '')
+ return KeyPress(key_press.key, data)
def _get_keys(self, read: DWORD, input_records: Array[INPUT_RECORD]
) ->Iterator[KeyPress]:
"""
Generator that yields `KeyPress` objects from the input records.
"""
- pass
+ for i in range(read):
+ ir = input_records[i]
+
+ if ir.EventType == EventTypes.KEY_EVENT:
+ for key_press in self._event_to_key_presses(ir.Event.KeyEvent):
+ yield self._insert_key_data(key_press)
+
+ elif ir.EventType == EventTypes.MOUSE_EVENT:
+ for key_press in self._handle_mouse(ir.Event.MouseEvent):
+ yield key_press
+
+ elif ir.EventType == EventTypes.WINDOW_BUFFER_SIZE_EVENT:
+ yield KeyPress(Keys.WindowResize, '')
+
+ key_presses = list(self._merge_paired_surrogates(list(self._get_keys(read, input_records))))
+
+ if self.recognize_paste and self._is_paste(key_presses):
+ yield KeyPress(Keys.BracketedPaste, ''.join(k.data for k in key_presses))
+ else:
+ yield from key_presses
@staticmethod
def _merge_paired_surrogates(key_presses: list[KeyPress]) ->Iterator[
@@ -137,7 +167,16 @@ class ConsoleInputReader:
Combines consecutive KeyPresses with high and low surrogates into
single characters
"""
- pass
+ i = 0
+ while i < len(key_presses):
+ if i + 1 < len(key_presses) and \
+ 0xD800 <= ord(key_presses[i].data) <= 0xDBFF and \
+ 0xDC00 <= ord(key_presses[i + 1].data) <= 0xDFFF:
+ yield KeyPress(key_presses[i].key, key_presses[i].data + key_presses[i + 1].data)
+ i += 2
+ else:
+ yield key_presses[i]
+ i += 1
@staticmethod
def _is_paste(keys: list[KeyPress]) ->bool:
@@ -148,19 +187,69 @@ class ConsoleInputReader:
the best possible way to detect pasting of text and handle that
correctly.)
"""
- pass
+ return (
+ len(keys) > 1 and
+ all(k.key == Keys.ControlV or (
+ not isinstance(k.key, Keys) and
+ k.data is not None and
+ len(k.data) == 1
+ ) for k in keys)
+ )
def _event_to_key_presses(self, ev: KEY_EVENT_RECORD) ->list[KeyPress]:
"""
For this `KEY_EVENT_RECORD`, return a list of `KeyPress` instances.
"""
- pass
+ result = []
+
+ if ev.KeyDown or ev.KeyDown == 0: # In case of KeyUp, no unicode will be present.
+ if ev.UnicodeChar == '\x00':
+ if ev.VirtualKeyCode in self.keycodes:
+ result.append(KeyPress(self.keycodes[ev.VirtualKeyCode], ''))
+ else:
+ result.append(KeyPress(ev.UnicodeChar, ev.UnicodeChar))
+
+ # Correctly handle Control-Arrow keys.
+ if (ev.ControlKeyState & self.LEFT_CTRL_PRESSED or
+ ev.ControlKeyState & self.RIGHT_CTRL_PRESSED) and ev.VirtualKeyCode in self.keycodes:
+ result.append(KeyPress(self.keycodes[ev.VirtualKeyCode], ''))
+
+ # Turn stateful shift/control/alt keys into individual events.
+ for k, v in [
+ (Keys.Shift, ev.ControlKeyState & self.SHIFT_PRESSED),
+ (Keys.Control, ev.ControlKeyState & self.LEFT_CTRL_PRESSED),
+ (Keys.Control, ev.ControlKeyState & self.RIGHT_CTRL_PRESSED),
+ (Keys.Alt, ev.ControlKeyState & self.LEFT_ALT_PRESSED),
+ (Keys.Alt, ev.ControlKeyState & self.RIGHT_ALT_PRESSED),
+ ]:
+ if v:
+ result.append(KeyPress(k, ''))
+
+ return result
def _handle_mouse(self, ev: MOUSE_EVENT_RECORD) ->list[KeyPress]:
"""
Handle mouse events. Return a list of KeyPress instances.
"""
- pass
+ result = []
+
+ # Get mouse position.
+ position = ev.MousePosition.X, ev.MousePosition.Y
+
+ # Mouse event.
+ if ev.EventFlags in (0, MOUSE_MOVED):
+ # Button press or release.
+ if ev.ButtonState == FROM_LEFT_1ST_BUTTON_PRESSED:
+ result.append(KeyPress(Keys.MouseDown, ''))
+ elif ev.ButtonState == RIGHTMOST_BUTTON_PRESSED:
+ result.append(KeyPress(Keys.MouseDown, ''))
+ else:
+ result.append(KeyPress(Keys.MouseUp, ''))
+
+ elif ev.EventFlags & MOUSE_WHEELED:
+ result.append(KeyPress(Keys.ScrollUp if ev.ButtonState > 0 else Keys.ScrollDown, ''))
+
+ return result
class _Win32Handles:
@@ -188,7 +277,30 @@ class _Win32Handles:
"""
Add a Win32 handle to the event loop.
"""
- pass
+ handle_value = handle.value
+
+ # Create an event object.
+ event = create_win32_event()
+ self._remove_events[handle_value] = event
+
+ # Add reader.
+ def ready() ->None:
+ del self._handle_callbacks[handle_value]
+ del self._remove_events[handle_value]
+ callback()
+
+ self._handle_callbacks[handle_value] = ready
+
+ # Start wait thread.
+ def wait_for_handle() ->None:
+ while True:
+ result = wait_for_handles([handle, event])
+ if result == 0:
+ get_running_loop().call_soon_threadsafe(ready)
+ else:
+ break
+
+ threading.Thread(target=wait_for_handle, daemon=True).start()
def remove_win32_handle(self, handle: HANDLE) ->(Callable[[], None] | None
):
@@ -196,7 +308,15 @@ class _Win32Handles:
Remove a Win32 handle from the event loop.
Return either the registered handler or `None`.
"""
- pass
+ handle_value = handle.value
+
+ if handle_value in self._handle_callbacks:
+ callback = self._handle_callbacks.pop(handle_value)
+ event = self._remove_events.pop(handle_value)
+ windll.kernel32.SetEvent(event)
+ return callback
+
+ return None
@contextmanager
@@ -208,7 +328,18 @@ def attach_win32_input(input: _Win32InputBase, callback: Callable[[], None]
:param input: :class:`~prompt_toolkit.input.Input` object.
:param input_ready_callback: Called when the input is ready to read.
"""
- pass
+ handle = input.console_input_reader.handle
+
+ def ready() ->None:
+ # When the console is ready, set the event.
+ callback()
+
+ input.win32_handles.add_win32_handle(handle, ready)
+
+ try:
+ yield
+ finally:
+ input.win32_handles.remove_win32_handle(handle)
class raw_mode:
diff --git a/src/prompt_toolkit/input/win32_pipe.py b/src/prompt_toolkit/input/win32_pipe.py
index 740ea177..70e6f6ad 100644
--- a/src/prompt_toolkit/input/win32_pipe.py
+++ b/src/prompt_toolkit/input/win32_pipe.py
@@ -44,12 +44,12 @@ class Win32PipeInput(_Win32InputBase, PipeInput):
"""
The windows pipe doesn't depend on the file handle.
"""
- pass
+ return -1 # Return a dummy value since it's not used
@property
def handle(self) ->HANDLE:
"""The handle used for registering this pipe in the event loop."""
- pass
+ return self._event
def attach(self, input_ready_callback: Callable[[], None]
) ->ContextManager[None]:
@@ -57,40 +57,45 @@ class Win32PipeInput(_Win32InputBase, PipeInput):
Return a context manager that makes this input active in the current
event loop.
"""
- pass
+ return attach_win32_input(self, input_ready_callback)
def detach(self) ->ContextManager[None]:
"""
Return a context manager that makes sure that this input is not active
in the current event loop.
"""
- pass
+ return detach_win32_input(self)
def read_keys(self) ->list[KeyPress]:
"""Read list of KeyPress."""
- pass
+ result = self._buffer
+ self._buffer = []
+ return result
def flush_keys(self) ->list[KeyPress]:
"""
Flush pending keys and return them.
(Used for flushing the 'escape' key.)
"""
- pass
+ return self.read_keys()
def send_bytes(self, data: bytes) ->None:
"""Send bytes to the input."""
- pass
+ self.vt100_parser.feed(data)
+ windll.kernel32.SetEvent(self._event)
def send_text(self, text: str) ->None:
"""Send text to the input."""
- pass
+ self.send_bytes(text.encode())
def close(self) ->None:
"""Close write-end of the pipe."""
- pass
+ if not self._closed:
+ self._closed = True
+ windll.kernel32.CloseHandle(self._event)
def typeahead_hash(self) ->str:
"""
This needs to be unique for every `PipeInput`.
"""
- pass
+ return f'win32-pipe-input-{self._id}'
diff --git a/src/prompt_toolkit/key_binding/bindings/auto_suggest.py b/src/prompt_toolkit/key_binding/bindings/auto_suggest.py
index 67735983..52acce2c 100644
--- a/src/prompt_toolkit/key_binding/bindings/auto_suggest.py
+++ b/src/prompt_toolkit/key_binding/bindings/auto_suggest.py
@@ -19,4 +19,40 @@ def load_auto_suggest_bindings() ->KeyBindings:
implementation for the "right arrow", but we really want the suggestion
binding when a suggestion is available.)
"""
- pass
+ kb = KeyBindings()
+
+ @Condition
+ def suggestion_available() -> bool:
+ app = get_app()
+ return (
+ app.current_buffer.suggestion is not None
+ and app.current_buffer.document.is_cursor_at_the_end
+ )
+
+ @kb.add("right", filter=suggestion_available)
+ @kb.add("c-e", filter=suggestion_available)
+ @kb.add("c-f", filter=suggestion_available & emacs_mode)
+ def _(event: E) -> None:
+ """
+ Accept the auto-suggestion.
+ """
+ b = event.current_buffer
+ suggestion = b.suggestion
+
+ if suggestion:
+ b.insert_text(suggestion.text)
+
+ @kb.add("c-right", filter=suggestion_available)
+ def _(event: E) -> None:
+ """
+ Accept the next word of the auto-suggestion.
+ """
+ b = event.current_buffer
+ suggestion = b.suggestion
+
+ if suggestion:
+ word_match = re.match(r"^\S+\s*", suggestion.text)
+ if word_match:
+ b.insert_text(word_match.group())
+
+ return kb
diff --git a/src/prompt_toolkit/key_binding/bindings/basic.py b/src/prompt_toolkit/key_binding/bindings/basic.py
index 9c3870ef..08e12b24 100644
--- a/src/prompt_toolkit/key_binding/bindings/basic.py
+++ b/src/prompt_toolkit/key_binding/bindings/basic.py
@@ -12,4 +12,4 @@ E = KeyPressEvent
def if_no_repeat(event: E) ->bool:
"""Callable that returns True when the previous event was delivered to
another handler."""
- pass
+ return event.is_first_press
diff --git a/src/prompt_toolkit/key_binding/bindings/completion.py b/src/prompt_toolkit/key_binding/bindings/completion.py
index e88dca52..27557e75 100644
--- a/src/prompt_toolkit/key_binding/bindings/completion.py
+++ b/src/prompt_toolkit/key_binding/bindings/completion.py
@@ -24,7 +24,20 @@ def generate_completions(event: E) ->None:
Tab-completion: where the first tab completes the common suffix and the
second tab lists all the completions.
"""
- pass
+ b = event.current_buffer
+
+ # If we are at the end of the input, and there's no selection, then we can
+ # safely insert the common suffix, otherwise, only list the completions.
+ if b.cursor_position == len(b.text) and not b.selection_state:
+ completions = list(b.completer.get_completions(b.document, CompleteEvent()))
+ if completions:
+ common_suffix = get_common_complete_suffix(b.document, completions)
+ if common_suffix:
+ b.insert_text(common_suffix)
+ else:
+ b.start_completion(select_first=False)
+ else:
+ b.start_completion(select_first=False)
def display_completions_like_readline(event: E) ->None:
@@ -41,7 +54,11 @@ def display_completions_like_readline(event: E) ->None:
# Call this handler when 'Tab' has been pressed.
key_bindings.add(Keys.ControlI)(display_completions_like_readline)
"""
- pass
+ b = event.current_buffer
+ completions = list(b.completer.get_completions(b.document, CompleteEvent()))
+
+ if completions:
+ asyncio.ensure_future(_display_completions_like_readline(event.app, completions))
def _display_completions_like_readline(app: Application[object],
@@ -51,11 +68,71 @@ def _display_completions_like_readline(app: Application[object],
This will ask for a confirmation if there are too many completions to fit
on a single page and provide a paginator to walk through them.
"""
- pass
+ async def run():
+ loop = asyncio.get_event_loop()
+ term_size = app.output.get_size()
+ page_size = term_size.rows - 1 # Leave one row for the prompt
+ completions_per_page = page_size * (term_size.columns // 20)
+ pages = math.ceil(len(completions) / completions_per_page)
+
+ if pages > 1:
+ show_meta = await loop.run_in_executor(None, _create_more_session().prompt)
+ if not show_meta:
+ return
+
+ def format_completion(completion: Completion) -> StyleAndTextTuples:
+ return [('class:completion', completion.text)]
+
+ for page in range(pages):
+ start = page * completions_per_page
+ end = min((page + 1) * completions_per_page, len(completions))
+ page_completions = completions[start:end]
+
+ formatted_completions = [format_completion(c) for c in page_completions]
+ if page < pages - 1:
+ formatted_completions.append([('class:completion', '--MORE--')])
+
+ with in_terminal():
+ app.print_formatted_text(formatted_completions, style='class:completion-menu')
+
+ if page < pages - 1:
+ show_next = await loop.run_in_executor(None, _create_more_session().prompt)
+ if not show_next:
+ break
+
+ return asyncio.create_task(run())
def _create_more_session(message: str='--MORE--') ->PromptSession[bool]:
"""
Create a `PromptSession` object for displaying the "--MORE--".
"""
- pass
+ from prompt_toolkit.shortcuts import PromptSession
+
+ bindings = KeyBindings()
+
+ @bindings.add(' ')
+ @bindings.add('y')
+ @bindings.add('Y')
+ @bindings.add(Keys.Enter)
+ @bindings.add(Keys.ControlJ)
+ def _(event):
+ event.app.exit(result=True)
+
+ @bindings.add('n')
+ @bindings.add('N')
+ @bindings.add('q')
+ @bindings.add('Q')
+ @bindings.add(Keys.ControlC)
+ def _(event):
+ event.app.exit(result=False)
+
+ @bindings.add(Keys.Any)
+ def _(event):
+ " Disable inserting of text. "
+
+ return PromptSession(
+ message,
+ key_bindings=bindings,
+ erase_when_done=True,
+ )
diff --git a/src/prompt_toolkit/key_binding/bindings/emacs.py b/src/prompt_toolkit/key_binding/bindings/emacs.py
index c03fc0ea..46eebe86 100644
--- a/src/prompt_toolkit/key_binding/bindings/emacs.py
+++ b/src/prompt_toolkit/key_binding/bindings/emacs.py
@@ -18,11 +18,63 @@ def load_emacs_bindings() ->KeyBindingsBase:
"""
Some e-macs extensions.
"""
- pass
+ kb = KeyBindings()
+
+ @kb.add('c-x', 'c-e')
+ def _(event: E) -> None:
+ """Open editor."""
+ event.app.run_system_command('editor')
+
+ @kb.add('c-x', 'c-f')
+ def _(event: E) -> None:
+ """Find file."""
+ event.app.run_system_command('find_file')
+
+ @kb.add('c-x', 'c-s')
+ def _(event: E) -> None:
+ """Save file."""
+ event.app.run_system_command('save_file')
+
+ @kb.add('c-x', 'c-c')
+ def _(event: E) -> None:
+ """Quit."""
+ event.app.exit()
+
+ return ConditionalKeyBindings(kb, emacs_mode)
def load_emacs_shift_selection_bindings() ->KeyBindingsBase:
"""
Bindings to select text with shift + cursor movements
"""
- pass
+ kb = KeyBindings()
+
+ @kb.add('s-left')
+ def _(event: E) -> None:
+ """Move cursor left and select."""
+ buff = event.current_buffer
+ buff.cursor_position += buff.document.get_cursor_left_position(count=event.arg)
+ buff.start_selection()
+
+ @kb.add('s-right')
+ def _(event: E) -> None:
+ """Move cursor right and select."""
+ buff = event.current_buffer
+ buff.cursor_position += buff.document.get_cursor_right_position(count=event.arg)
+ buff.start_selection()
+
+ @kb.add('s-up')
+ def _(event: E) -> None:
+ """Move cursor up and select."""
+ buff = event.current_buffer
+ buff.cursor_up(count=event.arg)
+ buff.start_selection()
+
+ @kb.add('s-down')
+ def _(event: E) -> None:
+ """Move cursor down and select."""
+ buff = event.current_buffer
+ buff.cursor_down(count=event.arg)
+ buff.start_selection()
+
+ return ConditionalKeyBindings(kb, emacs_mode & shift_selection_mode)
diff --git a/src/prompt_toolkit/key_binding/bindings/focus.py b/src/prompt_toolkit/key_binding/bindings/focus.py
index 9d636a30..ae44abb3 100644
--- a/src/prompt_toolkit/key_binding/bindings/focus.py
+++ b/src/prompt_toolkit/key_binding/bindings/focus.py
@@ -9,7 +9,7 @@ def focus_next(event: E) ->None:
Focus the next visible Window.
(Often bound to the `Tab` key.)
"""
- pass
+ event.app.layout.focus_next()
def focus_previous(event: E) ->None:
@@ -17,4 +17,4 @@ def focus_previous(event: E) ->None:
Focus the previous visible Window.
(Often bound to the `BackTab` key.)
"""
- pass
+ event.app.layout.focus_previous()
diff --git a/src/prompt_toolkit/key_binding/bindings/mouse.py b/src/prompt_toolkit/key_binding/bindings/mouse.py
index 03ff5d6f..20a15317 100644
--- a/src/prompt_toolkit/key_binding/bindings/mouse.py
+++ b/src/prompt_toolkit/key_binding/bindings/mouse.py
@@ -113,4 +113,48 @@ def load_mouse_bindings() ->KeyBindings:
Key bindings, required for mouse support.
(Mouse events enter through the key binding system.)
"""
- pass
+ key_bindings = KeyBindings()
+
+ @key_bindings.add(Keys.Any)
+ def _(event: E) -> NotImplementedOrNone:
+ """
+ Catch mouse events.
+ """
+ if event.key_sequence[0].key == Keys.WindowsMouseEvent:
+ return _handle_mouse_event(event, system="windows")
+ elif event.key_sequence[0].key == Keys.VtMouseEvent:
+ return _handle_mouse_event(event, system="vt")
+ return NotImplemented
+
+ return key_bindings
+
+def _handle_mouse_event(event: E, system: str) -> None:
+ """
+ Handle mouse events for both Windows and VT systems.
+ """
+ # Get the parsed mouse event.
+ mouse_event = event.key_sequence[0].data
+
+ if system == "windows":
+ # Windows systems
+ x = mouse_event.position.x
+ y = mouse_event.position.y
+ button = mouse_event.button
+ event_type = mouse_event.event_type
+ modifiers = mouse_event.modifiers
+ else:
+ # VT systems
+ x = mouse_event.x
+ y = mouse_event.y
+ button = mouse_event.button
+ event_type = mouse_event.event_type
+ modifiers = mouse_event.modifiers
+
+ # Create a MouseEvent instance
+ mouse_event = MouseEvent(position=Point(x=x, y=y),
+ event_type=event_type,
+ button=button,
+ modifiers=modifiers)
+
+ # Call the mouse handler
+ event.app.mouse_handlers.mouse_click(mouse_event)
diff --git a/src/prompt_toolkit/key_binding/bindings/named_commands.py b/src/prompt_toolkit/key_binding/bindings/named_commands.py
index 5b59883f..813b24c0 100644
--- a/src/prompt_toolkit/key_binding/bindings/named_commands.py
+++ b/src/prompt_toolkit/key_binding/bindings/named_commands.py
@@ -26,14 +26,17 @@ def register(name: str) ->Callable[[_T], _T]:
"""
Store handler in the `_readline_commands` dictionary.
"""
- pass
+ def decorator(handler: _T) ->_T:
+ _readline_commands[name] = handler
+ return handler
+ return decorator
def get_by_name(name: str) ->Binding:
"""
Return the handler for the (Readline) command with the given name.
"""
- pass
+ return _readline_commands.get(name, lambda event: None)
@register('beginning-of-buffer')
@@ -41,7 +44,7 @@ def beginning_of_buffer(event: E) ->None:
"""
Move to the start of the buffer.
"""
- pass
+ event.current_buffer.cursor_position = 0
@register('end-of-buffer')
@@ -49,7 +52,7 @@ def end_of_buffer(event: E) ->None:
"""
Move to the end of the buffer.
"""
- pass
+ event.current_buffer.cursor_position = len(event.current_buffer.text)
@register('beginning-of-line')
@@ -57,7 +60,7 @@ def beginning_of_line(event: E) ->None:
"""
Move to the start of the current line.
"""
- pass
+ event.current_buffer.cursor_position = event.current_buffer.document.get_start_of_line_position()
@register('end-of-line')
@@ -65,7 +68,7 @@ def end_of_line(event: E) ->None:
"""
Move to the end of the line.
"""
- pass
+ event.current_buffer.cursor_position = event.current_buffer.document.get_end_of_line_position()
@register('forward-char')
@@ -73,13 +76,13 @@ def forward_char(event: E) ->None:
"""
Move forward a character.
"""
- pass
+ event.current_buffer.cursor_right()
@register('backward-char')
def backward_char(event: E) ->None:
"""Move back a character."""
- pass
+ event.current_buffer.cursor_left()
@register('forward-word')
@@ -88,7 +91,7 @@ def forward_word(event: E) ->None:
Move forward to the end of the next word. Words are composed of letters and
digits.
"""
- pass
+ event.current_buffer.cursor_right(event.current_buffer.document.find_next_word_ending())
@register('backward-word')
@@ -97,7 +100,7 @@ def backward_word(event: E) ->None:
Move back to the start of the current or previous word. Words are composed
of letters and digits.
"""
- pass
+ event.current_buffer.cursor_left(event.current_buffer.document.find_previous_word_beginning())
@register('clear-screen')
@@ -105,7 +108,7 @@ def clear_screen(event: E) ->None:
"""
Clear the screen and redraw everything at the top of the screen.
"""
- pass
+ event.app.renderer.clear()
@register('redraw-current-line')
@@ -114,7 +117,7 @@ def redraw_current_line(event: E) ->None:
Refresh the current line.
(Readline defines this command, but prompt-toolkit doesn't have it.)
"""
- pass
+ event.app.invalidate()
@register('accept-line')
@@ -122,7 +125,7 @@ def accept_line(event: E) ->None:
"""
Accept the line regardless of where the cursor is.
"""
- pass
+ event.current_buffer.validate_and_handle()
@register('previous-history')
@@ -130,7 +133,7 @@ def previous_history(event: E) ->None:
"""
Move `back` through the history list, fetching the previous command.
"""
- pass
+ event.current_buffer.history_backward()
@register('next-history')
@@ -138,7 +141,7 @@ def next_history(event: E) ->None:
"""
Move `forward` through the history list, fetching the next command.
"""
- pass
+ event.current_buffer.history_forward()
@register('beginning-of-history')
@@ -146,7 +149,7 @@ def beginning_of_history(event: E) ->None:
"""
Move to the first line in the history.
"""
- pass
+ event.current_buffer.go_to_history(0)
@register('end-of-history')
@@ -154,7 +157,7 @@ def end_of_history(event: E) ->None:
"""
Move to the end of the input history, i.e., the line currently being entered.
"""
- pass
+ event.current_buffer.history_forward(count=len(event.current_buffer._working_lines) - 1)
@register('reverse-search-history')
@@ -163,15 +166,16 @@ def reverse_search_history(event: E) ->None:
Search backward starting at the current line and moving `up` through
the history as necessary. This is an incremental search.
"""
- pass
+ event.app.layout.focus(event.app.layout.search_buffer)
+ event.app.vi_state.input_mode = EditingMode.INSERT
@register('end-of-file')
-def end_of_file(event: E) ->None:
+def end-of-file(event: E) ->None:
"""
Exit.
"""
- pass
+ event.app.exit()
@register('delete-char')
diff --git a/src/prompt_toolkit/key_binding/bindings/open_in_editor.py b/src/prompt_toolkit/key_binding/bindings/open_in_editor.py
index 6199de8b..6f9c7429 100644
--- a/src/prompt_toolkit/key_binding/bindings/open_in_editor.py
+++ b/src/prompt_toolkit/key_binding/bindings/open_in_editor.py
@@ -13,18 +13,35 @@ def load_open_in_editor_bindings() ->KeyBindingsBase:
"""
Load both the Vi and emacs key bindings for handling edit-and-execute-command.
"""
- pass
+ return merge_key_bindings([
+ load_emacs_open_in_editor_bindings(),
+ load_vi_open_in_editor_bindings(),
+ ])
def load_emacs_open_in_editor_bindings() ->KeyBindings:
"""
Pressing C-X C-E will open the buffer in an external editor.
"""
- pass
+ kb = KeyBindings()
+
+ @kb.add('c-x', 'c-e', filter=emacs_mode & ~has_selection)
+ def _(event):
+ """Edit and execute command."""
+ event.current_buffer.open_in_editor(event.cli)
+
+ return kb
def load_vi_open_in_editor_bindings() ->KeyBindings:
"""
Pressing 'v' in navigation mode will open the buffer in an external editor.
"""
- pass
+ kb = KeyBindings()
+
+ @kb.add('v', filter=vi_navigation_mode & ~has_selection)
+ def _(event):
+ """Edit and execute command."""
+ event.current_buffer.open_in_editor(event.cli)
+
+ return kb
diff --git a/src/prompt_toolkit/key_binding/bindings/page_navigation.py b/src/prompt_toolkit/key_binding/bindings/page_navigation.py
index 302f662b..c5f525fb 100644
--- a/src/prompt_toolkit/key_binding/bindings/page_navigation.py
+++ b/src/prompt_toolkit/key_binding/bindings/page_navigation.py
@@ -14,7 +14,10 @@ def load_page_navigation_bindings() ->KeyBindingsBase:
"""
Load both the Vi and Emacs bindings for page navigation.
"""
- pass
+ return merge_key_bindings([
+ load_emacs_page_navigation_bindings(),
+ load_vi_page_navigation_bindings()
+ ])
def load_emacs_page_navigation_bindings() ->KeyBindingsBase:
@@ -22,7 +25,39 @@ def load_emacs_page_navigation_bindings() ->KeyBindingsBase:
Key bindings, for scrolling up and down through pages.
This are separate bindings, because GNU readline doesn't have them.
"""
- pass
+ kb = KeyBindings()
+
+ @kb.add('c-v')
+ def _(event):
+ " Scroll half page down. "
+ scroll_half_page_down(event)
+
+ @kb.add('pagedown')
+ def _(event):
+ " Scroll one page down. "
+ scroll_page_down(event)
+
+ @kb.add('escape', 'v')
+ def _(event):
+ " Scroll half page up. "
+ scroll_half_page_up(event)
+
+ @kb.add('pageup')
+ def _(event):
+ " Scroll one page up. "
+ scroll_page_up(event)
+
+ @kb.add('escape', '>')
+ def _(event):
+ " Scroll to bottom. "
+ scroll_forward(event, count=1000000)
+
+ @kb.add('escape', '<')
+ def _(event):
+ " Scroll to top. "
+ scroll_backward(event, count=1000000)
+
+ return ConditionalKeyBindings(kb, emacs_mode & buffer_has_focus)
def load_vi_page_navigation_bindings() ->KeyBindingsBase:
@@ -30,4 +65,36 @@ def load_vi_page_navigation_bindings() ->KeyBindingsBase:
Key bindings, for scrolling up and down through pages.
This are separate bindings, because GNU readline doesn't have them.
"""
- pass
+ kb = KeyBindings()
+
+ @kb.add('c-f')
+ def _(event):
+ " Scroll one page down. "
+ scroll_page_down(event)
+
+ @kb.add('c-b')
+ def _(event):
+ " Scroll one page up. "
+ scroll_page_up(event)
+
+ @kb.add('c-d')
+ def _(event):
+ " Scroll half page down. "
+ scroll_half_page_down(event)
+
+ @kb.add('c-u')
+ def _(event):
+ " Scroll half page up. "
+ scroll_half_page_up(event)
+
+ @kb.add('c-e')
+ def _(event):
+ " Scroll one line down. "
+ scroll_one_line_down(event)
+
+ @kb.add('c-y')
+ def _(event):
+ " Scroll one line up. "
+ scroll_one_line_up(event)
+
+ return ConditionalKeyBindings(kb, vi_mode & buffer_has_focus)
diff --git a/src/prompt_toolkit/key_binding/bindings/scroll.py b/src/prompt_toolkit/key_binding/bindings/scroll.py
index c1d1ada6..27d17576 100644
--- a/src/prompt_toolkit/key_binding/bindings/scroll.py
+++ b/src/prompt_toolkit/key_binding/bindings/scroll.py
@@ -16,53 +16,112 @@ def scroll_forward(event: E, half: bool=False) ->None:
"""
Scroll window down.
"""
- pass
+ window = event.app.layout.current_window
+ if window:
+ info = window.render_info
+ if info:
+ amount = info.window_height // 2 if half else info.window_height
+ new_scroll = min(info.content_height - info.window_height,
+ info.vertical_scroll + amount)
+ window.vertical_scroll = new_scroll
def scroll_backward(event: E, half: bool=False) ->None:
"""
Scroll window up.
"""
- pass
+ window = event.app.layout.current_window
+ if window:
+ info = window.render_info
+ if info:
+ amount = info.window_height // 2 if half else info.window_height
+ new_scroll = max(0, info.vertical_scroll - amount)
+ window.vertical_scroll = new_scroll
def scroll_half_page_down(event: E) ->None:
"""
Same as ControlF, but only scroll half a page.
"""
- pass
+ scroll_forward(event, half=True)
def scroll_half_page_up(event: E) ->None:
"""
Same as ControlB, but only scroll half a page.
"""
- pass
+ scroll_backward(event, half=True)
def scroll_one_line_down(event: E) ->None:
"""
scroll_offset += 1
"""
- pass
+ window = event.app.layout.current_window
+ if window:
+ info = window.render_info
+ if info:
+ new_scroll = min(info.content_height - info.window_height,
+ info.vertical_scroll + 1)
+ window.vertical_scroll = new_scroll
def scroll_one_line_up(event: E) ->None:
"""
scroll_offset -= 1
"""
- pass
+ window = event.app.layout.current_window
+ if window:
+ info = window.render_info
+ if info:
+ new_scroll = max(0, info.vertical_scroll - 1)
+ window.vertical_scroll = new_scroll
def scroll_page_down(event: E) ->None:
"""
Scroll page down. (Prefer the cursor at the top of the page, after scrolling.)
"""
- pass
+ window = event.app.layout.current_window
+ b = event.app.current_buffer
+ if window and b:
+ info = window.render_info
+ if info:
+ # Scroll down one page, but keep one overlap line.
+ overlap = 1
+ new_scroll = min(info.content_height - info.window_height,
+ info.vertical_scroll + info.window_height - overlap)
+ window.vertical_scroll = new_scroll
+
+ # Put cursor at the top of the visible region.
+ try:
+ new_document_line = b.document.translate_row_col_to_index(
+ info.first_visible_line(), 0
+ )
+ b.cursor_position = new_document_line
+ except IndexError:
+ pass
def scroll_page_up(event: E) ->None:
"""
Scroll page up. (Prefer the cursor at the bottom of the page, after scrolling.)
"""
- pass
+ window = event.app.layout.current_window
+ b = event.app.current_buffer
+ if window and b:
+ info = window.render_info
+ if info:
+ # Scroll up one page, but keep one overlap line.
+ overlap = 1
+ new_scroll = max(0, info.vertical_scroll - info.window_height + overlap)
+ window.vertical_scroll = new_scroll
+
+ # Put cursor at the bottom of the visible region.
+ try:
+ new_document_line = b.document.translate_row_col_to_index(
+ info.last_visible_line(), 0
+ )
+ b.cursor_position = new_document_line
+ except IndexError:
+ pass
diff --git a/src/prompt_toolkit/key_binding/bindings/search.py b/src/prompt_toolkit/key_binding/bindings/search.py
index 3df6124e..545f7144 100644
--- a/src/prompt_toolkit/key_binding/bindings/search.py
+++ b/src/prompt_toolkit/key_binding/bindings/search.py
@@ -21,7 +21,10 @@ def abort_search(event: E) ->None:
line.
(Usually bound to ControlG/ControlC.)
"""
- pass
+ search_state = event.app.current_search_state
+ if search_state:
+ search_state.abort()
+ event.app.layout.focus_previous()
@key_binding(filter=is_searching)
@@ -31,7 +34,10 @@ def accept_search(event: E) ->None:
isearch would be too complicated.)
(Usually bound to Enter.)
"""
- pass
+ search_state = event.app.current_search_state
+ if search_state:
+ search_state.apply_search()
+ event.app.layout.focus_previous()
@key_binding(filter=control_is_searchable)
@@ -40,7 +46,13 @@ def start_reverse_incremental_search(event: E) ->None:
Enter reverse incremental search.
(Usually ControlR.)
"""
- pass
+ search_state = event.app.current_search_state
+ if search_state is None:
+ search_state = search.SearchState(direction=search.SearchDirection.BACKWARD)
+ event.app.current_search_state = search_state
+ else:
+ search_state.direction = search.SearchDirection.BACKWARD
+ event.app.layout.focus(search_state.control)
@key_binding(filter=control_is_searchable)
@@ -49,7 +61,13 @@ def start_forward_incremental_search(event: E) ->None:
Enter forward incremental search.
(Usually ControlS.)
"""
- pass
+ search_state = event.app.current_search_state
+ if search_state is None:
+ search_state = search.SearchState(direction=search.SearchDirection.FORWARD)
+ event.app.current_search_state = search_state
+ else:
+ search_state.direction = search.SearchDirection.FORWARD
+ event.app.layout.focus(search_state.control)
@key_binding(filter=is_searching)
@@ -57,7 +75,10 @@ def reverse_incremental_search(event: E) ->None:
"""
Apply reverse incremental search, but keep search buffer focused.
"""
- pass
+ search_state = event.app.current_search_state
+ if search_state:
+ search_state.direction = search.SearchDirection.BACKWARD
+ search_state.apply_search()
@key_binding(filter=is_searching)
@@ -65,7 +86,10 @@ def forward_incremental_search(event: E) ->None:
"""
Apply forward incremental search, but keep search buffer focused.
"""
- pass
+ search_state = event.app.current_search_state
+ if search_state:
+ search_state.direction = search.SearchDirection.FORWARD
+ search_state.apply_search()
@Condition
@@ -73,7 +97,9 @@ def _previous_buffer_is_returnable() ->bool:
"""
True if the previously focused buffer has a return handler.
"""
- pass
+ app = get_app()
+ prev_control = app.layout.previous_control
+ return prev_control is not None and prev_control.buffer.is_returnable
@key_binding(filter=is_searching & _previous_buffer_is_returnable)
@@ -81,4 +107,8 @@ def accept_search_and_accept_input(event: E) ->None:
"""
Accept the search operation first, then accept the input.
"""
- pass
+ search_state = event.app.current_search_state
+ if search_state:
+ search_state.apply_search()
+ event.app.layout.focus_previous()
+ event.app.current_buffer.validate_and_handle()
diff --git a/src/prompt_toolkit/key_binding/bindings/vi.py b/src/prompt_toolkit/key_binding/bindings/vi.py
index bdf5ffe0..3e9b4a1a 100644
--- a/src/prompt_toolkit/key_binding/bindings/vi.py
+++ b/src/prompt_toolkit/key_binding/bindings/vi.py
@@ -48,7 +48,7 @@ class TextObject:
"""
Return a (start, end) tuple where start <= end.
"""
- pass
+ return (min(self.start, self.end), max(self.start, self.end))
def operator_range(self, document: Document) ->tuple[int, int]:
"""
@@ -59,19 +59,57 @@ class TextObject:
This should return something that can be used in a slice, so the `end`
position is *not* included.
"""
- pass
+ start, end = self.sorted()
+ cursor_position = document.cursor_position
+
+ if self.type == TextObjectType.EXCLUSIVE:
+ return (cursor_position + start, cursor_position + end + 1)
+ elif self.type == TextObjectType.INCLUSIVE:
+ return (cursor_position + start, cursor_position + end + 1)
+ elif self.type == TextObjectType.LINEWISE:
+ start_line = document.line_count - 1 if start < 0 else document.cursor_position_row + start
+ end_line = document.line_count - 1 if end < 0 else document.cursor_position_row + end
+ return (document.get_start_of_line_position(start_line),
+ min(document.get_end_of_line_position(end_line) + 1, len(document.text)))
+ else: # BLOCK
+ return (cursor_position + start, cursor_position + end + 1)
def get_line_numbers(self, buffer: Buffer) ->tuple[int, int]:
"""
Return a (start_line, end_line) pair.
"""
- pass
+ document = buffer.document
+ start, end = self.sorted()
+ cursor_row = document.cursor_position_row
+
+ if self.type in (TextObjectType.EXCLUSIVE, TextObjectType.INCLUSIVE):
+ start_line = document.translate_row_col_to_index(cursor_row + start, 0)
+ end_line = document.translate_row_col_to_index(cursor_row + end, 0)
+ elif self.type == TextObjectType.LINEWISE:
+ start_line = max(0, cursor_row + start)
+ end_line = min(document.line_count - 1, cursor_row + end)
+ else: # BLOCK
+ start_line = cursor_row
+ end_line = cursor_row + end
+
+ return (start_line, end_line)
def cut(self, buffer: Buffer) ->tuple[Document, ClipboardData]:
"""
Turn text object into `ClipboardData` instance.
"""
- pass
+ start, end = self.operator_range(buffer.document)
+ text = buffer.text[start:end]
+
+ if self.type == TextObjectType.LINEWISE:
+ text += '\n'
+
+ new_document = Document(
+ text=buffer.text[:start] + buffer.text[end:],
+ cursor_position=start
+ )
+
+ return new_document, ClipboardData(text, self.type)
TextObjectFunction = Callable[[E], TextObject]
@@ -83,7 +121,23 @@ def create_text_object_decorator(key_bindings: KeyBindings) ->Callable[...,
"""
Create a decorator that can be used to register Vi text object implementations.
"""
- pass
+ def decorator(*keys: str, filter: Filter=Always(), eager: bool=False):
+ def wrapper(func: _TOF) -> _TOF:
+ @key_bindings.add(*keys, filter=filter & vi_waiting_for_text_object_mode, eager=eager)
+ def _(event: E) -> None:
+ if event.app.vi_state.operator_func:
+ text_object = func(event)
+ event.app.vi_state.operator_func(event, text_object)
+ event.app.vi_state.operator_func = None
+ event.app.vi_state.operator_arg = None
+ else:
+ # Move cursor.
+ text_object = func(event)
+ start, end = text_object.operator_range(event.app.current_buffer.document)
+ event.app.current_buffer.cursor_position += start
+ return func
+ return wrapper
+ return decorator
OperatorFunction = Callable[[E, TextObject], None]
@@ -95,7 +149,16 @@ def create_operator_decorator(key_bindings: KeyBindings) ->Callable[...,
"""
Create a decorator that can be used for registering Vi operators.
"""
- pass
+ def decorator(*keys: str, filter: Filter=Always(), eager: bool=False):
+ def wrapper(func: _OF) -> _OF:
+ @key_bindings.add(*keys, filter=filter & vi_navigation_mode, eager=eager)
+ def _(event: E) -> None:
+ event.app.vi_state.operator_func = func
+ event.app.vi_state.operator_arg = event.arg
+
+ return func
+ return wrapper
+ return decorator
def load_vi_bindings() ->KeyBindingsBase:
diff --git a/src/prompt_toolkit/key_binding/defaults.py b/src/prompt_toolkit/key_binding/defaults.py
index 020b28c1..90dc0244 100644
--- a/src/prompt_toolkit/key_binding/defaults.py
+++ b/src/prompt_toolkit/key_binding/defaults.py
@@ -19,4 +19,22 @@ def load_key_bindings() ->KeyBindingsBase:
"""
Create a KeyBindings object that contains the default key bindings.
"""
- pass
+ return merge_key_bindings([
+ # Load basic bindings.
+ load_basic_bindings(),
+
+ # Load emacs bindings.
+ ConditionalKeyBindings(load_emacs_bindings(), ~buffer_has_focus),
+ ConditionalKeyBindings(load_emacs_search_bindings(), ~buffer_has_focus),
+ ConditionalKeyBindings(load_emacs_shift_selection_bindings(), ~buffer_has_focus),
+
+ # Load Vi bindings.
+ ConditionalKeyBindings(load_vi_bindings(), ~buffer_has_focus),
+ ConditionalKeyBindings(load_vi_search_bindings(), ~buffer_has_focus),
+
+ # Load mouse bindings.
+ load_mouse_bindings(),
+
+ # Load CPR bindings.
+ load_cpr_bindings(),
+ ])
diff --git a/src/prompt_toolkit/key_binding/emacs_state.py b/src/prompt_toolkit/key_binding/emacs_state.py
index d60cbeb7..66eb5ce1 100644
--- a/src/prompt_toolkit/key_binding/emacs_state.py
+++ b/src/prompt_toolkit/key_binding/emacs_state.py
@@ -15,12 +15,15 @@ class EmacsState:
@property
def is_recording(self) ->bool:
"""Tell whether we are recording a macro."""
- pass
+ return self.current_recording is not None
def start_macro(self) ->None:
"""Start recording macro."""
- pass
+ if not self.is_recording:
+ self.current_recording = []
def end_macro(self) ->None:
"""End recording macro."""
- pass
+ if self.is_recording:
+ self.macro = self.current_recording
+ self.current_recording = None
diff --git a/src/prompt_toolkit/key_binding/key_bindings.py b/src/prompt_toolkit/key_binding/key_bindings.py
index 9d241939..b6c0d10e 100644
--- a/src/prompt_toolkit/key_binding/key_bindings.py
+++ b/src/prompt_toolkit/key_binding/key_bindings.py
@@ -115,7 +115,10 @@ class KeyBindingsBase(metaclass=ABCMeta):
:param keys: tuple of keys.
"""
- pass
+ def get():
+ return [b for b in self._bindings if b.keys[:len(keys)] == keys and len(b.keys) > len(keys)]
+
+ return self._get_bindings_starting_with_keys_cache.get(keys, get)
@abstractproperty
def bindings(self) ->list[Binding]:
@@ -182,7 +185,15 @@ class KeyBindings(KeyBindingsBase):
:param record_in_macro: Record these key bindings when a macro is
being recorded. (True by default.)
"""
- pass
+ def decorator(func: T) -> T:
+ if callable(func):
+ binding = Binding(keys, func, filter=filter, eager=eager,
+ is_global=is_global, save_before=save_before,
+ record_in_macro=record_in_macro)
+ self._bindings.append(binding)
+ self.__version += 1
+ return func
+ return decorator
def remove(self, *args: (Keys | str | KeyHandlerCallable)) ->None:
"""
@@ -198,7 +209,20 @@ class KeyBindings(KeyBindingsBase):
remove(handler) # Pass handler.
remove('c-x', 'c-a') # Or pass the key bindings.
"""
- pass
+ if len(args) == 1 and callable(args[0]):
+ handler = args[0]
+ bindings_to_remove = [b for b in self._bindings if b.handler == handler]
+ else:
+ keys = tuple(_parse_key(k) for k in args)
+ bindings_to_remove = [b for b in self._bindings if b.keys == keys]
+
+ if not bindings_to_remove:
+ raise ValueError("Binding not found")
+
+ for b in bindings_to_remove:
+ self._bindings.remove(b)
+
+ self.__version += 1
add_binding = add
remove_binding = remove
@@ -210,7 +234,10 @@ class KeyBindings(KeyBindingsBase):
:param keys: tuple of keys.
"""
- pass
+ def get():
+ return [b for b in self._bindings if b.keys == keys]
+
+ return self._get_bindings_for_keys_cache.get(keys, get)
def get_bindings_starting_with_keys(self, keys: KeysTuple) ->list[Binding]:
"""
@@ -228,7 +255,16 @@ def _parse_key(key: (Keys | str)) ->(str | Keys):
"""
Replace key by alias and verify whether it's a valid one.
"""
- pass
+ if isinstance(key, Keys):
+ return key
+
+ if key in KEY_ALIASES:
+ return KEY_ALIASES[key]
+
+ if len(key) == 1:
+ return key
+
+ raise ValueError(f"Invalid key: {key}")
def key_binding(filter: FilterOrBool=True, eager: FilterOrBool=False,
@@ -239,7 +275,17 @@ def key_binding(filter: FilterOrBool=True, eager: FilterOrBool=False,
Decorator that turn a function into a `Binding` object. This can be added
to a `KeyBindings` object when a key binding is assigned.
"""
- pass
+ def decorator(func: KeyHandlerCallable) -> Binding:
+ return Binding(
+ keys=(), # Empty tuple, as keys are not known at this point
+ handler=func,
+ filter=filter,
+ eager=eager,
+ is_global=is_global,
+ save_before=save_before,
+ record_in_macro=record_in_macro
+ )
+ return decorator
class _Proxy(KeyBindingsBase):
diff --git a/src/prompt_toolkit/key_binding/key_processor.py b/src/prompt_toolkit/key_binding/key_processor.py
index 4104a4f6..586886d1 100644
--- a/src/prompt_toolkit/key_binding/key_processor.py
+++ b/src/prompt_toolkit/key_binding/key_processor.py
@@ -89,21 +89,49 @@ class KeyProcessor:
For a list of :class:`KeyPress` instances. Give the matching handlers
that would handle this.
"""
- pass
+ return [b for b in self._bindings.get_bindings_for_keys(key_presses) if b.filter()]
def _is_prefix_of_longer_match(self, key_presses: list[KeyPress]) ->bool:
"""
For a list of :class:`KeyPress` instances. Return True if there is any
handler that is bound to a suffix of this keys.
"""
- pass
+ for b in self._bindings.get_bindings_starting_with_keys(key_presses):
+ if b.filter():
+ return True
+ return False
def _process(self) ->Generator[None, KeyPress, None]:
"""
Coroutine implementing the key match algorithm. Key strokes are sent
into this generator, and it calls the appropriate handlers.
"""
- pass
+ buffer: list[KeyPress] = []
+ retry = False
+
+ while True:
+ if retry:
+ retry = False
+ else:
+ key_press = yield
+
+ if key_press is _Flush:
+ self._flush(buffer)
+ buffer = []
+ continue
+
+ buffer.append(key_press)
+
+ matches = self._get_matches(buffer)
+ if matches:
+ self._call_handler(matches[-1], key_sequence=buffer)
+ buffer = []
+ elif self._is_prefix_of_longer_match(buffer):
+ retry = True
+ else:
+ retry = True
+ self._flush(buffer)
+ buffer = []
def feed(self, key_press: KeyPress, first: bool=False) ->None:
"""
@@ -112,14 +140,20 @@ class KeyProcessor:
:param first: If true, insert before everything else.
"""
- pass
+ if first:
+ self.input_queue.appendleft(key_press)
+ else:
+ self.input_queue.append(key_press)
def feed_multiple(self, key_presses: list[KeyPress], first: bool=False
) ->None:
"""
:param first: If true, insert before everything else.
"""
- pass
+ if first:
+ self.input_queue.extendleft(reversed(key_presses))
+ else:
+ self.input_queue.extend(key_presses)
def process_keys(self) ->None:
"""
@@ -130,13 +164,17 @@ class KeyProcessor:
possible to call `feed` from inside a key binding.
This function keeps looping until the queue is empty.
"""
- pass
+ while self.input_queue:
+ key_press = self.input_queue.popleft()
+ self._process().send(key_press)
def empty_queue(self) ->list[KeyPress]:
"""
Empty the input queue. Return the unprocessed input.
"""
- pass
+ key_presses = list(self.input_queue)
+ self.input_queue.clear()
+ return key_presses
def _fix_vi_cursor_position(self, event: KeyPressEvent) ->None:
"""
@@ -144,14 +182,23 @@ class KeyProcessor:
never put the cursor after the last character of a line. (Unless it's
an empty line.)
"""
- pass
+ app = event.app
+ buff = app.current_buffer
+ if (vi_navigation_mode() and buff.document.is_cursor_at_the_end_of_line
+ and len(buff.document.current_line) > 0):
+ buff.cursor_position -= 1
def _leave_vi_temp_navigation_mode(self, event: KeyPressEvent) ->None:
"""
If we're in Vi temporary navigation (normal) mode, return to
insert/replace mode after executing one action.
"""
- pass
+ app = event.app
+ if app.editing_mode == EditingMode.VI and not event.is_repeat:
+ vi_state = app.vi_state
+ if vi_state.temporary_navigation_mode:
+ vi_state.temporary_navigation_mode = False
+ app.vi_state.input_mode = vi_state.original_input_mode
def _start_timeout(self) ->None:
"""
@@ -161,13 +208,22 @@ class KeyProcessor:
and no key was pressed in the meantime, we flush all data in the queue
and call the appropriate key binding handlers.
"""
- pass
+ async def auto_flush() ->None:
+ await sleep(self._timeout)
+ if self._flush_wait_task and not self._flush_wait_task.done():
+ self.feed(_Flush)
+ self.process_keys()
+
+ if self._timeout is not None:
+ self._flush_wait_task = get_app().create_background_task(auto_flush())
def send_sigint(self) ->None:
"""
Send SIGINT. Immediately call the SIGINT key handler.
"""
- pass
+ key_press = KeyPress(Keys.ControlC, '\x03')
+ self.feed(key_press)
+ self.process_keys()
class KeyPressEvent:
@@ -200,28 +256,30 @@ class KeyPressEvent:
"""
The current `Application` object.
"""
- pass
+ return self._app
@property
def current_buffer(self) ->Buffer:
"""
The current buffer.
"""
- pass
+ return self._app.current_buffer
@property
def arg(self) ->int:
"""
Repetition argument.
"""
- pass
+ if self._arg:
+ return int(self._arg)
+ return 1
@property
def arg_present(self) ->bool:
"""
True if repetition argument was explicitly provided.
"""
- pass
+ return self._arg is not None
def append_to_arg_count(self, data: str) ->None:
"""
@@ -229,9 +287,11 @@ class KeyPressEvent:
:param data: the typed digit as string
"""
- pass
+ if self._arg is None:
+ self._arg = ''
+ self._arg += data
@property
def cli(self) ->Application[Any]:
"""For backward-compatibility."""
- pass
+ return self.app
diff --git a/src/prompt_toolkit/key_binding/vi_state.py b/src/prompt_toolkit/key_binding/vi_state.py
index be95b80e..ce02f438 100644
--- a/src/prompt_toolkit/key_binding/vi_state.py
+++ b/src/prompt_toolkit/key_binding/vi_state.py
@@ -46,15 +46,25 @@ class ViState:
@property
def input_mode(self) ->InputMode:
"""Get `InputMode`."""
- pass
+ return self.__input_mode
@input_mode.setter
def input_mode(self, value: InputMode) ->None:
"""Set `InputMode`."""
- pass
+ self.__input_mode = value
def reset(self) ->None:
"""
Reset state, go back to the given mode. INSERT by default.
"""
- pass
+ self.__input_mode = InputMode.INSERT
+ self.last_character_find = None
+ self.operator_func = None
+ self.operator_arg = None
+ self.named_registers = {}
+ self.waiting_for_digraph = False
+ self.digraph_symbol1 = None
+ self.tilde_operator = False
+ self.recording_register = None
+ self.current_recording = ''
+ self.temporary_navigation_mode = False
diff --git a/src/prompt_toolkit/layout/containers.py b/src/prompt_toolkit/layout/containers.py
index 52be90da..3b4500e8 100644
--- a/src/prompt_toolkit/layout/containers.py
+++ b/src/prompt_toolkit/layout/containers.py
@@ -117,7 +117,11 @@ AnyContainer = Union[Container, 'MagicContainer']
def _window_too_small() ->Window:
"""Create a `Window` that displays the 'Window too small' text."""
- pass
+ return Window(
+ FormattedTextControl(text='Window too small...'),
+ style='class:window-too-small',
+ align=WindowAlign.CENTER
+ )
class VerticalAlign(Enum):
@@ -216,7 +220,16 @@ class HSplit(_Split):
"""
List of child objects, including padding.
"""
- pass
+ def create_padding():
+ return Window(width=self.padding, char=self.padding_char, style=self.padding_style)
+
+ children = []
+ for i, c in enumerate(self.children):
+ if i != 0:
+ children.append(create_padding())
+ children.append(c)
+
+ return children
def write_to_screen(self, screen: Screen, mouse_handlers: MouseHandlers,
write_position: WritePosition, parent_style: str, erase_bg: bool,
@@ -235,7 +248,18 @@ class HSplit(_Split):
Return the heights for all rows.
Or None when there is not enough space.
"""
- pass
+ if self.height is not None:
+ height = to_dimension(self.height).preferred(write_position.height)
+ else:
+ height = write_position.height
+
+ children = self._all_children
+ dimensions = [c.preferred_height(write_position.width, height) for c in children]
+
+ if self.align == VerticalAlign.JUSTIFY:
+ return distribute_weights(dimensions, height)
+ else:
+ return sum_layout_dimensions(dimensions)
class VSplit(_Split):
@@ -293,14 +317,29 @@ class VSplit(_Split):
"""
List of child objects, including padding.
"""
- pass
+ def create_padding():
+ return Window(height=self.padding, char=self.padding_char, style=self.padding_style)
+
+ children = []
+ for i, c in enumerate(self.children):
+ if i != 0:
+ children.append(create_padding())
+ children.append(c)
+
+ return children
def _divide_widths(self, width: int) ->(list[int] | None):
"""
Return the widths for all columns.
Or None when there is not enough space.
"""
- pass
+ children = self._all_children
+ dimensions = [c.preferred_width(width) for c in children]
+
+ if self.align == HorizontalAlign.JUSTIFY:
+ return distribute_weights(dimensions, width)
+ else:
+ return sum_layout_dimensions(dimensions)
def write_to_screen(self, screen: Screen, mouse_handlers: MouseHandlers,
write_position: WritePosition, parent_style: str, erase_bg: bool,
@@ -915,21 +954,33 @@ class DynamicContainer(Container):
We call `to_container`, because `get_container` can also return a
widget with a ``__pt_container__`` method.
"""
- pass
+ return to_container(self.get_container())
def to_container(container: AnyContainer) ->Container:
"""
Make sure that the given object is a :class:`.Container`.
"""
- pass
+ if isinstance(container, Container):
+ return container
+ elif hasattr(container, '__pt_container__'):
+ return to_container(container.__pt_container__())
+ else:
+ raise ValueError('Not a container object: %r' % (container,))
def to_window(container: AnyContainer) ->Window:
"""
Make sure that the given argument is a :class:`.Window`.
"""
- pass
+ if isinstance(container, Window):
+ return container
+ else:
+ container = to_container(container)
+ if isinstance(container, Window):
+ return container
+ else:
+ raise ValueError('Not a Window object: %r' % (container,))
def is_container(value: object) ->TypeGuard[AnyContainer]:
@@ -937,4 +988,4 @@ def is_container(value: object) ->TypeGuard[AnyContainer]:
Checks whether the given value is a container object
(for use in assert statements).
"""
- pass
+ return isinstance(value, Container) or hasattr(value, '__pt_container__')
diff --git a/src/prompt_toolkit/layout/controls.py b/src/prompt_toolkit/layout/controls.py
index d76f0c21..c5fd5626 100644
--- a/src/prompt_toolkit/layout/controls.py
+++ b/src/prompt_toolkit/layout/controls.py
@@ -134,7 +134,25 @@ class UIContent:
when line wrapping.
:returns: The computed height.
"""
- pass
+        # Cache per (line, width, prefix callable, slice) combination.
+        key = (lineno, width, get_line_prefix, slice_stop)
+
+        if key not in self._line_heights_cache:
+            # Slice the plain text rather than the fragment list, so that
+            # `slice_stop` cuts at a character offset.
+            text = fragment_list_to_text(self.get_line(lineno))[:slice_stop]
+
+            # The line prefix occupies columns as well.
+            if get_line_prefix:
+                prefix = to_formatted_text(get_line_prefix(lineno, 0))
+                text = fragment_list_to_text(prefix) + text
+
+            # Use the display width (wide characters take two columns), never
+            # divide by a zero width, and give an empty line one row.
+            rows = -(-get_cwidth(text) // width) if width > 0 else 10 ** 8
+            self._line_heights_cache[key] = max(1, rows)
+
+        return self._line_heights_cache[key]
class FormattedTextControl(UIControl):
@@ -206,14 +224,27 @@ class FormattedTextControl(UIControl):
(This function is called several times during one rendering, because
we also need those for calculating the dimensions.)
"""
- pass
+        # Key on the render counter only: `self.text` may be an unhashable list.
+        key = get_app().render_counter
+
+        def get_formatted_text() ->StyleAndTextTuples:
+            result = to_formatted_text(self.text, style=self.style)
+            self._fragments = result
+            return result
+
+        return self._fragment_cache.get(key, get_formatted_text)
def preferred_width(self, max_available_width: int) ->int:
"""
Return the preferred width for this control.
That is the width of the longest line.
"""
- pass
+ fragments = self._get_formatted_text_cached()
+ text = fragment_list_to_text(fragments)
+ lines = text.splitlines()
+ if not lines:
+ return 0
+ return max(get_cwidth(line) for line in lines)
def preferred_height(self, width: int, max_available_height: int,
wrap_lines: bool, get_line_prefix: (GetLinePrefixCallable | None)) ->(
@@ -221,7 +252,27 @@ class FormattedTextControl(UIControl):
"""
Return the preferred height for this control.
"""
- pass
+        fragments = self._get_formatted_text_cached()
+
+        # `split_lines` may be lazy; materialize it so the lines can be counted.
+        lines = list(split_lines(fragments))
+        if not wrap_lines:
+            return len(lines)
+
+        # If wrapping is enabled, calculate the wrapped height
+        height = 0
+        for lineno, line in enumerate(lines):
+            # The prefix callable takes the line number, not the running height
+            if get_line_prefix:
+                line = to_formatted_text(get_line_prefix(lineno, 0)) + line
+
+            # Rows for this line: display width over window width, at least one
+            line_width = get_cwidth(fragment_list_to_text(line))
+            height += max(1, -(-line_width // width)) if width > 0 else max_available_height
+            if height >= max_available_height:
+                return max_available_height
+
+        return height
def mouse_handler(self, mouse_event: MouseEvent) ->NotImplementedOrNone:
"""
@@ -233,7 +284,22 @@ class FormattedTextControl(UIControl):
:class:`~prompt_toolkit.layout.Window` to handle this particular
event.)
"""
- pass
+ fragments = self._get_formatted_text_cached()
+ x = mouse_event.position.x
+ y = mouse_event.position.y
+
+ # Find the fragment at the given position
+ for style, text, handler in fragments:
+ if y == 0 and x < len(text):
+ if handler is not None:
+ return handler(get_app(), mouse_event)
+ return NotImplemented
+ x -= len(text)
+ if x < 0:
+ y -= 1
+ x = 9999
+
+ return NotImplemented
class DummyControl(UIControl):
diff --git a/src/prompt_toolkit/layout/dimension.py b/src/prompt_toolkit/layout/dimension.py
index 27439360..d9e5b4ef 100644
--- a/src/prompt_toolkit/layout/dimension.py
+++ b/src/prompt_toolkit/layout/dimension.py
@@ -64,7 +64,7 @@ class Dimension:
Return a :class:`.Dimension` with an exact size. (min, max and
preferred set to ``amount``).
"""
- pass
+ return cls(min=amount, max=amount, preferred=amount)
@classmethod
def zero(cls) ->Dimension:
@@ -72,11 +72,11 @@ class Dimension:
Create a dimension that represents a zero size. (Used for 'invisible'
controls.)
"""
- pass
+ return cls(min=0, max=0, preferred=0)
def is_zero(self) ->bool:
"""True if this `Dimension` represents a zero size."""
- pass
+ return self.max == 0
def __repr__(self) ->str:
fields = []
@@ -95,7 +95,12 @@ def sum_layout_dimensions(dimensions: list[Dimension]) ->Dimension:
"""
Sum a list of :class:`.Dimension` instances.
"""
- pass
+ min_sum = sum(d.min for d in dimensions)
+ max_sum = sum(d.max for d in dimensions)
+ preferred_sum = sum(d.preferred for d in dimensions)
+ weight_sum = sum(d.weight for d in dimensions)
+
+ return Dimension(min=min_sum, max=max_sum, preferred=preferred_sum, weight=weight_sum)
def max_layout_dimensions(dimensions: list[Dimension]) ->Dimension:
@@ -103,7 +108,15 @@ def max_layout_dimensions(dimensions: list[Dimension]) ->Dimension:
Take the maximum of a list of :class:`.Dimension` instances.
Used when we have a HSplit/VSplit, and we want to get the best width/height.)
"""
- pass
+ if not dimensions:
+ return Dimension.zero()
+
+ return Dimension(
+ min=max(d.min for d in dimensions),
+ max=max(d.max for d in dimensions),
+ preferred=max(d.preferred for d in dimensions),
+ weight=max(d.weight for d in dimensions)
+ )
AnyDimension = Union[None, int, Dimension, Callable[[], Any]]
@@ -113,7 +126,15 @@ def to_dimension(value: AnyDimension) ->Dimension:
"""
Turn the given object into a `Dimension` object.
"""
- pass
+ if value is None:
+ return Dimension()
+ if isinstance(value, int):
+ return Dimension.exact(value)
+ if isinstance(value, Dimension):
+ return value
+ if callable(value):
+ return to_dimension(value())
+ raise ValueError(f"Cannot convert {value} to Dimension")
def is_dimension(value: object) ->TypeGuard[AnyDimension]:
@@ -121,7 +142,7 @@ def is_dimension(value: object) ->TypeGuard[AnyDimension]:
Test whether the given value could be a valid dimension.
(For usage in an assertion. It's not guaranteed in case of a callable.)
"""
- pass
+ return value is None or isinstance(value, (int, Dimension)) or callable(value)
D = Dimension
diff --git a/src/prompt_toolkit/layout/dummy.py b/src/prompt_toolkit/layout/dummy.py
index 5c47d1f0..3169a74e 100644
--- a/src/prompt_toolkit/layout/dummy.py
+++ b/src/prompt_toolkit/layout/dummy.py
@@ -19,4 +19,25 @@ def create_dummy_layout() ->Layout:
Create a dummy layout for use in an 'Application' that doesn't have a
layout specified. When ENTER is pressed, the application quits.
"""
- pass
+    kb = KeyBindings()
+
+    @kb.add('enter')
+    def _(event: E) -> None:
+        " Quit when enter is pressed. "
+        event.app.exit()
+
+    dummy_text = HTML(
+        '<style bg="ansiblack" fg="ansiwhite">'
+        'This is a dummy layout. '
+        'You did not provide a layout to your Application.\n'
+        'Press ENTER to quit.'
+        '</style>'
+    )
+
+    # Key bindings belong to the control; `Window` takes no `key_bindings`.
+    control = FormattedTextControl(dummy_text, key_bindings=kb)
+    window = Window(
+        content=control,
+        height=D(min=1),
+    )
+    return Layout(window)
diff --git a/src/prompt_toolkit/layout/layout.py b/src/prompt_toolkit/layout/layout.py
index e6a57ed0..b06d61a1 100644
--- a/src/prompt_toolkit/layout/layout.py
+++ b/src/prompt_toolkit/layout/layout.py
@@ -47,7 +47,9 @@ class Layout:
"""
Find all the :class:`.UIControl` objects in this layout.
"""
- pass
+ for item in self.walk():
+ if isinstance(item, Window):
+ yield item
def focus(self, value: FocusableElement) ->None:
"""
@@ -62,77 +64,114 @@ class Layout:
from this container that was focused most recent, or the very first
focusable :class:`.Window` of the container.
"""
- pass
+ if isinstance(value, UIControl):
+ for window in self.find_all_windows():
+ if window.content == value:
+ self.current_window = window
+ return
+ elif isinstance(value, Buffer):
+ for window in self.find_all_windows():
+ if isinstance(window.content, BufferControl) and window.content.buffer == value:
+ self.current_window = window
+ return
+ elif isinstance(value, str):
+ for window in self.find_all_windows():
+ if isinstance(window.content, BufferControl) and window.content.buffer.name == value:
+ self.current_window = window
+ return
+ elif isinstance(value, Window):
+ self.current_window = value
+ elif isinstance(value, Container):
+ for window in self.find_all_windows():
+ if window in value.get_children():
+ self.current_window = window
+ return
def has_focus(self, value: FocusableElement) ->bool:
"""
Check whether the given control has the focus.
:param value: :class:`.UIControl` or :class:`.Window` instance.
"""
- pass
+ if isinstance(value, UIControl):
+ return self.current_control == value
+ elif isinstance(value, Window):
+ return self.current_window == value
+ return False
@property
def current_control(self) ->UIControl:
"""
Get the :class:`.UIControl` to currently has the focus.
"""
- pass
+ return self.current_window.content
@current_control.setter
def current_control(self, control: UIControl) ->None:
"""
Set the :class:`.UIControl` to receive the focus.
"""
- pass
+ self.focus(control)
@property
def current_window(self) ->Window:
"""Return the :class:`.Window` object that is currently focused."""
- pass
+ return self._stack[-1] if self._stack else None
@current_window.setter
def current_window(self, value: Window) ->None:
"""Set the :class:`.Window` object to be currently focused."""
- pass
+ if value not in self._stack:
+ self._stack.append(value)
+ else:
+ self._stack.remove(value)
+ self._stack.append(value)
@property
def is_searching(self) ->bool:
"""True if we are searching right now."""
- pass
+        return self.current_control in self.search_links  # searching only while a linked search control has focus
@property
def search_target_buffer_control(self) ->(BufferControl | None):
"""
Return the :class:`.BufferControl` in which we are searching or `None`.
"""
- pass
+ for search_control, buffer_control in self.search_links.items():
+ if self.has_focus(search_control):
+ return buffer_control
+ return None
def get_focusable_windows(self) ->Iterable[Window]:
"""
Return all the :class:`.Window` objects which are focusable (in the
'modal' area).
"""
- pass
+ return (w for w in self.walk_through_modal_area() if isinstance(w, Window) and w.content.is_focusable())
def get_visible_focusable_windows(self) ->list[Window]:
"""
Return a list of :class:`.Window` objects that are focusable.
"""
- pass
+        return [w for w in self.get_focusable_windows() if w in self.visible_windows]  # `Window` has no `filter`
@property
def current_buffer(self) ->(Buffer | None):
"""
The currently focused :class:`~.Buffer` or `None`.
"""
- pass
+ if isinstance(self.current_control, BufferControl):
+ return self.current_control.buffer
+ return None
def get_buffer_by_name(self, buffer_name: str) ->(Buffer | None):
"""
Look in the layout for a buffer with the given name.
Return `None` when nothing was found.
"""
- pass
+ for window in self.find_all_windows():
+ if isinstance(window.content, BufferControl) and window.content.buffer.name == buffer_name:
+ return window.content.buffer
+ return None
@property
def buffer_has_focus(self) ->bool:
@@ -141,32 +180,49 @@ class Layout:
:class:`.BufferControl`. (For instance, used to determine whether the
default key bindings should be active or not.)
"""
- pass
+ return isinstance(self.current_control, BufferControl)
@property
def previous_control(self) ->UIControl:
"""
Get the :class:`.UIControl` to previously had the focus.
"""
- pass
+ return self._stack[-2].content if len(self._stack) > 1 else None
def focus_last(self) ->None:
"""
Give the focus to the last focused control.
"""
- pass
+ if len(self._stack) > 1:
+ self._stack.pop()
def focus_next(self) ->None:
"""
Focus the next visible/focusable Window.
"""
- pass
+ windows = self.get_visible_focusable_windows()
+ if not windows:
+ return
+ try:
+ index = windows.index(self.current_window)
+ self.focus(windows[(index + 1) % len(windows)])
+ except ValueError:
+ # If the current window is not in the list, focus the first one
+ self.focus(windows[0])
def focus_previous(self) ->None:
"""
Focus the previous visible/focusable Window.
"""
- pass
+ windows = self.get_visible_focusable_windows()
+ if not windows:
+ return
+ try:
+ index = windows.index(self.current_window)
+ self.focus(windows[(index - 1) % len(windows)])
+ except ValueError:
+ # If the current window is not in the list, focus the last one
+ self.focus(windows[-1])
def walk(self) ->Iterable[Container]:
"""
@@ -203,4 +259,11 @@ def walk(container: Container, skip_hidden: bool=False) ->Iterable[Container]:
"""
Walk through layout, starting at this container.
"""
- pass
+ def walk_recursive(cont):
+ yield cont
+ if hasattr(cont, 'get_children'):
+ for child in cont.get_children():
+ if not skip_hidden or not isinstance(child, ConditionalContainer) or child.filter():
+ yield from walk_recursive(child)
+
+ yield from walk_recursive(container)
diff --git a/src/prompt_toolkit/layout/margins.py b/src/prompt_toolkit/layout/margins.py
index 221ae7c0..32cf5fbe 100644
--- a/src/prompt_toolkit/layout/margins.py
+++ b/src/prompt_toolkit/layout/margins.py
@@ -120,4 +120,30 @@ class PromptMargin(Margin):
def get_width(self, get_ui_content: Callable[[], UIContent]) ->int:
"""Width to report to the `Window`."""
- pass
+ # Get the prompt text
+ prompt_text = fragment_list_to_text(to_formatted_text(self.get_prompt()))
+
+ # Calculate the width of the prompt
+ prompt_width = get_cwidth(prompt_text)
+
+ # If there's a continuation function, calculate its maximum width
+ if self.get_continuation:
+ ui_content = get_ui_content()
+ line_count = ui_content.line_count
+
+ # Check width for each line (excluding the first line)
+ continuation_widths = [
+ get_cwidth(fragment_list_to_text(to_formatted_text(
+ self.get_continuation(prompt_width, i, False)
+ )))
+ for i in range(1, line_count)
+ ]
+
+ # Get the maximum width of continuation lines
+ max_continuation_width = max(continuation_widths) if continuation_widths else 0
+
+ # Return the maximum of prompt width and continuation width
+ return max(prompt_width, max_continuation_width)
+
+ # If there's no continuation function, just return the prompt width
+ return prompt_width
diff --git a/src/prompt_toolkit/layout/menus.py b/src/prompt_toolkit/layout/menus.py
index 8231f28b..b419bc0f 100644
--- a/src/prompt_toolkit/layout/menus.py
+++ b/src/prompt_toolkit/layout/menus.py
@@ -38,33 +38,81 @@ class CompletionsMenuControl(UIControl):
"""
Create a UIContent object for this control.
"""
- pass
+ complete_state = get_app().current_buffer.complete_state
+ if complete_state is None:
+ return UIContent()
+
+ completions = complete_state.completions
+ index = complete_state.complete_index # Can be None!
+
+ # Calculate width of completions menu.
+ menu_width = self._get_menu_width(width, complete_state)
+ menu_meta_width = self._get_menu_meta_width(width, complete_state)
+ show_meta = self._show_meta(complete_state)
+
+ def get_line(i):
+ c = completions[i]
+ is_current_completion = (i == index)
+ result = _get_menu_item_fragments(c, is_current_completion, menu_width, space_after=show_meta)
+
+ if show_meta:
+ result += _get_menu_item_fragments(c.display_meta, is_current_completion, menu_meta_width)
+
+ return result
+
+ return UIContent(get_line=get_line,
+ cursor_position=Point(x=0, y=index or 0),
+ line_count=len(completions))
def _show_meta(self, complete_state: CompletionState) ->bool:
"""
Return ``True`` if we need to show a column with meta information.
"""
- pass
+ return any(c.display_meta for c in complete_state.completions)
def _get_menu_width(self, max_width: int, complete_state: CompletionState
) ->int:
"""
Return the width of the main column.
"""
- pass
+ return min(max_width, max(self.MIN_WIDTH, max(get_cwidth(c.display) for c in complete_state.completions) + 2))
def _get_menu_meta_width(self, max_width: int, complete_state:
CompletionState) ->int:
"""
Return the width of the meta column.
"""
- pass
+ if self._show_meta(complete_state):
+ return min(max_width // 2, max(get_cwidth(c.display_meta) for c in complete_state.completions if c.display_meta))
+ return 0
def mouse_handler(self, mouse_event: MouseEvent) ->NotImplementedOrNone:
"""
Handle mouse events: clicking and scrolling.
"""
- pass
+ b = get_app().current_buffer
+ complete_state = b.complete_state
+
+ if complete_state is None:
+ return NotImplemented
+
+ # Mouse click
+ if mouse_event.event_type == MouseEventType.MOUSE_UP:
+ index = mouse_event.position.y
+
+ if 0 <= index < len(complete_state.completions):
+ b.apply_completion(complete_state.completions[index])
+ return None
+
+ # Scroll up/down
+ if mouse_event.event_type == MouseEventType.SCROLL_DOWN:
+ b.complete_next()
+ return None
+ elif mouse_event.event_type == MouseEventType.SCROLL_UP:
+ b.complete_previous()
+ return None
+
+ return NotImplemented
def _get_menu_item_fragments(completion: Completion, is_current_completion:
@@ -73,7 +121,24 @@ def _get_menu_item_fragments(completion: Completion, is_current_completion:
Get the style/text tuples for a menu item, styled and trimmed to the given
width.
"""
- pass
+ style = 'class:completion-menu.completion'
+ if is_current_completion:
+ style += '.current'
+
+ text = completion.display
+ text, text_width = _trim_formatted_text([(style, text)], width)
+
+ padding = width - text_width
+ if space_after:
+ padding -= 1
+
+ if padding > 0:
+ text.append((style, ' ' * padding))
+
+ if space_after:
+ text.append((style, ' '))
+
+ return text
def _trim_formatted_text(formatted_text: StyleAndTextTuples, max_width: int
@@ -82,7 +147,21 @@ def _trim_formatted_text(formatted_text: StyleAndTextTuples, max_width: int
Trim the text to `max_width`, append dots when the text is too long.
Returns (text, width) tuple.
"""
- pass
+ result: StyleAndTextTuples = []
+ current_width = 0
+
+ for style, text in formatted_text:
+ for c in text:
+ char_width = get_cwidth(c)
+ if current_width + char_width > max_width:
+ result.append((style, '...'))
+ current_width += 3
+ return result, current_width
+
+ result.append((style, c))
+ current_width += char_width
+
+ return result, current_width
class CompletionsMenu(ConditionalContainer):
@@ -146,7 +225,20 @@ class MultiColumnCompletionMenuControl(UIControl):
Preferred width: prefer to use at least min_rows, but otherwise as much
as possible horizontally.
"""
- pass
+ complete_state = get_app().current_buffer.complete_state
+ if complete_state is None:
+ return 0
+
+ column_width = self._get_column_width(complete_state)
+ max_columns = max(1, (max_available_width - self._required_margin) // column_width)
+
+ return min(
+ max_available_width,
+ max(
+ self._required_margin + column_width,
+ column_width * max_columns + self._required_margin
+ )
+ )
def preferred_height(self, width: int, max_available_height: int,
wrap_lines: bool, get_line_prefix: (GetLinePrefixCallable | None)) ->(
@@ -154,32 +246,122 @@ class MultiColumnCompletionMenuControl(UIControl):
"""
Preferred height: as much as needed in order to display all the completions.
"""
- pass
+ complete_state = get_app().current_buffer.complete_state
+ if complete_state is None:
+ return 0
+
+ column_width = self._get_column_width(complete_state)
+ column_count = max(1, (width - self._required_margin) // column_width)
+ row_count = int(math.ceil(len(complete_state.completions) / float(column_count)))
+ return max(self.min_rows, min(row_count, max_available_height))
def create_content(self, width: int, height: int) ->UIContent:
"""
Create a UIContent object for this menu.
"""
- pass
+        complete_state = get_app().current_buffer.complete_state
+        if complete_state is None:
+            return UIContent()
+
+        column_width = self._get_column_width(complete_state)
+        self._render_pos_to_completion = {}
+        self._render_left_arrow = False
+        self._render_right_arrow = False
+        self._render_width = width
+
+        visible_columns = max(1, (width - self._required_margin) // column_width)
+        visible_rows = height
+
+        # `self.scroll` is an offset into the completion list: column `i`
+        # starts `i * visible_rows` entries past it (never before index 0).
+        first = max(0, self.scroll)
+        columns = [complete_state.completions[first + i * visible_rows:first + (i + 1) * visible_rows]
+                   for i in range(visible_columns)]
+
+        def get_line(y):
+            result = []
+            for x, column in enumerate(columns):
+                if y < len(column):
+                    completion = column[y]
+                    # Highlight the entry that is currently selected.
+                    index = first + x * visible_rows + y
+                    is_current = complete_state.complete_index == index
+                    result.extend(_get_menu_item_fragments(completion, is_current, column_width - 1, True))
+                    self._render_pos_to_completion[x, y] = completion
+                else:
+                    result.append(('', ' ' * column_width))
+            return result
+
+        self._rendered_rows = visible_rows
+        self._rendered_columns = visible_columns
+        self._total_columns = int(math.ceil(len(complete_state.completions) / float(visible_rows)))
+
+        # Show left/right arrows (for scrolling) if there are more completions than visible.
+        if self.scroll > 0:
+            self._render_left_arrow = True
+        if self.scroll < (self._total_columns - visible_columns) * visible_rows:
+            self._render_right_arrow = True
+
+        return UIContent(get_line=get_line,
+                         line_count=visible_rows,
+                         show_cursor=False)
def _get_column_width(self, completion_state: CompletionState) ->int:
"""
Return the width of each column.
"""
- pass
+ if completion_state in self._column_width_for_completion_state:
+ return self._column_width_for_completion_state[completion_state][0]
+
+ max_width = max(get_cwidth(c.display) for c in completion_state.completions)
+ result = max(self.suggested_max_column_width, max_width)
+ self._column_width_for_completion_state[completion_state] = (result, max_width)
+ return result
def mouse_handler(self, mouse_event: MouseEvent) ->NotImplementedOrNone:
"""
Handle scroll and click events.
"""
- pass
+ if mouse_event.event_type == MouseEventType.SCROLL_DOWN:
+ self.scroll += self._rendered_rows
+ self.scroll = min(self.scroll, (self._total_columns - self._rendered_columns) * self._rendered_rows)
+ elif mouse_event.event_type == MouseEventType.SCROLL_UP:
+ self.scroll -= self._rendered_rows
+ self.scroll = max(0, self.scroll)
+ elif mouse_event.event_type == MouseEventType.MOUSE_UP:
+ x = mouse_event.position.x
+ y = mouse_event.position.y
+
+ if x == 0 and self._render_left_arrow:
+ self.scroll -= self._rendered_rows
+ self.scroll = max(0, self.scroll)
+ elif x == self._render_width - 1 and self._render_right_arrow:
+ self.scroll += self._rendered_rows
+ self.scroll = min(self.scroll, (self._total_columns - self._rendered_columns) * self._rendered_rows)
+ elif (x, y) in self._render_pos_to_completion:
+ completion = self._render_pos_to_completion[x, y]
+ get_app().current_buffer.apply_completion(completion)
+
+ return None
def get_key_bindings(self) ->KeyBindings:
"""
Expose key bindings that handle the left/right arrow keys when the menu
is displayed.
"""
- pass
+        kb = KeyBindings()
+        # Scroll only while completions are being shown.
+        @kb.add('left', filter=has_completions)
+        def _(event):
+            self.scroll -= self._rendered_rows
+            self.scroll = max(0, self.scroll)
+
+        @kb.add('right', filter=has_completions)
+        def _(event):
+            self.scroll += self._rendered_rows
+            self.scroll = max(0, min(self.scroll, (self._total_columns - self._rendered_columns) * self._rendered_rows))
+
+        return kb
class MultiColumnCompletionsMenu(HSplit):
@@ -224,4 +406,9 @@ class _SelectedCompletionMetaControl(UIControl):
layout doesn't change when we select another completion (E.g. that
completions are suddenly shown in more or fewer columns.)
"""
- pass
+        complete_state = get_app().current_buffer.complete_state
+        if complete_state is None:
+            return None
+        # `default=0`: max() raises ValueError when no completion has meta text.
+        max_meta_width = max((get_cwidth(c.display_meta) for c in complete_state.completions if c.display_meta), default=0)
+        return min(max_meta_width, max_available_width)
diff --git a/src/prompt_toolkit/layout/mouse_handlers.py b/src/prompt_toolkit/layout/mouse_handlers.py
index 52c54dc6..0539ef1e 100644
--- a/src/prompt_toolkit/layout/mouse_handlers.py
+++ b/src/prompt_toolkit/layout/mouse_handlers.py
@@ -29,4 +29,6 @@ class MouseHandlers:
"""
Set mouse handler for a region.
"""
- pass
+        for y in range(y_min, y_max):  # stored row-first; max bounds are exclusive
+            for x in range(x_min, x_max):
+                self.mouse_handlers[y][x] = handler
diff --git a/src/prompt_toolkit/layout/processors.py b/src/prompt_toolkit/layout/processors.py
index 71fc93e2..3c0fe8ba 100644
--- a/src/prompt_toolkit/layout/processors.py
+++ b/src/prompt_toolkit/layout/processors.py
@@ -119,7 +119,8 @@ class HighlightSearchProcessor(Processor):
"""
The text we are searching for.
"""
- pass
+ search_state = buffer_control.search_state
+ return search_state.text if search_state else ''
class HighlightIncrementalSearchProcessor(HighlightSearchProcessor):
@@ -138,7 +139,8 @@ class HighlightIncrementalSearchProcessor(HighlightSearchProcessor):
"""
The text we are searching for.
"""
- pass
+        search_buffer = buffer_control.search_buffer  # text being typed in the linked search field, if any
+        return search_buffer.text if search_buffer is not None else ''
class HighlightSelectionProcessor(Processor):
@@ -183,7 +185,44 @@ class HighlightMatchingBracketProcessor(Processor):
"""
Return a list of (row, col) tuples that need to be highlighted.
"""
- pass
+ cursor_row, cursor_col = document.translate_index_to_position(
+ document.cursor_position)
+
+ def find_matching_bracket(pos, direction):
+ stack = []
+ for i in range(pos, 0 if direction < 0 else len(document.text), direction):
+ char = document.text[i]
+ if char in self.chars:
+ if not stack and char in self._closing_braces:
+ return i
+ if self.chars.index(char) % 2 == 0:
+ stack.append(char)
+ else:
+ if stack and self.chars.index(stack[-1]) == self.chars.index(char) - 1:
+ stack.pop()
+ else:
+ return i
+ if not stack:
+ return i
+ return -1
+
+ result = []
+ cursor_char = document.text[document.cursor_position] if document.cursor_position < len(document.text) else ''
+
+ if cursor_char in self.chars:
+ pos = document.cursor_position
+ elif document.cursor_position > 0 and document.text[document.cursor_position - 1] in self.chars:
+ pos = document.cursor_position - 1
+ else:
+ return result
+
+ if abs(pos - document.cursor_position) <= self.max_cursor_distance:
+ matching_pos = find_matching_bracket(pos, 1 if self.chars.index(document.text[pos]) % 2 == 0 else -1)
+ if matching_pos != -1:
+ result.append(document.translate_index_to_position(pos))
+ result.append(document.translate_index_to_position(matching_pos))
+
+ return result
class DisplayMultipleCursors(Processor):
@@ -367,7 +406,7 @@ def merge_processors(processors: list[Processor]) ->Processor:
"""
Merge multiple `Processor` objects into one.
"""
- pass
+ return _MergedProcessor(processors)
class _MergedProcessor(Processor):
diff --git a/src/prompt_toolkit/layout/screen.py b/src/prompt_toolkit/layout/screen.py
index 29cbf2a9..9550258d 100644
--- a/src/prompt_toolkit/layout/screen.py
+++ b/src/prompt_toolkit/layout/screen.py
@@ -87,27 +87,27 @@ class Screen:
"""
Set the cursor position for a given window.
"""
- pass
+ self.cursor_positions[window] = position
def set_menu_position(self, window: Window, position: Point) ->None:
"""
Set the cursor position for a given window.
"""
- pass
+ self.menu_positions[window] = position
def get_cursor_position(self, window: Window) ->Point:
"""
Get the cursor position for a given window.
Returns a `Point`.
"""
- pass
+ return self.cursor_positions.get(window, Point(0, 0))
def get_menu_position(self, window: Window) ->Point:
"""
Get the menu position for a given window.
(This falls back to the cursor position if no menu position was set.)
"""
- pass
+ return self.menu_positions.get(window, self.get_cursor_position(window))
def draw_with_z_index(self, z_index: int, draw_func: Callable[[], None]
) ->None:
@@ -115,20 +115,24 @@ class Screen:
Add a draw-function for a `Window` which has a >= 0 z_index.
This will be postponed until `draw_all_floats` is called.
"""
- pass
+ self._draw_float_functions.append((z_index, draw_func))
def draw_all_floats(self) ->None:
"""
Draw all float functions in order of z-index.
"""
- pass
+        while self._draw_float_functions:  # a draw function may queue more floats
+            self._draw_float_functions.sort(key=lambda item: item[0])  # by z-index only; callables are not orderable
+            self._draw_float_functions.pop(0)[1]()
def append_style_to_content(self, style_str: str) ->None:
"""
For all the characters in the screen.
Set the style string to the given `style_str`.
"""
- pass
+ for row in self.data_buffer.values():
+ for col, char in row.items():
+ row[col] = _CHAR_CACHE[char.char, char.style + ' ' + style_str]
def fill_area(self, write_position: WritePosition, style: str='', after:
bool=False) ->None:
@@ -136,7 +140,12 @@ class Screen:
Fill the content of this area, using the given `style`.
The style is prepended before whatever was here before.
"""
- pass
+        for y in range(write_position.ypos, write_position.ypos + write_position.height):
+            row = self.data_buffer[y]
+            for x in range(write_position.xpos, write_position.xpos + write_position.width):
+                char = row[x]
+                new_style = char.style + ' ' + style if after else style + ' ' + char.style  # prepend unless `after`
+                row[x] = _CHAR_CACHE[char.char, new_style.strip()]
class WritePosition:
diff --git a/src/prompt_toolkit/layout/scrollable_pane.py b/src/prompt_toolkit/layout/scrollable_pane.py
index 22c7047e..f948f1a0 100644
--- a/src/prompt_toolkit/layout/scrollable_pane.py
+++ b/src/prompt_toolkit/layout/scrollable_pane.py
@@ -77,21 +77,82 @@ class ScrollablePane(Container):
This works by rendering on an off-screen canvas, and copying over the
visible region.
"""
- pass
+ # Calculate dimensions
+ xpos = write_position.xpos
+ ypos = write_position.ypos
+ width = write_position.width
+ height = write_position.height
+
+ # Create a temporary screen for rendering the full content
+ temp_screen = Screen(self.max_available_height, width)
+ temp_mouse_handlers = MouseHandlers()
+
+ # Render content on the temporary screen
+ self.content.write_to_screen(
+ temp_screen,
+ temp_mouse_handlers,
+ WritePosition(xpos=0, ypos=0, width=width, height=self.max_available_height),
+ parent_style,
+ erase_bg,
+ z_index
+ )
+
+ # Calculate the visible region
+ visible_height = min(height, temp_screen.height - self.vertical_scroll)
+
+ # Copy visible region to the actual screen
+ self._copy_over_screen(screen, temp_screen, write_position, width)
+ self._copy_over_mouse_handlers(mouse_handlers, temp_mouse_handlers, write_position, width)
+ self._copy_over_write_positions(screen, temp_screen, write_position)
+
+ # Draw scrollbar if needed
+ if self.show_scrollbar() and temp_screen.height > height:
+ self._draw_scrollbar(write_position, temp_screen.height, screen)
+
+ # Make focused window visible if needed
+ if self.keep_focused_window_visible():
+ focused_windows = [
+ w for w in temp_screen.visible_windows
+ if w.render_info and w.render_info.get_visible_line_to_row_col
+ ]
+ if focused_windows:
+ focused_window = focused_windows[-1]
+ cursor_position = focused_window.render_info.cursor_position
+ self._make_window_visible(visible_height, temp_screen.height,
+ focused_window.render_info.window_write_position,
+ cursor_position)
def _clip_point_to_visible_area(self, point: Point, write_position:
WritePosition) ->Point:
"""
Ensure that the cursor and menu positions always are always reported
+ within the visible area.
"""
- pass
+ x = point.x
+ y = point.y - self.vertical_scroll
+
+ x = max(0, min(x, write_position.width - 1))
+ y = max(0, min(y, write_position.height - 1))
+
+ return Point(x=x, y=y)
def _copy_over_screen(self, screen: Screen, temp_screen: Screen,
write_position: WritePosition, virtual_width: int) ->None:
"""
Copy over visible screen content and "zero width escape sequences".
"""
- pass
+ for y in range(min(write_position.height, temp_screen.height - self.vertical_scroll)):
+ for x in range(min(write_position.width, virtual_width)):
+ temp_char = temp_screen.data_buffer[self.vertical_scroll + y][x]
+ screen.data_buffer[write_position.ypos + y][write_position.xpos + x] = temp_char
+
+ # Copy over zero width escape sequences
+ for y in range(min(write_position.height, temp_screen.height - self.vertical_scroll)):
+ row = self.vertical_scroll + y
+ if row in temp_screen.zero_width_escapes:
+ for x, escapes in temp_screen.zero_width_escapes[row].items():
+ if x < virtual_width:
+ screen.zero_width_escapes[write_position.ypos + y][write_position.xpos + x] = escapes
def _copy_over_mouse_handlers(self, mouse_handlers: MouseHandlers,
temp_mouse_handlers: MouseHandlers, write_position: WritePosition,
@@ -102,14 +163,36 @@ class ScrollablePane(Container):
Note: we take `virtual_width` because we don't want to copy over mouse
handlers that we possibly have behind the scrollbar.
"""
- pass
+ for y in range(min(write_position.height, self.max_available_height - self.vertical_scroll)):
+ for x in range(min(write_position.width, virtual_width)):
+ key = (self.vertical_scroll + y, x)
+ if key in temp_mouse_handlers.mouse_handlers:
+ mouse_handlers.set_mouse_handler_for_region(
+ x=write_position.xpos + x,
+ y=write_position.ypos + y,
+ width=1,
+ height=1,
+ handler=temp_mouse_handlers.mouse_handlers[key]
+ )
def _copy_over_write_positions(self, screen: Screen, temp_screen:
Screen, write_position: WritePosition) ->None:
"""
Copy over window write positions.
"""
- pass
+ for window, positions in temp_screen.write_positions.items():
+ new_positions = []
+ for position in positions:
+ new_ypos = position.ypos - self.vertical_scroll + write_position.ypos
+ if write_position.ypos <= new_ypos < write_position.ypos + write_position.height:
+ new_positions.append(WritePosition(
+ xpos=position.xpos + write_position.xpos,
+ ypos=new_ypos,
+ width=min(position.width, write_position.width),
+ height=min(position.height, write_position.ypos + write_position.height - new_ypos)
+ ))
+ if new_positions:
+ screen.write_positions[window] = new_positions
def _make_window_visible(self, visible_height: int, virtual_height: int,
visible_win_write_pos: WritePosition, cursor_position: (Point | None)
@@ -124,7 +207,31 @@ class ScrollablePane(Container):
:param cursor_position: The location of the cursor position of this
window on the temp screen.
"""
- pass
+ if cursor_position is not None:
+ cursor_y = cursor_position.y
+ else:
+ cursor_y = visible_win_write_pos.ypos
+
+ def scroll_to(scroll_offset):
+ self.vertical_scroll = max(0, min(scroll_offset, virtual_height - visible_height))
+
+ if self.keep_cursor_visible():
+ # Scroll up if needed
+ if cursor_y < self.vertical_scroll + self.scroll_offsets.top:
+ scroll_to(cursor_y - self.scroll_offsets.top)
+
+ # Scroll down if needed
+ elif cursor_y >= self.vertical_scroll + visible_height - self.scroll_offsets.bottom:
+ scroll_to(cursor_y - visible_height + 1 + self.scroll_offsets.bottom)
+
+ else:
+ # Scroll up if needed
+ if visible_win_write_pos.ypos < self.vertical_scroll:
+ scroll_to(visible_win_write_pos.ypos)
+
+ # Scroll down if needed
+ elif visible_win_write_pos.ypos + visible_win_write_pos.height > self.vertical_scroll + visible_height:
+ scroll_to(visible_win_write_pos.ypos + visible_win_write_pos.height - visible_height)
def _draw_scrollbar(self, write_position: WritePosition, content_height:
int, screen: Screen) ->None:
@@ -134,4 +241,25 @@ class ScrollablePane(Container):
Note: There is some code duplication with the `ScrollbarMargin`
implementation.
"""
- pass
+ window_height = write_position.height
+ scrollbar_height = max(1, int(window_height * window_height / content_height))
+ scrollbar_top = int(self.vertical_scroll * window_height / content_height)
+
+ x = write_position.xpos + write_position.width - 1
+ y = write_position.ypos
+
+ # Draw scrollbar background
+ for i in range(window_height):
+ screen.data_buffer[y + i][x] = Char(' ', 'class:scrollbar.background')
+
+ # Draw scrollbar itself
+ for i in range(scrollbar_height):
+ if 0 <= y + scrollbar_top + i < y + window_height:
+ screen.data_buffer[y + scrollbar_top + i][x] = Char(' ', 'class:scrollbar')
+
+ # Draw arrows
+ if self.display_arrows():
+ if self.vertical_scroll > 0:
+ screen.data_buffer[y][x] = Char(self.up_arrow_symbol, 'class:scrollbar.arrow')
+ if self.vertical_scroll + window_height < content_height:
+ screen.data_buffer[y + window_height - 1][x] = Char(self.down_arrow_symbol, 'class:scrollbar.arrow')
diff --git a/src/prompt_toolkit/layout/utils.py b/src/prompt_toolkit/layout/utils.py
index b4df4dc7..3d19c4e6 100644
--- a/src/prompt_toolkit/layout/utils.py
+++ b/src/prompt_toolkit/layout/utils.py
@@ -48,4 +48,13 @@ def explode_text_fragments(fragments: Iterable[_T]) ->_ExplodedList[_T]:
:param fragments: List of (style, text) tuples.
"""
- pass
+ result = _ExplodedList()
+
+ for style, text in fragments:
+ if isinstance(text, str):
+ result.extend((style, c) for c in text)
+ else:
+ # If it's not a string, we assume it's already exploded
+ result.append((style, text))
+
+ return result
diff --git a/src/prompt_toolkit/lexers/base.py b/src/prompt_toolkit/lexers/base.py
index a6b501bd..1d2d718d 100644
--- a/src/prompt_toolkit/lexers/base.py
+++ b/src/prompt_toolkit/lexers/base.py
@@ -32,7 +32,7 @@ class Lexer(metaclass=ABCMeta):
When this changes, `lex_document` could give a different output.
(Only used for `DynamicLexer`.)
"""
- pass
+ return None
class SimpleLexer(Lexer):
@@ -46,6 +46,15 @@ class SimpleLexer(Lexer):
def __init__(self, style: str='') ->None:
self.style = style
+ def lex_document(self, document: Document) ->Callable[[int],
+ StyleAndTextTuples]:
+ def get_line(lineno: int) ->StyleAndTextTuples:
+ return [(self.style, document.lines[lineno])]
+ return get_line
+
+ def invalidation_hash(self) ->Hashable:
+ return self.style
+
class DynamicLexer(Lexer):
"""
@@ -57,3 +66,14 @@ class DynamicLexer(Lexer):
def __init__(self, get_lexer: Callable[[], Lexer | None]) ->None:
self.get_lexer = get_lexer
self._dummy = SimpleLexer()
+
+ def lex_document(self, document: Document) ->Callable[[int],
+ StyleAndTextTuples]:
+ lexer = self.get_lexer() or self._dummy
+ return lexer.lex_document(document)
+
+ def invalidation_hash(self) ->Hashable:
+ lexer = self.get_lexer()
+ if lexer:
+ return lexer.invalidation_hash()
+ return self._dummy.invalidation_hash()
diff --git a/src/prompt_toolkit/lexers/pygments.py b/src/prompt_toolkit/lexers/pygments.py
index 79f8caa1..d1e1bd58 100644
--- a/src/prompt_toolkit/lexers/pygments.py
+++ b/src/prompt_toolkit/lexers/pygments.py
@@ -62,14 +62,27 @@ class RegexSync(SyntaxSync):
"""
Scan backwards, and find a possible position to start.
"""
- pass
+ # Start from the requested line and move backwards
+ for i in range(max(0, lineno - 1), max(-1, lineno - self.MAX_BACKWARDS), -1):
+ match = self._compiled_pattern.search(document.lines[i])
+ if match:
+ return i, match.start()
+
+ # If no match found, start from the beginning if the document is small
+ if lineno <= self.FROM_START_IF_NO_SYNC_POS_FOUND:
+ return 0, 0
+
+ # Otherwise, start a bit before the requested line
+ return max(0, lineno - self.MAX_BACKWARDS), 0
@classmethod
def from_pygments_lexer_cls(cls, lexer_cls: PygmentsLexerCls) ->RegexSync:
"""
Create a :class:`.RegexSync` instance for this Pygments lexer class.
"""
- pass
+ patterns = getattr(lexer_cls, 'flags', []) + getattr(lexer_cls, 'tokens', {}).get('root', [])
+ needle = '|'.join(f'({p[1].pattern})' for p in patterns if isinstance(p, tuple) and hasattr(p[1], 'pattern'))
+ return cls(needle)
class _TokenCache(Dict[Tuple[str, ...], str]):
@@ -132,7 +145,13 @@ class PygmentsLexer(Lexer):
"""
Create a `Lexer` from a filename.
"""
- pass
+ from pygments.lexers import get_lexer_for_filename
+ try:
+ pygments_lexer = get_lexer_for_filename(filename)
+ except ClassNotFound:
+ return SimpleLexer()
+
+ return cls(pygments_lexer.__class__, sync_from_start=sync_from_start)
def lex_document(self, document: Document) ->Callable[[int],
StyleAndTextTuples]:
@@ -140,4 +159,40 @@ class PygmentsLexer(Lexer):
Create a lexer function that takes a line number and returns the list
of (style_str, text) tuples as the Pygments lexer returns for that line.
"""
- pass
+ if self.sync_from_start():
+ return self._lex_from_start(document)
+ else:
+ return self._lex_from_closest_sync(document)
+
+ def _lex_from_start(self, document: Document) ->Callable[[int],
+ StyleAndTextTuples]:
+ lines = document.lines
+ pygments_lexer = self.pygments_lexer
+
+ def get_line(lineno: int) ->StyleAndTextTuples:
+ return list(pygments_lexer.get_tokens(lines[lineno]))
+
+ return get_line
+
+ def _lex_from_closest_sync(self, document: Document) ->Callable[[int],
+ StyleAndTextTuples]:
+ lines = document.lines
+ pygments_lexer = self.pygments_lexer
+
+ def get_line(lineno: int) ->StyleAndTextTuples:
+ # Find the start position for the lexer
+ row, column = self.syntax_sync.get_sync_start_position(document, lineno)
+
+ # Create a generator for the lexed tokens
+ text = '\n'.join(lines[row:lineno + 1])
+ tokens = pygments_lexer.get_tokens(text)
+
+ # Ignore tokens for previous lines
+ for _ in range(lineno - row):
+ for _ in tokens:
+ pass
+
+ # Return the tokens for the requested line
+ return list(tokens)
+
+ return get_line
diff --git a/src/prompt_toolkit/output/base.py b/src/prompt_toolkit/output/base.py
index 8c4343ff..2ca27e5e 100644
--- a/src/prompt_toolkit/output/base.py
+++ b/src/prompt_toolkit/output/base.py
@@ -25,7 +25,9 @@ class Output(metaclass=ABCMeta):
@abstractmethod
def fileno(self) ->int:
"""Return the file descriptor to which we can write for the output."""
- pass
+ if self.stdout is not None:
+ return self.stdout.fileno()
+ raise NotImplementedError("fileno() not implemented for this output")
@abstractmethod
def encoding(self) ->str:
@@ -35,7 +37,9 @@ class Output(metaclass=ABCMeta):
output the data, so that the UI can provide alternatives, when
required.)
"""
- pass
+ if self.stdout is not None:
+ return self.stdout.encoding or 'utf-8'
+ return 'utf-8'
@abstractmethod
def write(self, data: str) ->None:
@@ -191,7 +195,7 @@ class Output(metaclass=ABCMeta):
On Windows, we don't need this, there we have
`get_rows_below_cursor_position`.
"""
- pass
+ return False # Default implementation returns False
@abstractmethod
def get_size(self) ->Size:
@@ -253,4 +257,4 @@ class DummyOutput(Output):
def fileno(self) ->int:
"""There is no sensible default for fileno()."""
- pass
+ raise NotImplementedError("DummyOutput does not have a file descriptor")
diff --git a/src/prompt_toolkit/output/color_depth.py b/src/prompt_toolkit/output/color_depth.py
index 079d6e36..59f274de 100644
--- a/src/prompt_toolkit/output/color_depth.py
+++ b/src/prompt_toolkit/output/color_depth.py
@@ -27,11 +27,24 @@ class ColorDepth(str, Enum):
This is a way to enforce a certain color depth in all prompt_toolkit
applications.
"""
- pass
+ env_value = os.environ.get('PROMPT_TOOLKIT_COLOR_DEPTH')
+ if env_value:
+ try:
+ return cls(env_value)
+ except ValueError:
+ # If the value is not a valid ColorDepth, return None
+ return None
+ return None
@classmethod
def default(cls) ->ColorDepth:
"""
Return the default color depth for the default output.
"""
- pass
+ # First, check if there's an environment variable set
+ from_env = cls.from_env()
+ if from_env is not None:
+ return from_env
+
+ # If no environment variable is set, return the DEFAULT value
+ return cls.DEFAULT
diff --git a/src/prompt_toolkit/output/defaults.py b/src/prompt_toolkit/output/defaults.py
index 396c16f3..c293f03c 100644
--- a/src/prompt_toolkit/output/defaults.py
+++ b/src/prompt_toolkit/output/defaults.py
@@ -24,4 +24,34 @@ def create_output(stdout: (TextIO | None)=None, always_prefer_tty: bool=False
That way, tools like `print_formatted_text` will write plain text into
that file.
"""
- pass
+ if stdout is None:
+ stdout = sys.stdout
+
+ # Check if the output is a TTY.
+ if not stdout.isatty() and always_prefer_tty:
+ stdout = sys.stderr
+
+ if not stdout.isatty():
+ return PlainTextOutput(stdout)
+
+ term = get_term_environment_variable()
+ bell_variable = get_bell_environment_variable()
+
+ # If the PROMPT_TOOLKIT_COLOR_DEPTH environment variable is set, use that.
+ color_depth = ColorDepth.default()
+
+ if is_conemu_ansi():
+ from .conemu import ConEmuOutput
+ return ConEmuOutput(stdout)
+
+ if term in ('linux', 'eterm-color'):
+ from .vt100 import Vt100_Output
+ return Vt100_Output(stdout, color_depth=color_depth)
+
+ if term == 'windows':
+ from .win32 import Win32Output
+ return Win32Output(stdout, bell_variable=bell_variable)
+
+ # Default to VT100 output.
+ from .vt100 import Vt100_Output
+ return Vt100_Output(stdout, color_depth=color_depth)
diff --git a/src/prompt_toolkit/output/flush_stdout.py b/src/prompt_toolkit/output/flush_stdout.py
index 5a1f98e9..d4f6c4ef 100644
--- a/src/prompt_toolkit/output/flush_stdout.py
+++ b/src/prompt_toolkit/output/flush_stdout.py
@@ -1,5 +1,6 @@
from __future__ import annotations
import errno
+import fcntl
import os
import sys
from contextlib import contextmanager
@@ -12,4 +13,13 @@ def _blocking_io(io: IO[str]) ->Iterator[None]:
"""
Ensure that the FD for `io` is set to blocking in here.
"""
- pass
+ fd = io.fileno()
+ old_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+
+ try:
+ # Remove non-blocking flag
+ fcntl.fcntl(fd, fcntl.F_SETFL, old_flags & ~os.O_NONBLOCK)
+ yield
+ finally:
+ # Restore original flags
+ fcntl.fcntl(fd, fcntl.F_SETFL, old_flags)
diff --git a/src/prompt_toolkit/output/plain_text.py b/src/prompt_toolkit/output/plain_text.py
index 59ecf92e..46cdda73 100644
--- a/src/prompt_toolkit/output/plain_text.py
+++ b/src/prompt_toolkit/output/plain_text.py
@@ -27,5 +27,11 @@ class PlainTextOutput(Output):
self._buffer: list[str] = []
def fileno(self) ->int:
- """There is no sensible default for fileno()."""
- pass
+ """
+ Return the file descriptor of the stdout stream.
+
+ If stdout doesn't have a file descriptor, raise an AttributeError.
+ """
+ if hasattr(self.stdout, 'fileno'):
+ return self.stdout.fileno()
+ raise AttributeError("The stdout stream does not have a file descriptor.")
diff --git a/src/prompt_toolkit/output/vt100.py b/src/prompt_toolkit/output/vt100.py
index d371a044..bc553357 100644
--- a/src/prompt_toolkit/output/vt100.py
+++ b/src/prompt_toolkit/output/vt100.py
@@ -52,7 +52,12 @@ def _get_closest_ansi_color(r: int, g: int, b: int, exclude: Sequence[str]=()
:param b: Blue (Between 0 and 255.)
:param exclude: A tuple of color names to exclude. (E.g. ``('ansired', )``.)
"""
- pass
+ def distance(color):
+ r2, g2, b2 = ANSI_COLORS_TO_RGB[color]
+ return (r - r2) ** 2 + (g - g2) ** 2 + (b - b2) ** 2
+
+ colors = set(ANSI_COLORS_TO_RGB.keys()) - set(exclude)
+ return min(colors, key=distance)
_ColorCodeAndName = Tuple[int, str]
@@ -75,7 +80,20 @@ class _16ColorCache:
Return a (ansi_code, ansi_name) tuple. (E.g. ``(44, 'ansiblue')``.) for
a given (r,g,b) value.
"""
- pass
+ r, g, b = value
+
+ # If it's in the cache, return it
+ cache_key = (r, g, b, tuple(exclude))
+ if cache_key in self._cache:
+ return self._cache[cache_key]
+
+ # Otherwise, find the closest match
+ color_name = _get_closest_ansi_color(r, g, b, exclude)
+ color_code = (BG_ANSI_COLORS if self.bg else FG_ANSI_COLORS)[color_name]
+
+ result = (color_code, color_name)
+ self._cache[cache_key] = result
+ return result
class _256ColorCache(Dict[Tuple[int, int, int], int]):
@@ -171,13 +189,41 @@ class _EscapeCodeCache(Dict[Attrs, str]):
def _color_name_to_rgb(self, color: str) ->tuple[int, int, int]:
"""Turn 'ffffff', into (0xff, 0xff, 0xff)."""
- pass
+ if color in ANSI_COLORS_TO_RGB:
+ return ANSI_COLORS_TO_RGB[color]
+ else:
+ r = int(color[0:2], 16)
+ g = int(color[2:4], 16)
+ b = int(color[4:6], 16)
+ return (r, g, b)
def _colors_to_code(self, fg_color: str, bg_color: str) ->Iterable[str]:
"""
Return a tuple with the vt100 values that represent this color.
"""
- pass
+ result = []
+
+ def color_to_code(color: str, fg: bool) ->str:
+ table = FG_ANSI_COLORS if fg else BG_ANSI_COLORS
+ if color in table:
+ return str(table[color])
+ elif isinstance(color, str):
+ r, g, b = self._color_name_to_rgb(color)
+ if self.color_depth == ColorDepth.DEPTH_24_BIT:
+ return f'{38 if fg else 48};2;{r};{g};{b}'
+ elif self.color_depth == ColorDepth.DEPTH_8_BIT:
+ return f'{38 if fg else 48};5;{_256_colors[(r, g, b)]}'
+ else:
+ code, name = (_16_fg_colors if fg else _16_bg_colors).get_code((r, g, b))
+ return str(code)
+ return ''
+
+ if fg_color:
+ result.append(color_to_code(fg_color, True))
+ if bg_color:
+ result.append(color_to_code(bg_color, False))
+
+ return result
def _get_size(fileno: int) ->tuple[int, int]:
@@ -187,7 +233,23 @@ def _get_size(fileno: int) ->tuple[int, int]:
:param fileno: stdout.fileno()
:returns: A (rows, cols) tuple.
"""
- pass
+ import fcntl
+ import termios
+ import struct
+
+ # Try to get the size using TIOCGWINSZ
+ try:
+ size = fcntl.ioctl(fileno, termios.TIOCGWINSZ, struct.pack('HHHH', 0, 0, 0, 0))
+ rows, cols, _, _ = struct.unpack('HHHH', size)
+ return rows, cols
+ except:
+ # Fallback to environment variables if ioctl fails
+ try:
+ return (int(os.environ.get('LINES', 25)),
+ int(os.environ.get('COLUMNS', 80)))
+ except:
+ # If all else fails, return a default size
+ return 25, 80
class Vt100_Output(Output):
@@ -233,7 +295,13 @@ class Vt100_Output(Output):
(This will take the dimensions by reading the pseudo
terminal attributes.)
"""
- pass
+ def get_size() ->Size:
+ rows, columns = _get_size(stdout.fileno())
+ return Size(rows=rows, columns=columns)
+
+ return cls(stdout, get_size, term=term,
+ default_color_depth=default_color_depth,
+ enable_bell=enable_bell)
def fileno(self) ->int:
"""Return file descriptor."""
@@ -317,11 +385,15 @@ class Vt100_Output(Output):
"""
Asks for a cursor position report (CPR).
"""
- pass
+ if self.enable_cpr:
+ self.write_raw('\x1b[6n')
+ self.flush()
def bell(self) ->None:
"""Sound bell."""
- pass
+ if self.enable_bell:
+ self.write_raw('\a')
+ self.flush()
def get_default_color_depth(self) ->ColorDepth:
"""
@@ -331,4 +403,16 @@ class Vt100_Output(Output):
We prefer 256 colors almost always, because this is what most terminals
support these days, and is a good default.
"""
- pass
+ if self.default_color_depth:
+ return self.default_color_depth
+
+ if self.term in ('linux', 'eterm-color'):
+ return ColorDepth.DEPTH_4_BIT
+
+ if '256color' in self.term:
+ return ColorDepth.DEPTH_8_BIT
+
+ if self.term in ('xterm', 'screen', 'vt100', 'vt220', 'rxvt-unicode'):
+ return ColorDepth.DEPTH_8_BIT
+
+ return ColorDepth.DEPTH_8_BIT
diff --git a/src/prompt_toolkit/output/win32.py b/src/prompt_toolkit/output/win32.py
index 15c3a7b8..988839f6 100644
--- a/src/prompt_toolkit/output/win32.py
+++ b/src/prompt_toolkit/output/win32.py
@@ -33,7 +33,7 @@ def _coord_byval(coord: COORD) ->c_long:
More info: http://msdn.microsoft.com/en-us/library/windows/desktop/ms686025(v=vs.85).aspx
"""
- pass
+ return c_long(coord.Y * 0x10000 | coord.X & 0xFFFF)
_DEBUG_RENDER_OUTPUT = False
@@ -83,66 +83,115 @@ class Win32Output(Output):
def fileno(self) ->int:
"""Return file descriptor."""
- pass
+ return self.stdout.fileno()
def encoding(self) ->str:
"""Return encoding used for stdout."""
- pass
+ return self.stdout.encoding
def write_raw(self, data: str) ->None:
"""For win32, there is no difference between write and write_raw."""
- pass
+ self._buffer.append(data)
def _winapi(self, func: Callable[..., _T], *a: object, **kw: object) ->_T:
"""
Flush and call win API function.
"""
- pass
+ self.flush()
+ return func(*a, **kw)
def get_win32_screen_buffer_info(self) ->CONSOLE_SCREEN_BUFFER_INFO:
"""
Return Screen buffer info.
"""
- pass
+ info = CONSOLE_SCREEN_BUFFER_INFO()
+ success = self._winapi(windll.kernel32.GetConsoleScreenBufferInfo,
+ self.hconsole, byref(info))
+ if success:
+ return info
+ else:
+ raise NoConsoleScreenBufferError
def set_title(self, title: str) ->None:
"""
Set terminal title.
"""
- pass
+ self._winapi(windll.kernel32.SetConsoleTitleW, title)
def erase_end_of_line(self) ->None:
- """"""
- pass
+ """Erase from the current cursor position to the end of the line."""
+ info = self.get_win32_screen_buffer_info()
+ if info:
+ size = info.dwSize
+ cursor_pos = info.dwCursorPosition
+ length = size.X - cursor_pos.X
+ cells_written = c_ulong()
+ self._winapi(windll.kernel32.FillConsoleOutputCharacterA,
+ self.hconsole, c_char(b' '), length,
+ _coord_byval(cursor_pos), byref(cells_written))
+ self._winapi(windll.kernel32.FillConsoleOutputAttribute,
+ self.hconsole, info.wAttributes, length,
+ _coord_byval(cursor_pos), byref(cells_written))
def reset_attributes(self) ->None:
"""Reset the console foreground/background color."""
- pass
+ self._winapi(windll.kernel32.SetConsoleTextAttribute,
+ self.hconsole, self.default_attrs)
def flush(self) ->None:
"""
Write to output stream and flush.
"""
- pass
+ if not self._buffer:
+ return
+
+ data = ''.join(self._buffer)
+ self._buffer = []
+
+ if _DEBUG_RENDER_OUTPUT:
+ self.LOG.write(data.encode('utf-8', 'replace'))
+ self.LOG.flush()
+
+ self.stdout.write(data)
+ self.stdout.flush()
def scroll_buffer_to_prompt(self) ->None:
"""
To be called before drawing the prompt. This should scroll the console
to left, with the cursor at the bottom (if possible).
"""
- pass
+ info = self.get_win32_screen_buffer_info()
+ if info:
+ sr = SMALL_RECT(
+ Left=0,
+ Top=info.srWindow.Top,
+ Right=info.dwSize.X - 1,
+ Bottom=info.dwSize.Y - 1,
+ )
+ self._winapi(windll.kernel32.SetConsoleWindowInfo,
+ self.hconsole, True, byref(sr))
+
+ cursor_pos = COORD(x=0, y=info.dwSize.Y - 1)
+ self._winapi(windll.kernel32.SetConsoleCursorPosition,
+ self.hconsole, _coord_byval(cursor_pos))
def enter_alternate_screen(self) ->None:
"""
Go to alternate screen buffer.
"""
- pass
+ if not self._in_alternate_screen:
+ self._in_alternate_screen = True
+ self._winapi(windll.kernel32.SetConsoleActiveScreenBuffer,
+ self.hconsole)
def quit_alternate_screen(self) ->None:
"""
Make stdout again the active buffer.
"""
- pass
+ if self._in_alternate_screen:
+ self._in_alternate_screen = False
+ self._winapi(windll.kernel32.SetConsoleActiveScreenBuffer,
+ HANDLE(windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)))
@classmethod
def win32_refresh_window(cls) ->None:
@@ -153,7 +202,8 @@ class Win32Output(Output):
for completion menus. When the menu disappears, it leaves traces due
to a bug in the Windows Console. Sending a repaint request solves it.
"""
- pass
+ hconsole = HANDLE(windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE))
+ windll.user32.InvalidateRect(hconsole, None, True)
def get_default_color_depth(self) ->ColorDepth:
"""
@@ -162,7 +212,14 @@ class Win32Output(Output):
Contrary to the Vt100 implementation, this doesn't depend on a $TERM
variable.
"""
- pass
+ if self.default_color_depth is not None:
+ return self.default_color_depth
+
+ # Windows 10 supports true color.
+ if sys.getwindowsversion().build >= 14393:
+ return ColorDepth.DEPTH_24_BIT
+
+ return ColorDepth.DEPTH_4_BIT
class FOREGROUND_COLOR:
@@ -192,7 +249,25 @@ class BACKGROUND_COLOR:
def _create_ansi_color_dict(color_cls: (type[FOREGROUND_COLOR] | type[
BACKGROUND_COLOR])) ->dict[str, int]:
"""Create a table that maps the 16 named ansi colors to their Windows code."""
- pass
+ return {
+ 'ansidefault': color_cls.GRAY,
+ 'ansiblack': color_cls.BLACK,
+ 'ansired': color_cls.RED,
+ 'ansigreen': color_cls.GREEN,
+ 'ansiyellow': color_cls.YELLOW,
+ 'ansiblue': color_cls.BLUE,
+ 'ansimagenta': color_cls.MAGENTA,
+ 'ansicyan': color_cls.CYAN,
+ 'ansigray': color_cls.GRAY,
+ 'ansibrightblack': color_cls.BLACK | color_cls.INTENSITY,
+ 'ansibrightred': color_cls.RED | color_cls.INTENSITY,
+ 'ansibrightgreen': color_cls.GREEN | color_cls.INTENSITY,
+ 'ansibrightyellow': color_cls.YELLOW | color_cls.INTENSITY,
+ 'ansibrightblue': color_cls.BLUE | color_cls.INTENSITY,
+ 'ansibrightmagenta': color_cls.MAGENTA | color_cls.INTENSITY,
+ 'ansibrightcyan': color_cls.CYAN | color_cls.INTENSITY,
+ 'ansiwhite': color_cls.GRAY | color_cls.INTENSITY,
+ }
FG_ANSI_COLORS = _create_ansi_color_dict(FOREGROUND_COLOR)
diff --git a/src/prompt_toolkit/output/windows10.py b/src/prompt_toolkit/output/windows10.py
index 8ed52363..57b585e5 100644
--- a/src/prompt_toolkit/output/windows10.py
+++ b/src/prompt_toolkit/output/windows10.py
@@ -34,7 +34,8 @@ class Windows10_Output:
"""
Write to output stream and flush.
"""
- pass
+ self.win32_output.flush()
+ self.vt100_output.flush()
def __getattr__(self, name: str) ->Any:
if name in ('get_size', 'get_rows_below_cursor_position',
@@ -52,7 +53,9 @@ class Windows10_Output:
Contrary to the Vt100 implementation, this doesn't depend on a $TERM
variable.
"""
- pass
+ if self.default_color_depth is not None:
+ return self.default_color_depth
+ return self.win32_output.get_default_color_depth()
Output.register(Windows10_Output)
@@ -63,4 +66,13 @@ def is_win_vt100_enabled() ->bool:
Returns True when we're running Windows and VT100 escape sequences are
supported.
"""
- pass
+ if sys.platform != 'win32':
+ return False
+
+ hconsole = HANDLE(windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE))
+ mode = DWORD()
+
+ if windll.kernel32.GetConsoleMode(hconsole, byref(mode)):
+ return bool(mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING)
+
+ return False
diff --git a/src/prompt_toolkit/patch_stdout.py b/src/prompt_toolkit/patch_stdout.py
index c07354d2..c2e3b761 100644
--- a/src/prompt_toolkit/patch_stdout.py
+++ b/src/prompt_toolkit/patch_stdout.py
@@ -48,7 +48,14 @@ def patch_stdout(raw: bool=False) ->Generator[None, None, None]:
:param raw: (`bool`) When True, vt100 terminal escape sequences are not
removed/escaped.
"""
- pass
+ original_stdout = sys.stdout
+ try:
+ proxy = StdoutProxy(raw=raw)
+ sys.stdout = proxy
+ yield
+ finally:
+ proxy.close()
+ sys.stdout = original_stdout
class _Done:
@@ -98,14 +105,21 @@ class StdoutProxy:
This will terminate the write thread, make sure everything is flushed
and wait for the write thread to finish.
"""
- pass
+ if not self.closed:
+ self._flush_queue.put(_Done())
+ self._flush_thread.join()
+ self.flush()
+ self.closed = True
def _get_app_loop(self) ->(asyncio.AbstractEventLoop | None):
"""
Return the event loop for the application currently running in our
`AppSession`.
"""
- pass
+ app = self.app_session.app
+ if app and app.is_running:
+ return app.loop
+ return None
def _write_and_flush(self, loop: (asyncio.AbstractEventLoop | None),
text: str) ->None:
@@ -113,7 +127,14 @@ class StdoutProxy:
Write the given text to stdout and flush.
If an application is running, use `run_in_terminal`.
"""
- pass
+ def write_and_flush() ->None:
+ self._output.write_raw(text)
+ self._output.flush()
+
+ if loop is not None:
+ run_in_terminal(write_and_flush, in_executor=True)
+ else:
+ write_and_flush()
def _write(self, data: str) ->None:
"""
@@ -126,10 +147,27 @@ class StdoutProxy:
command line. Therefor, we have a little buffer which holds the
text until a newline is written to stdout.
"""
- pass
+ if not self.raw:
+ data = data.replace('\x1b', '?')
+
+ if '\n' in data:
+ # When there's a newline in the data, write everything before the
+ # newline, including the newline itself.
+ before, after = data.rsplit('\n', 1)
+ to_write = ''.join(self._buffer) + before + '\n'
+ self._buffer = [after]
+
+ if to_write:
+ self._flush_queue.put(to_write)
+ else:
+ # Otherwise, cache in buffer.
+ self._buffer.append(data)
def flush(self) ->None:
"""
Flush buffered output.
"""
- pass
+ if self._buffer:
+ data = ''.join(self._buffer)
+ self._buffer = []
+ self._flush_queue.put(data)
diff --git a/src/prompt_toolkit/renderer.py b/src/prompt_toolkit/renderer.py
index d60c37f7..497c6762 100644
--- a/src/prompt_toolkit/renderer.py
+++ b/src/prompt_toolkit/renderer.py
@@ -47,7 +47,56 @@ def _output_screen_diff(app: Application[Any], output: Output, screen:
:param width: The width of the terminal.
:param previous_width: The width of the terminal during the last rendering.
"""
- pass
+ width = size.columns
+
+ # Create locals for faster access
+ write = output.write
+ write_position = output.set_cursor_position
+ move_cursor = output.move_cursor
+
+ # Skip first empty lines. (This is a performance optimization.)
+ if previous_screen is None:
+ row = 0
+ else:
+ row = 0
+ while row < screen.height and row < previous_screen.height and screen.data_buffer[row] == previous_screen.data_buffer[row]:
+ row += 1
+
+ # Render output
+ for y in range(row, screen.height):
+ row = screen.data_buffer[y]
+ previous_row = previous_screen.data_buffer[y] if previous_screen and y < previous_screen.height else None
+
+ col = 0
+ while col < width:
+ char = row[col]
+ char_width = char.width or 1
+
+ if previous_row:
+ previous_char = previous_row[col]
+ if char == previous_char:
+ col += char_width
+ continue
+
+ current_pos = Point(x=col, y=y)
+ write_position(current_pos.x, current_pos.y)
+ style = char.style
+
+ if style != last_style:
+ attrs = attrs_for_style_string[style]
+ output.set_attributes(attrs, color_depth)
+ last_style = style
+
+ write(char.char)
+ col += char_width
+
+ # Move cursor
+ if is_done:
+ write_position(screen.cursor_position.x, screen.cursor_position.y)
+ else:
+ move_cursor(screen.cursor_position.x - current_pos.x, screen.cursor_position.y - current_pos.y)
+
+ return screen.cursor_position, last_style
class HeightIsUnknownError(Exception):
@@ -142,7 +191,7 @@ class Renderer:
The `Screen` class that was generated during the last rendering.
This can be `None`.
"""
- pass
+ return getattr(self, '_last_rendered_screen', None)
@property
def height_is_known(self) ->bool:
@@ -151,14 +200,17 @@ class Renderer:
is known. (It's often nicer to draw bottom toolbars only if the height
is known, in order to avoid flickering when the CPR response arrives.)
"""
- pass
+ return self.cpr_support != CPR_Support.UNKNOWN and not self.waiting_for_cpr
@property
def rows_above_layout(self) ->int:
"""
Return the number of rows visible in the terminal above the layout.
"""
- pass
+ if self.cpr_support == CPR_Support.SUPPORTED:
+ return self._rows_above_layout
+ else:
+ return 0
def request_absolute_cursor_position(self) ->None:
"""
@@ -171,14 +223,26 @@ class Renderer:
For vt100: Do CPR request. (answer will arrive later.)
For win32: Do API call. (Answer comes immediately.)
"""
- pass
+ if self.cpr_support != CPR_Support.NOT_SUPPORTED:
+ self.output.get_cursor_position()
+ self.cpr_support = CPR_Support.SUPPORTED
+
+ if self.cpr_support == CPR_Support.NOT_SUPPORTED:
+ if self.cpr_not_supported_callback:
+ self.cpr_not_supported_callback()
def report_absolute_cursor_row(self, row: int) ->None:
"""
To be called when we know the absolute cursor position.
(As an answer of a "Cursor Position Request" response.)
"""
- pass
+ self._rows_above_layout = row - 1
+
+ # Resolve future.
+ if self._waiting_for_cpr_futures:
+ for f in self._waiting_for_cpr_futures:
+ f.set_result(None)
+ self._waiting_for_cpr_futures = deque()
@property
def waiting_for_cpr(self) ->bool:
@@ -186,13 +250,14 @@ class Renderer:
Waiting for CPR flag. True when we send the request, but didn't got a
response.
"""
- pass
+ return bool(self._waiting_for_cpr_futures)
async def wait_for_cpr_responses(self, timeout: int=1) ->None:
"""
Wait for a CPR response.
"""
- pass
+ if self._waiting_for_cpr_futures:
+ await wait(list(self._waiting_for_cpr_futures), timeout=timeout)
def render(self, app: Application[Any], layout: Layout, is_done: bool=False
) ->None:
@@ -202,7 +267,79 @@ class Renderer:
:param is_done: When True, put the cursor at the end of the interface. We
won't print any changes to this part.
"""
- pass
+ output = self.output
+ screen = layout.screen
+
+ if is_done:
+ self.request_absolute_cursor_position()
+
+ # Enter alternate screen.
+ if self.full_screen and not self._in_alternate_screen:
+ self._in_alternate_screen = True
+ output.enter_alternate_screen()
+
+ # Enable/disable mouse support.
+ needs_mouse_support = self.mouse_support()
+ if needs_mouse_support != self._mouse_support_enabled:
+ if needs_mouse_support:
+ output.enable_mouse_support()
+ else:
+ output.disable_mouse_support()
+ self._mouse_support_enabled = needs_mouse_support
+
+ # Enable bracketed paste.
+ if not self._bracketed_paste_enabled:
+ output.enable_bracketed_paste()
+ self._bracketed_paste_enabled = True
+
+ # Reset cursor key mode.
+ if not self._cursor_key_mode_reset:
+ output.reset_cursor_key_mode()
+ self._cursor_key_mode_reset = True
+
+ # Create new style transformation.
+ style_transformation = app.style_transformation or DummyStyleTransformation()
+
+ # Create new Cache objects.
+ style_hash = hash((app.style, style_transformation))
+ color_depth = output.get_default_color_depth()
+
+ if (style_hash != self._last_style_hash or
+ color_depth != self._last_color_depth):
+ self._attrs_for_style = _StyleStringToAttrsCache(
+ get_attrs_for_style_str=lambda style_str: app.style.get_attrs_for_style_str(style_str),
+ style_transformation=style_transformation)
+ self._style_string_has_style = _StyleStringHasStyleCache(self._attrs_for_style)
+
+ self._last_style_hash = style_hash
+ self._last_color_depth = color_depth
+
+ # Render to screen.
+ size = output.get_size()
+ if self.full_screen:
+ screen.resize(size)
+ else:
+ screen.resize(Size(rows=size.rows, columns=size.columns))
+
+ # Calculate the difference between this and the previous screen.
+ current_pos, last_style = _output_screen_diff(
+ app,
+ output,
+ screen,
+ current_pos=Point(0, 0),
+ color_depth=color_depth,
+ previous_screen=self._last_rendered_screen,
+ last_style=None,
+ is_done=is_done,
+ full_screen=self.full_screen,
+ attrs_for_style_string=self._attrs_for_style,
+ style_string_has_style=self._style_string_has_style,
+ size=size,
+ previous_width=(self._last_rendered_screen.width
+ if self._last_rendered_screen else 0))
+
+ output.flush()
+ self._last_rendered_screen = screen
def erase(self, leave_alternate_screen: bool=True) ->None:
"""
@@ -213,13 +350,30 @@ class Renderer:
:param leave_alternate_screen: When True, and when inside an alternate
screen buffer, quit the alternate screen.
"""
- pass
+ output = self.output
+
+ output.erase_screen()
+ output.reset_attributes()
+ output.disable_mouse_support()
+ output.disable_bracketed_paste()
+ output.reset_cursor_key_mode()
+
+ if leave_alternate_screen and self._in_alternate_screen:
+ output.quit_alternate_screen()
+ self._in_alternate_screen = False
+
+ self._last_rendered_screen = None
+ output.flush()
def clear(self) ->None:
"""
Clear screen and go to 0,0
"""
- pass
+ output = self.output
+
+ output.erase_screen()
+ output.cursor_goto(0, 0)
+ output.flush()
def print_formatted_text(output: Output, formatted_text: AnyFormattedText,
diff --git a/src/prompt_toolkit/search.py b/src/prompt_toolkit/search.py
index 60c9f64a..a6597ca3 100644
--- a/src/prompt_toolkit/search.py
+++ b/src/prompt_toolkit/search.py
@@ -71,28 +71,68 @@ def start_search(buffer_control: (BufferControl | None)=None, direction:
:param buffer_control: Start search for this `BufferControl`. If not given,
search through the current control.
"""
- pass
+ app = get_app()
+
+ if buffer_control is None:
+ buffer_control = app.layout.current_control
+
+ search_control = _get_reverse_search_links(app.layout).get(buffer_control)
+
+ if search_control:
+ buffer_control.search_state = SearchState(direction=direction)
+ app.layout.focus(search_control)
+ search_control.buffer.reset()
def stop_search(buffer_control: (BufferControl | None)=None) ->None:
"""
Stop search through the given `buffer_control`.
"""
- pass
+ app = get_app()
+
+ if buffer_control is None:
+ buffer_control = app.layout.current_control
+
+ if buffer_control.search_state:
+ buffer_control.search_state = None
+ app.layout.focus(buffer_control)
def do_incremental_search(direction: SearchDirection, count: int=1) ->None:
"""
Apply search, but keep search buffer focused.
"""
- pass
+ app = get_app()
+ search_control = app.layout.current_control
+
+ if isinstance(search_control, SearchBufferControl):
+ buffer_control = search_control.buffer_control
+ if buffer_control and buffer_control.search_state:
+ buffer_control.search_state.direction = direction
+
+ for _ in range(count):
+ buffer_control.search(buffer_control.search_state.text,
+ direction,
+ count=1,
+ include_current_position=False)
def accept_search() ->None:
"""
Accept current search query. Focus original `BufferControl` again.
"""
- pass
+ app = get_app()
+ search_control = app.layout.current_control
+
+ if isinstance(search_control, SearchBufferControl):
+ buffer_control = search_control.buffer_control
+ if buffer_control:
+ app.layout.focus(buffer_control)
+
+ # If we're in Vi mode and in navigation mode, go back to
+ # insert mode.
+ if app.vi_state.input_mode == InputMode.NAVIGATION:
+ app.vi_state.input_mode = InputMode.INSERT
def _get_reverse_search_links(layout: Layout) ->dict[BufferControl,
@@ -100,4 +140,8 @@ def _get_reverse_search_links(layout: Layout) ->dict[BufferControl,
"""
Return mapping from BufferControl to SearchBufferControl.
"""
- pass
+ result = {}
+ for search_control in layout.find_all_controls(lambda c: isinstance(c, SearchBufferControl)):
+ if search_control.buffer_control:
+ result[search_control.buffer_control] = search_control
+ return result
diff --git a/src/prompt_toolkit/shortcuts/dialogs.py b/src/prompt_toolkit/shortcuts/dialogs.py
index 1ae0d91d..0951f225 100644
--- a/src/prompt_toolkit/shortcuts/dialogs.py
+++ b/src/prompt_toolkit/shortcuts/dialogs.py
@@ -30,7 +30,33 @@ def yes_no_dialog(title: AnyFormattedText='', text: AnyFormattedText='',
Display a Yes/No dialog.
Return a boolean.
"""
- pass
+ def yes_handler() -> None:
+ get_app().exit(result=True)
+
+ def no_handler() -> None:
+ get_app().exit(result=False)
+
+ dialog = Dialog(
+ title=title,
+ body=Label(text=text, dont_extend_height=True),
+ buttons=[
+ Button(text=yes_text, handler=yes_handler),
+ Button(text=no_text, handler=no_handler),
+ ],
+ with_background=True,
+ )
+
+ return Application(
+ layout=Layout(dialog),
+ key_bindings=merge_key_bindings([
+ load_key_bindings(),
+ focus_next,
+ focus_previous,
+ ]),
+ mouse_support=True,
+ style=style,
+ full_screen=True,
+ )
_T = TypeVar('_T')
@@ -43,7 +69,30 @@ def button_dialog(title: AnyFormattedText='', text: AnyFormattedText='',
Display a dialog with button choices (given as a list of tuples).
Return the value associated with button.
"""
- pass
+ def button_handler(value: _T) -> None:
+ get_app().exit(result=value)
+
+ dialog = Dialog(
+ title=title,
+ body=Label(text=text, dont_extend_height=True),
+ buttons=[
+ Button(text=button_text, handler=functools.partial(button_handler, value))
+ for button_text, value in buttons
+ ],
+ with_background=True,
+ )
+
+ return Application(
+ layout=Layout(dialog),
+ key_bindings=merge_key_bindings([
+ load_key_bindings(),
+ focus_next,
+ focus_previous,
+ ]),
+ mouse_support=True,
+ style=style,
+ full_screen=True,
+ )
def input_dialog(title: AnyFormattedText='', text: AnyFormattedText='',
@@ -54,7 +103,51 @@ def input_dialog(title: AnyFormattedText='', text: AnyFormattedText='',
Display a text input box.
Return the given text, or None when cancelled.
"""
- pass
+ def accept(buf: Buffer) -> bool:
+ get_app().layout.focus(ok_button)
+ return True
+
+ def ok_handler() -> None:
+ get_app().exit(result=textfield.text)
+
+ def cancel_handler() -> None:
+ get_app().exit(result=None)
+
+ textfield = TextArea(
+ completer=completer,
+ validator=validator,
+ password=password,
+ multiline=False,
+ width=D(preferred=40),
+ accept_handler=accept,
+ default=default,
+ )
+
+ ok_button = Button(text=ok_text, handler=ok_handler)
+ cancel_button = Button(text=cancel_text, handler=cancel_handler)
+
+ dialog = Dialog(
+ title=title,
+ body=HSplit([
+ Label(text=text, dont_extend_height=True),
+ textfield,
+ ValidationToolbar(),
+ ]),
+ buttons=[ok_button, cancel_button],
+ with_background=True,
+ )
+
+ return Application(
+ layout=Layout(dialog),
+ key_bindings=merge_key_bindings([
+ load_key_bindings(),
+ focus_next,
+ focus_previous,
+ ]),
+ mouse_support=True,
+ style=style,
+ full_screen=True,
+ )
def message_dialog(title: AnyFormattedText='', text: AnyFormattedText='',
@@ -62,7 +155,27 @@ def message_dialog(title: AnyFormattedText='', text: AnyFormattedText='',
"""
Display a simple message box and wait until the user presses enter.
"""
- pass
+ def ok_handler() -> None:
+ get_app().exit(result=None)
+
+ dialog = Dialog(
+ title=title,
+ body=Label(text=text, dont_extend_height=True),
+ buttons=[Button(text=ok_text, handler=ok_handler)],
+ with_background=True,
+ )
+
+ return Application(
+ layout=Layout(dialog),
+ key_bindings=merge_key_bindings([
+ load_key_bindings(),
+ focus_next,
+ focus_previous,
+ ]),
+ mouse_support=True,
+ style=style,
+ full_screen=True,
+ )
def radiolist_dialog(title: AnyFormattedText='', text: AnyFormattedText='',
@@ -75,7 +188,38 @@ def radiolist_dialog(title: AnyFormattedText='', text: AnyFormattedText='',
Only one element can be selected at a time using Arrow keys and Enter.
The focus can be moved between the list and the Ok/Cancel button with tab.
"""
- pass
+ def ok_handler() -> None:
+ get_app().exit(result=radio_list.current_value)
+
+ def cancel_handler() -> None:
+ get_app().exit(result=None)
+
+ radio_list = RadioList(values or [], default=default)
+
+ dialog = Dialog(
+ title=title,
+ body=HSplit([
+ Label(text=text, dont_extend_height=True),
+ radio_list,
+ ]),
+ buttons=[
+ Button(text=ok_text, handler=ok_handler),
+ Button(text=cancel_text, handler=cancel_handler),
+ ],
+ with_background=True,
+ )
+
+ return Application(
+ layout=Layout(dialog),
+ key_bindings=merge_key_bindings([
+ load_key_bindings(),
+ focus_next,
+ focus_previous,
+ ]),
+ mouse_support=True,
+ style=style,
+ full_screen=True,
+ )
def checkboxlist_dialog(title: AnyFormattedText='', text: AnyFormattedText=
@@ -88,7 +232,38 @@ def checkboxlist_dialog(title: AnyFormattedText='', text: AnyFormattedText=
Several elements can be selected at a time using Arrow keys and Enter.
The focus can be moved between the list and the Ok/Cancel button with tab.
"""
- pass
+ def ok_handler() -> None:
+ get_app().exit(result=checkbox_list.current_values)
+
+ def cancel_handler() -> None:
+ get_app().exit(result=None)
+
+ checkbox_list = CheckboxList(values or [], default_values=default_values or [])
+
+ dialog = Dialog(
+ title=title,
+ body=HSplit([
+ Label(text=text, dont_extend_height=True),
+ checkbox_list,
+ ]),
+ buttons=[
+ Button(text=ok_text, handler=ok_handler),
+ Button(text=cancel_text, handler=cancel_handler),
+ ],
+ with_background=True,
+ )
+
+ return Application(
+ layout=Layout(dialog),
+ key_bindings=merge_key_bindings([
+ load_key_bindings(),
+ focus_next,
+ focus_previous,
+ ]),
+ mouse_support=True,
+ style=style,
+ full_screen=True,
+ )
def progress_dialog(title: AnyFormattedText='', text: AnyFormattedText='',
@@ -98,9 +273,51 @@ def progress_dialog(title: AnyFormattedText='', text: AnyFormattedText='',
:param run_callback: A function that receives as input a `set_percentage`
function and it does the work.
"""
- pass
+ progressbar = ProgressBar()
+ text_area = TextArea(
+ focusable=False,
+ multiline=True,
+ width=D(preferred=40),
+ height=D(preferred=3),
+ )
+
+ dialog = Dialog(
+ title=title,
+ body=HSplit([
+ Label(text=text, dont_extend_height=True),
+ Box(progressbar, padding=1),
+ text_area,
+ ]),
+ with_background=True,
+ )
+
+ app = Application(
+ layout=Layout(dialog),
+ key_bindings=load_key_bindings(),
+ mouse_support=True,
+ style=style,
+ full_screen=True,
+ )
+
+ def set_percentage(value: int) -> None:
+ progressbar.percentage = value
+ app.invalidate()
+
+ def set_text(text: str) -> None:
+ text_area.text = text
+ app.invalidate()
+
+ async def run_in_executor() -> None:
+ await run_in_executor_with_context(
+ run_callback, set_percentage, set_text
+ )
+ app.exit()
+
+ app.after_create = lambda: get_running_loop().create_task(run_in_executor())
+
+ return app
def _return_none() ->None:
"""Button handler that returns None."""
- pass
+ get_app().exit(result=None)
diff --git a/src/prompt_toolkit/shortcuts/progress_bar/base.py b/src/prompt_toolkit/shortcuts/progress_bar/base.py
index 3cc8cbce..fe2689db 100644
--- a/src/prompt_toolkit/shortcuts/progress_bar/base.py
+++ b/src/prompt_toolkit/shortcuts/progress_bar/base.py
@@ -41,7 +41,15 @@ def create_key_bindings(cancel_callback: (Callable[[], None] | None)
Key bindings handled by the progress bar.
(The main thread is not supposed to handle any key bindings.)
"""
- pass
+ kb = KeyBindings()
+
+ @kb.add('c-c')
+ def _(event: E) -> None:
+ " Abort when Control-C has been pressed. "
+ if cancel_callback is not None:
+ cancel_callback()
+
+ return kb
_T = TypeVar('_T')
@@ -215,7 +223,9 @@ class ProgressBarCounter(Generic[_CounterItem]):
(Can be called manually in case we don't have a collection to loop through.)
"""
- pass
+ self.items_completed += 1
+ if self.total is not None and self.items_completed >= self.total:
+ self.done = True
@property
def done(self) ->bool:
@@ -227,7 +237,15 @@ class ProgressBarCounter(Generic[_CounterItem]):
Contrast this with stopped. A stopped counter may be terminated before
100% completion. A done counter has reached its 100% completion.
"""
- pass
+ return self._done
+
+ @done.setter
+ def done(self, value: bool) -> None:
+ self._done = value
+ if value:
+ self.stopped = True
+ if self.remove_when_done:
+ self.progress_bar.counters.remove(self)
@property
def stopped(self) ->bool:
@@ -245,18 +263,35 @@ class ProgressBarCounter(Generic[_CounterItem]):
Contrast this with done. A done counter has reached its 100% completion.
A stopped counter may be terminated before 100% completion.
"""
- pass
+ return self.stop_time is not None
+
+ @stopped.setter
+ def stopped(self, value: bool) -> None:
+ if value and self.stop_time is None:
+ self.stop_time = datetime.datetime.now()
+ elif not value:
+ self.stop_time = None
@property
def time_elapsed(self) ->datetime.timedelta:
"""
Return how much time has been elapsed since the start.
"""
- pass
+ if self.stop_time is not None:
+ return self.stop_time - self.start_time
+ return datetime.datetime.now() - self.start_time
@property
def time_left(self) ->(datetime.timedelta | None):
"""
Timedelta representing the time left.
"""
- pass
+ if self.total is None or self.items_completed == 0:
+ return None
+
+ elapsed = self.time_elapsed
+ rate = self.items_completed / elapsed.total_seconds()
+ remaining_items = self.total - self.items_completed
+ seconds_left = remaining_items / rate
+
+ return datetime.timedelta(seconds=int(seconds_left))
diff --git a/src/prompt_toolkit/shortcuts/progress_bar/formatters.py b/src/prompt_toolkit/shortcuts/progress_bar/formatters.py
index 8ff11a85..22f4b3c5 100644
--- a/src/prompt_toolkit/shortcuts/progress_bar/formatters.py
+++ b/src/prompt_toolkit/shortcuts/progress_bar/formatters.py
@@ -87,7 +87,14 @@ def _format_timedelta(timedelta: datetime.timedelta) ->str:
"""
Return hh:mm:ss, or mm:ss if the amount of hours is zero.
"""
- pass
+ total_seconds = int(timedelta.total_seconds())
+ hours, remainder = divmod(total_seconds, 3600)
+ minutes, seconds = divmod(remainder, 60)
+
+ if hours > 0:
+ return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
+ else:
+ return f"{minutes:02d}:{seconds:02d}"
class TimeElapsed(Formatter):
@@ -124,7 +131,20 @@ def _hue_to_rgb(hue: float) ->tuple[int, int, int]:
"""
Take hue between 0 and 1, return (r, g, b).
"""
- pass
+ hue *= 6
+ x = 1 - abs((hue % 2) - 1)
+ if hue < 1:
+ return (255, int(x * 255), 0)
+ elif hue < 2:
+ return (int(x * 255), 255, 0)
+ elif hue < 3:
+ return (0, 255, int(x * 255))
+ elif hue < 4:
+ return (0, int(x * 255), 255)
+ elif hue < 5:
+ return (int(x * 255), 0, 255)
+ else:
+ return (255, 0, int(x * 255))
class Rainbow(Formatter):
@@ -142,4 +162,17 @@ def create_default_formatters() ->list[Formatter]:
"""
Return the list of default formatters.
"""
- pass
+ return [
+ Label(),
+ Text(' '),
+ Percentage(),
+ Text(' '),
+ Bar(),
+ Text(' '),
+ Progress(),
+ Text(' '),
+ Text('eta [', style='class:time-left'),
+ TimeLeft(),
+ Text(']', style='class:time-left'),
+ Text(' '),
+ ]
diff --git a/src/prompt_toolkit/shortcuts/prompt.py b/src/prompt_toolkit/shortcuts/prompt.py
index 452ef1a4..e4df47d0 100644
--- a/src/prompt_toolkit/shortcuts/prompt.py
+++ b/src/prompt_toolkit/shortcuts/prompt.py
@@ -81,7 +81,22 @@ def _split_multiline_prompt(get_prompt_text: _StyleAndTextTuplesCallable
returns the fragments to be shown on the lines above the input; and another
one with the fragments to be shown at the first line of the input.
"""
- pass
+ def has_multiple_lines() -> bool:
+ return '\n' in fragment_list_to_text(get_prompt_text())
+
+ def before():
+ fragments = get_prompt_text()
+ if has_multiple_lines():
+ return fragments[:-1]
+ return []
+
+ def first_input_line():
+ fragments = get_prompt_text()
+ if has_multiple_lines():
+ return fragments[-1:]
+ return fragments
+
+ return has_multiple_lines, before, first_input_line
class _RPrompt(Window):
@@ -306,32 +321,273 @@ class PromptSession(Generic[_T]):
This returns something that can be used as either a `Filter`
or `Filter`.
"""
- pass
+ @Condition
+ def dynamic() -> bool:
+ value = getattr(self, attr_name)
+ return to_filter(value)()
+ return dynamic
def _create_default_buffer(self) ->Buffer:
"""
Create and return the default input buffer.
"""
- pass
+ return Buffer(
+ name=DEFAULT_BUFFER,
+ complete_while_typing=self._dyncond('complete_while_typing'),
+ validate_while_typing=self._dyncond('validate_while_typing'),
+ enable_history_search=self._dyncond('enable_history_search'),
+ completer=DynamicCompleter(lambda: self.completer),
+ history=self.history,
+ validator=DynamicValidator(lambda: self.validator),
+ auto_suggest=DynamicAutoSuggest(lambda: self.auto_suggest),
+ accept_handler=self._accept_handler,
+ tempfile_suffix=lambda: to_str(self.tempfile_suffix or ''),
+ tempfile=lambda: to_str(self.tempfile or ''),
+ )
def _create_layout(self) ->Layout:
"""
Create `Layout` for this prompt.
"""
- pass
+ # Create processors list.
+ processors = [
+ ConditionalProcessor(
+ AppendAutoSuggestion(), has_focus(DEFAULT_BUFFER) & ~is_done
+ ),
+ ConditionalProcessor(
+ HighlightIncrementalSearchProcessor(),
+ has_focus(SEARCH_BUFFER),
+ ),
+ PasswordProcessor(),
+ BeforeInput(lambda: self._get_prompt()),
+ AfterInput(lambda: self._get_rprompt()),
+ ]
+
+ if self.input_processors:
+ processors.extend(self.input_processors)
+
+ # Create bottom toolbars.
+ bottom_toolbar = ConditionalContainer(
+ Window(
+ FormattedTextControl(lambda: self.bottom_toolbar),
+ style="class:bottom-toolbar.text",
+ ),
+ filter=Condition(lambda: self.bottom_toolbar is not None),
+ )
+
+ search_toolbar = SearchToolbar(
+ search_buffer=self.search_buffer,
+ ignore_case=self._dyncond("search_ignore_case"),
+ )
+
+ search_buffer_control = SearchBufferControl(
+ buffer=self.search_buffer,
+ input_processors=[ReverseSearchProcessor()],
+ ignore_case=self._dyncond("search_ignore_case"),
+ )
+
+ system_toolbar = SystemToolbar(
+ enable_global_bindings=self._dyncond("enable_system_prompt")
+ )
+
+ def get_search_buffer_control():
+ "Return the UIControl to be focused when searching start."
+ if is_true(self._dyncond("multiline")):
+ return search_toolbar.control
+ else:
+ return search_buffer_control
+
+ default_buffer_control = BufferControl(
+ buffer=self.default_buffer,
+ search_buffer_control=get_search_buffer_control,
+ input_processors=processors,
+ include_default_input_processors=False,
+ lexer=DynamicLexer(lambda: self.lexer),
+ preview_search=True,
+ )
+
+ default_buffer_window = Window(
+ default_buffer_control,
+ height=self._dyncond("multiline"),
+ get_line_prefix=partial(
+ self._get_line_prefix, get_prompt_text_2=self._get_prompt_text_2
+ ),
+ wrap_lines=self._dyncond("wrap_lines"),
+ )
+
+ @Condition
+ def multi_column_complete_style():
+ return self.complete_style == CompleteStyle.MULTI_COLUMN
+
+ # Build the layout.
+ layout = HSplit(
+ [
+ # The main input, with completion menus floating on top of it.
+ FloatContainer(
+ HSplit(
+ [
+ ConditionalContainer(
+ Window(
+ FormattedTextControl(self._get_prompt_text_1),
+ dont_extend_height=True,
+ ),
+ Condition(lambda: self._get_prompt_text_1() != ""),
+ ),
+ ConditionalContainer(
+ default_buffer_window,
+ Condition(lambda: not is_true(self._dyncond("multiline"))),
+ ),
+ ConditionalContainer(
+ HSplit(
+ [
+ default_buffer_window,
+ ValidationToolbar(),
+ system_toolbar,
+ search_toolbar,
+ ]
+ ),
+ Condition(lambda: is_true(self._dyncond("multiline"))),
+ ),
+ ]
+ ),
+ [
+ Float(
+ xcursor=True,
+ ycursor=True,
+ content=CompletionsMenu(
+ max_height=16,
+ scroll_offset=1,
+ extra_filter=has_focus(DEFAULT_BUFFER)
+ & ~multi_column_complete_style,
+ ),
+ ),
+ Float(
+ xcursor=True,
+ ycursor=True,
+ content=MultiColumnCompletionsMenu(
+ show_meta=True,
+ extra_filter=has_focus(DEFAULT_BUFFER)
+ & multi_column_complete_style,
+ ),
+ ),
+ ],
+ ),
+ ConditionalContainer(search_toolbar, ~is_true(self._dyncond("multiline"))),
+ bottom_toolbar,
+ ]
+ )
+
+ return Layout(layout, default_buffer_window)
def _create_application(self, editing_mode: EditingMode,
erase_when_done: bool) ->Application[_T]:
"""
Create the `Application` object.
"""
- pass
+ app = Application[_T](
+ layout=self.layout,
+ style=DynamicStyle(lambda: self.style),
+ include_default_pygments_style=self._dyncond(
+ "include_default_pygments_style"
+ ),
+ style_transformation=DynamicStyleTransformation(
+ lambda: self.style_transformation
+ ),
+ key_bindings=merge_key_bindings(
+ [
+ ConditionalKeyBindings(
+ self.key_bindings,
+ self._dyncond("enable_system_prompt")
+ | Condition(lambda: self.key_bindings is None),
+ ),
+ self._create_prompt_bindings(),
+ ]
+ ),
+ mouse_support=self._dyncond("mouse_support"),
+ editing_mode=editing_mode,
+ erase_when_done=erase_when_done,
+ reverse_vi_search_direction=True,
+ color_depth=self.color_depth,
+ cursor=DynamicCursorShapeConfig(lambda: self.cursor),
+ on_reset=self.on_reset,
+ on_render=self.on_render,
+ after_render=self.after_render,
+ input=self._input,
+ output=self._output,
+ )
+
+ # During render time, make sure that we focus the right buffer.
+ app.layout.focus_stack.push(DEFAULT_BUFFER)
+
+ return app
def _create_prompt_bindings(self) ->KeyBindings:
"""
Create the KeyBindings for a prompt application.
"""
- pass
+ kb = KeyBindings()
+ handle = kb.add
+ default_focused = has_focus(DEFAULT_BUFFER)
+
+ @Condition
+ def do_accept() -> bool:
+ return not is_done() and self.app.layout.has_focus(DEFAULT_BUFFER)
+
+ @handle("enter", filter=do_accept & ~(vi_navigation_mode & default_focused))
+ def _(event: E) -> None:
+ "Accept input when enter has been pressed."
+ self._accept_handler(event.current_buffer)
+
+ @Condition
+ def readline_complete_style() -> bool:
+ return self.complete_style == CompleteStyle.READLINE_LIKE
+
+ # Readline-style tab completion.
+ @handle("tab", filter=readline_complete_style & default_focused)
+ def _(event: E) -> None:
+ "Display completions (like Readline)."
+ display_completions_like_readline(event)
+
+ @handle("c-c", filter=default_focused)
+ def _(event: E) -> None:
+ "Abort when Control-C has been pressed."
+ event.app.exit(exception=KeyboardInterrupt, style="class:aborting")
+
+ @Condition
+ def ctrl_d_condition() -> bool:
+ """ Ctrl-D binding is only active when the default buffer is selected
+ and empty. """
+ app = get_app()
+ return (
+ app.current_buffer.name == DEFAULT_BUFFER
+ and not app.current_buffer.text
+ )
+
+ @handle("c-d", filter=ctrl_d_condition & default_focused)
+ def _(event: E) -> None:
+ "Exit when Control-D has been pressed."
+ event.app.exit(exception=EOFError, style="class:exiting")
+
+ suspend_supported = Condition(suspend_to_background_supported)
+
+ @Condition
+ def enable_open_in_editor() -> bool:
+ return bool(self.enable_open_in_editor)
+
+ @handle("c-x", "c-e", filter=~vi_mode & enable_open_in_editor & default_focused)
+ @handle("v", filter=vi_mode & enable_open_in_editor & default_focused)
+ def _(event: E) -> None:
+ "Open editor."
+ event.app.current_buffer.open_in_editor(event.app)
+
+ @handle("c-z", filter=suspend_supported)
+ def _(event: E) -> None:
+ """
+ Suspend process to background.
+ """
+ event.app.suspend_to_background()
+
+ return merge_key_bindings([kb, load_auto_suggest_bindings()])
def prompt(self, message: (AnyFormattedText | None)=None, *,
editing_mode: (EditingMode | None)=None, refresh_interval: (float |
diff --git a/src/prompt_toolkit/shortcuts/utils.py b/src/prompt_toolkit/shortcuts/utils.py
index 44028b38..3e5106d3 100644
--- a/src/prompt_toolkit/shortcuts/utils.py
+++ b/src/prompt_toolkit/shortcuts/utils.py
@@ -73,7 +73,39 @@ def print_formatted_text(*values: Any, sep: str=' ', end: str='\n', file: (
:param include_default_pygments_style: `bool`. Include the default Pygments
style when set to `True` (the default).
"""
- pass
+ # Create a merged style
+ merged_style = _create_merged_style(style, include_default_pygments_style)
+
+ # Convert all values to formatted text
+ fragments = []
+ for i, value in enumerate(values):
+ fragments.extend(to_formatted_text(value))
+ if i < len(values) - 1:
+ fragments.append(('', sep))
+
+ fragments.append(('', end))
+
+ # Get output object
+ if output is None:
+ if file is None:
+ file = get_app_session().output
+ output = create_output(stdout=file)
+
+ # Get color depth
+ if color_depth is None:
+ color_depth = ColorDepth.default()
+
+ # Create and print formatted text
+ formatted_text = FormattedText(fragments)
+ renderer_print_formatted_text(
+ output, formatted_text, merged_style,
+ color_depth=color_depth,
+ style_transformation=style_transformation,
+ )
+
+ # Flush the output
+ if flush:
+ output.flush()
def print_container(container: AnyContainer, file: (TextIO | None)=None,
@@ -88,7 +120,17 @@ def print_container(container: AnyContainer, file: (TextIO | None)=None,
print_container(
Frame(TextArea(text='Hello world!')))
"""
- pass
+ output = create_output(stdout=file)
+ merged_style = _create_merged_style(style, include_default_pygments_style)
+
+ app = Application(
+ layout=Layout(container=container),
+ output=output,
+ style=merged_style,
+ )
+
+ with app.output.capture_stdout():
+ app.run(in_thread=True)
def _create_merged_style(style: (BaseStyle | None),
@@ -96,25 +138,34 @@ def _create_merged_style(style: (BaseStyle | None),
"""
Merge user defined style with built-in style.
"""
- pass
+ styles = [default_ui_style()]
+ if include_default_pygments_style:
+ styles.append(default_pygments_style())
+ if style:
+ styles.append(style)
+ return merge_styles(styles)
def clear() ->None:
"""
Clear the screen.
"""
- pass
+ output = get_app_session().output
+ output.erase_screen()
+ output.cursor_goto(0, 0)
+ output.flush()
def set_title(text: str) ->None:
"""
Set the terminal title.
"""
- pass
+ output = get_app_session().output
+ output.set_title(text)
def clear_title() ->None:
"""
Erase the current title.
"""
- pass
+ set_title('')
diff --git a/src/prompt_toolkit/styles/base.py b/src/prompt_toolkit/styles/base.py
index a78d7080..47453d0a 100644
--- a/src/prompt_toolkit/styles/base.py
+++ b/src/prompt_toolkit/styles/base.py
@@ -63,7 +63,7 @@ class BaseStyle(metaclass=ABCMeta):
well as classnames (e.g. "class:title").
:param default: `Attrs` to be used if no styling was defined.
"""
- pass
+ raise NotImplementedError
@abstractproperty
def style_rules(self) ->list[tuple[str, str]]:
@@ -71,7 +71,7 @@ class BaseStyle(metaclass=ABCMeta):
The list of style rules, used to create this style.
(Required for `DynamicStyle` and `_MergedStyle` to work.)
"""
- pass
+ raise NotImplementedError
@abstractmethod
def invalidation_hash(self) ->Hashable:
@@ -80,13 +80,22 @@ class BaseStyle(metaclass=ABCMeta):
renderer knows that something in the style changed, and that everything
has to be redrawn.
"""
- pass
+ raise NotImplementedError
class DummyStyle(BaseStyle):
"""
A style that doesn't style anything.
"""
+ def get_attrs_for_style_str(self, style_str: str, default: Attrs=DEFAULT_ATTRS) ->Attrs:
+ return default
+
+ @property
+ def style_rules(self) ->list[tuple[str, str]]:
+ return []
+
+ def invalidation_hash(self) ->Hashable:
+ return None
class DynamicStyle(BaseStyle):
@@ -99,3 +108,16 @@ class DynamicStyle(BaseStyle):
def __init__(self, get_style: Callable[[], BaseStyle | None]):
self.get_style = get_style
self._dummy = DummyStyle()
+
+ def get_attrs_for_style_str(self, style_str: str, default: Attrs=DEFAULT_ATTRS) ->Attrs:
+ style = self.get_style() or self._dummy
+ return style.get_attrs_for_style_str(style_str, default)
+
+ @property
+ def style_rules(self) ->list[tuple[str, str]]:
+ style = self.get_style() or self._dummy
+ return style.style_rules
+
+ def invalidation_hash(self) ->Hashable:
+ style = self.get_style() or self._dummy
+ return (self.get_style, style.invalidation_hash())
diff --git a/src/prompt_toolkit/styles/defaults.py b/src/prompt_toolkit/styles/defaults.py
index 21bda00d..a2d7a8b4 100644
--- a/src/prompt_toolkit/styles/defaults.py
+++ b/src/prompt_toolkit/styles/defaults.py
@@ -102,7 +102,7 @@ def default_ui_style() ->BaseStyle:
"""
Create a default `Style` object.
"""
- pass
+ return Style(PROMPT_TOOLKIT_STYLE + COLORS_STYLE + WIDGETS_STYLE)
@memoized()
@@ -110,4 +110,4 @@ def default_pygments_style() ->Style:
"""
Create a `Style` object that contains the default Pygments style.
"""
- pass
+ return Style.from_dict(PYGMENTS_DEFAULT_STYLE)
diff --git a/src/prompt_toolkit/styles/pygments.py b/src/prompt_toolkit/styles/pygments.py
index 07c41282..a9a00fd4 100644
--- a/src/prompt_toolkit/styles/pygments.py
+++ b/src/prompt_toolkit/styles/pygments.py
@@ -29,7 +29,8 @@ def style_from_pygments_cls(pygments_style_cls: type[PygmentsStyle]) ->Style:
:param pygments_style_cls: Pygments style class to start from.
"""
- pass
+ style_dict = pygments_style_cls.styles.copy()
+ return style_from_pygments_dict(style_dict)
def style_from_pygments_dict(pygments_dict: dict[Token, str]) ->Style:
@@ -37,7 +38,10 @@ def style_from_pygments_dict(pygments_dict: dict[Token, str]) ->Style:
Create a :class:`.Style` instance from a Pygments style dictionary.
(One that maps Token objects to style strings.)
"""
- pass
+ style_dict = {}
+ for token, style in pygments_dict.items():
+ style_dict[pygments_token_to_classname(token)] = style
+ return Style.from_dict(style_dict)
def pygments_token_to_classname(token: Token) ->str:
@@ -47,4 +51,4 @@ def pygments_token_to_classname(token: Token) ->str:
(Our Pygments lexer will also turn the tokens that pygments produces in a
prompt_toolkit list of fragments that match these styling rules.)
"""
- pass
+ return 'pygments.' + '.'.join(token).lower()
diff --git a/src/prompt_toolkit/styles/style.py b/src/prompt_toolkit/styles/style.py
index ddd7ee70..633c4a47 100644
--- a/src/prompt_toolkit/styles/style.py
+++ b/src/prompt_toolkit/styles/style.py
@@ -21,7 +21,17 @@ def parse_color(text: str) ->str:
Like in Pygments, but also support the ANSI color names.
(These will map to the colors of the 16 color palette.)
"""
- pass
+ if text in ANSI_COLOR_NAMES:
+ return text
+ if text.lower() in ANSI_COLOR_NAMES_ALIASES:
+ return ANSI_COLOR_NAMES_ALIASES[text.lower()]
+ if text.lower() in _named_colors_lowercase:
+ return _named_colors_lowercase[text.lower()]
+ if text.startswith('#') and len(text) in (4, 7, 9):
+ return text
+ if re.match(r'^#[0-9a-fA-F]{3}([0-9a-fA-F]{3})?([0-9a-fA-F]{2})?$', text):
+ return text
+ raise ValueError(f"Wrong color format: {text}")
_EMPTY_ATTRS = Attrs(color=None, bgcolor=None, bold=None, underline=None,
@@ -34,7 +44,8 @@ def _expand_classname(classname: str) ->list[str]:
E.g. 'a.b.c' becomes ['a', 'a.b', 'a.b.c']
"""
- pass
+ parts = classname.split('.')
+ return ['.'.join(parts[:i+1]) for i in range(len(parts))]
def _parse_style_str(style_str: str) ->Attrs:
@@ -42,7 +53,19 @@ def _parse_style_str(style_str: str) ->Attrs:
Take a style string, e.g. 'bg:red #88ff00 class:title'
and return a `Attrs` instance.
"""
- pass
+ attrs = _EMPTY_ATTRS._asdict()
+ for part in style_str.split():
+ if part.startswith('bg:'):
+ attrs['bgcolor'] = parse_color(part[3:])
+ elif part.startswith('fg:') or part.startswith('color:'):
+ attrs['color'] = parse_color(part.split(':', 1)[1])
+ elif part in ('bold', 'italic', 'underline', 'strike', 'reverse', 'hidden'):
+ attrs[part] = True
+ elif part == 'blink':
+ attrs['blink'] = True
+ elif ':' not in part:
+ attrs['color'] = parse_color(part)
+ return Attrs(**attrs)
CLASS_NAMES_RE = re.compile('^[a-z0-9.\\s_-]*$')
@@ -110,14 +133,34 @@ class Style(BaseStyle):
:param style_dict: Style dictionary.
:param priority: `Priority` value.
"""
- pass
+ if priority == Priority.DICT_KEY_ORDER:
+ return cls(list(style_dict.items()))
+ else: # Priority.MOST_PRECISE
+ return cls(sorted(
+ style_dict.items(),
+ key=lambda item: (-len(item[0].split()), item[0]),
+ reverse=True
+ ))
def get_attrs_for_style_str(self, style_str: str, default: Attrs=
DEFAULT_ATTRS) ->Attrs:
"""
Get `Attrs` for the given style string.
"""
- pass
+ list_of_attrs = [default]
+ class_names = set()
+
+ for part in style_str.split():
+ if part.startswith('class:'):
+ class_names.update(_expand_classname(part[6:]))
+ else:
+ list_of_attrs.append(_parse_style_str(part))
+
+ for names, attr in self.class_names_and_attrs:
+ if names & class_names:
+ list_of_attrs.append(attr)
+
+ return _merge_attrs(list_of_attrs)
_T = TypeVar('_T')
@@ -129,14 +172,19 @@ def _merge_attrs(list_of_attrs: list[Attrs]) ->Attrs:
Every `Attr` in the list can override the styling of the previous one. So,
the last one has highest priority.
"""
- pass
+ result = {}
+ for attr in list_of_attrs:
+ for k, v in attr._asdict().items():
+ if v is not None:
+ result[k] = v
+ return Attrs(**result)
def merge_styles(styles: list[BaseStyle]) ->_MergedStyle:
"""
Merge multiple `Style` objects.
"""
- pass
+ return _MergedStyle(styles)
class _MergedStyle(BaseStyle):
@@ -153,4 +201,13 @@ class _MergedStyle(BaseStyle):
@property
def _merged_style(self) ->Style:
"""The `Style` object that has the other styles merged together."""
- pass
+ def get_style():
+ style_rules = []
+ for style in self.styles:
+ if isinstance(style, Style):
+ style_rules.extend(style._style_rules)
+ elif isinstance(style, _MergedStyle):
+ style_rules.extend(style._merged_style._style_rules)
+ return Style(style_rules)
+
+ return self._style.get(tuple(self.styles), get_style)
diff --git a/src/prompt_toolkit/styles/style_transformation.py b/src/prompt_toolkit/styles/style_transformation.py
index d15adc5c..ac34b6e4 100644
--- a/src/prompt_toolkit/styles/style_transformation.py
+++ b/src/prompt_toolkit/styles/style_transformation.py
@@ -69,7 +69,17 @@ class SwapLightAndDarkStyleTransformation(StyleTransformation):
"""
Return the `Attrs` used when opposite luminosity should be used.
"""
- pass
+ return Attrs(
+ color=get_opposite_color(attrs.color),
+ bgcolor=get_opposite_color(attrs.bgcolor),
+ bold=attrs.bold,
+ italic=attrs.italic,
+ underline=attrs.underline,
+ strike=attrs.strike,
+ blink=attrs.blink,
+ reverse=attrs.reverse,
+ hidden=attrs.hidden
+ )
class ReverseStyleTransformation(StyleTransformation):
@@ -129,14 +139,40 @@ class AdjustBrightnessStyleTransformation(StyleTransformation):
"""
Parse `style.Attrs` color into RGB tuple.
"""
- pass
+ if color.startswith('ansi'):
+ # For ANSI colors, we'll use a predefined mapping
+ r, g, b = {
+ 'ansidefault': (0.5, 0.5, 0.5),
+ 'ansiblack': (0, 0, 0),
+ 'ansired': (0.5, 0, 0),
+ 'ansigreen': (0, 0.5, 0),
+ 'ansiyellow': (0.5, 0.5, 0),
+ 'ansiblue': (0, 0, 0.5),
+ 'ansimagenta': (0.5, 0, 0.5),
+ 'ansicyan': (0, 0.5, 0.5),
+ 'ansigray': (0.5, 0.5, 0.5),
+ 'ansiwhite': (1, 1, 1),
+ 'ansibrightblack': (0.25, 0.25, 0.25),
+ 'ansibrightred': (1, 0, 0),
+ 'ansibrightgreen': (0, 1, 0),
+ 'ansibrightyellow': (1, 1, 0),
+ 'ansibrightblue': (0, 0, 1),
+ 'ansibrightmagenta': (1, 0, 1),
+ 'ansibrightcyan': (0, 1, 1),
+ }.get(color, (0.5, 0.5, 0.5)) # Default to gray if not found
+ else:
+ # For hex colors
+ r = int(color[:2], 16) / 255.0
+ g = int(color[2:4], 16) / 255.0
+ b = int(color[4:], 16) / 255.0
+ return (r, g, b)
def _interpolate_brightness(self, value: float, min_brightness: float,
max_brightness: float) ->float:
"""
Map the brightness to the (min_brightness..max_brightness) range.
"""
- pass
+ return min_brightness + value * (max_brightness - min_brightness)
class DummyStyleTransformation(StyleTransformation):
@@ -182,7 +218,7 @@ def merge_style_transformations(style_transformations: Sequence[
"""
Merge multiple transformations together.
"""
- pass
+ return _MergedStyleTransformation(list(style_transformations))
OPPOSITE_ANSI_COLOR_NAMES = {'ansidefault': 'ansidefault', 'ansiblack':
@@ -207,4 +243,18 @@ def get_opposite_color(colorname: (str | None)) ->(str | None):
This is used for turning color schemes that work on a light background
usable on a dark background.
"""
- pass
+ if colorname is None:
+ return None
+
+ if colorname in OPPOSITE_ANSI_COLOR_NAMES:
+ return OPPOSITE_ANSI_COLOR_NAMES[colorname]
+
+ try:
+ r, g, b = int(colorname[:2], 16), int(colorname[2:4], 16), int(colorname[4:], 16)
+ h, l, s = rgb_to_hls(r / 255.0, g / 255.0, b / 255.0)
+ l = 1.0 - l # Invert luminosity
+ r, g, b = [int(x * 255) for x in hls_to_rgb(h, l, s)]
+ return f'{r:02x}{g:02x}{b:02x}'
+ except ValueError:
+ # If we can't parse the color, return it unchanged
+ return colorname
diff --git a/src/prompt_toolkit/utils.py b/src/prompt_toolkit/utils.py
index 32a37c22..fdf68978 100644
--- a/src/prompt_toolkit/utils.py
+++ b/src/prompt_toolkit/utils.py
@@ -50,7 +50,7 @@ class Event(Generic[_Sender]):
def fire(self) ->None:
"""Alias for just calling the event."""
- pass
+ self()
def add_handler(self, handler: Callable[[_Sender], None]) ->None:
"""
@@ -58,13 +58,13 @@ class Event(Generic[_Sender]):
(Handler should be a callable that takes exactly one parameter: the
sender object.)
"""
- pass
+ self._handlers.append(handler)
def remove_handler(self, handler: Callable[[_Sender], None]) ->None:
"""
Remove a handler from this callback.
"""
- pass
+ self._handlers.remove(handler)
def __iadd__(self, handler: Callable[[_Sender], None]) ->Event[_Sender]:
"""
@@ -128,7 +128,7 @@ def get_cwidth(string: str) ->int:
"""
Return width of a string. Wrapper around ``wcwidth``.
"""
- pass
+ return _CHAR_SIZES_CACHE[string]
def suspend_to_background_supported() ->bool:
@@ -136,47 +136,47 @@ def suspend_to_background_supported() ->bool:
Returns `True` when the Python implementation supports
suspend-to-background. This is typically `False' on Windows systems.
"""
- pass
+ return hasattr(signal, 'SIGTSTP')
def is_windows() ->bool:
"""
True when we are using Windows.
"""
- pass
+ return sys.platform.startswith('win')
def is_windows_vt100_supported() ->bool:
"""
True when we are using Windows, but VT100 escape sequences are supported.
"""
- pass
+ return is_windows() and 'WT_SESSION' in os.environ
def is_conemu_ansi() ->bool:
"""
True when the ConEmu Windows console is used.
"""
- pass
+ return is_windows() and 'ConEmuANSI' in os.environ
def in_main_thread() ->bool:
"""
True when the current thread is the main thread.
"""
- pass
+ return threading.current_thread() is threading.main_thread()
def get_bell_environment_variable() ->bool:
"""
True if env variable is set to true (true, TRUE, True, 1).
"""
- pass
+ return os.environ.get('PROMPT_TOOLKIT_BELL', '').lower() in ('true', '1')
def get_term_environment_variable() ->str:
"""Return the $TERM environment variable."""
- pass
+ return os.environ.get('TERM', '')
_T = TypeVar('_T')
@@ -196,17 +196,23 @@ def take_using_weights(items: list[_T], weights: list[int]) ->Generator[_T,
:param weights: Integers representing the weight. (Numbers have to be
integers, not floats.)
"""
- pass
+ assert len(items) == len(weights)
+ total_weight = sum(weights)
+
+ while True:
+ for item, weight in zip(items, weights):
+ for _ in range(weight):
+ yield item
def to_str(value: (Callable[[], str] | str)) ->str:
"""Turn callable or string into string."""
- pass
+ return value() if callable(value) else value
def to_int(value: (Callable[[], int] | int)) ->int:
"""Turn callable or int into int."""
- pass
+ return value() if callable(value) else value
AnyFloat = Union[Callable[[], float], float]
@@ -214,7 +220,7 @@ AnyFloat = Union[Callable[[], float], float]
def to_float(value: AnyFloat) ->float:
"""Turn callable or float into float."""
- pass
+ return value() if callable(value) else value
def is_dumb_terminal(term: (str | None)=None) ->bool:
@@ -224,4 +230,6 @@ def is_dumb_terminal(term: (str | None)=None) ->bool:
If so, we should fall back to the simplest possible form of line editing,
without cursor positioning and color support.
"""
- pass
+ if term is None:
+ term = get_term_environment_variable()
+ return term.lower() in ('dumb', 'unknown')
diff --git a/src/prompt_toolkit/validation.py b/src/prompt_toolkit/validation.py
index 76ed1cda..8a618fdb 100644
--- a/src/prompt_toolkit/validation.py
+++ b/src/prompt_toolkit/validation.py
@@ -80,7 +80,7 @@ class Validator(metaclass=ABCMeta):
:param move_cursor_to_end: Move the cursor to the end of the input, if
the input is invalid.
"""
- pass
+ return _ValidatorFromCallable(validate_func, error_message, move_cursor_to_end)
class _ValidatorFromCallable(Validator):
@@ -97,6 +97,14 @@ class _ValidatorFromCallable(Validator):
def __repr__(self) ->str:
return f'Validator.from_callable({self.func!r})'
+ def validate(self, document: Document) ->None:
+ if not self.func(document.text):
+ if self.move_cursor_to_end:
+ cursor_position = len(document.text)
+ else:
+ cursor_position = 0
+ raise ValidationError(cursor_position=cursor_position, message=self.error_message)
+
class ThreadedValidator(Validator):
"""
@@ -112,13 +120,15 @@ class ThreadedValidator(Validator):
"""
Run the `validate` function in a thread.
"""
- pass
+ await run_in_executor_with_context(lambda: self.validator.validate(document))
class DummyValidator(Validator):
"""
Validator class that accepts any input.
"""
+ def validate(self, document: Document) ->None:
+ pass # Always valid
class ConditionalValidator(Validator):
@@ -131,6 +141,10 @@ class ConditionalValidator(Validator):
self.validator = validator
self.filter = to_filter(filter)
+ def validate(self, document: Document) ->None:
+ if self.filter():
+ self.validator.validate(document)
+
class DynamicValidator(Validator):
"""
@@ -141,3 +155,8 @@ class DynamicValidator(Validator):
def __init__(self, get_validator: Callable[[], Validator | None]) ->None:
self.get_validator = get_validator
+
+ def validate(self, document: Document) ->None:
+ validator = self.get_validator()
+ if validator is not None:
+ validator.validate(document)
diff --git a/src/prompt_toolkit/widgets/base.py b/src/prompt_toolkit/widgets/base.py
index dbba3ba3..17f566ab 100644
--- a/src/prompt_toolkit/widgets/base.py
+++ b/src/prompt_toolkit/widgets/base.py
@@ -186,21 +186,21 @@ class TextArea:
"""
The `Buffer` text.
"""
- pass
+ return self.buffer.text
@property
def document(self) ->Document:
"""
The `Buffer` document (text + cursor position).
"""
- pass
+ return self.buffer.document
@property
def accept_handler(self) ->(BufferAcceptHandler | None):
"""
The accept handler. Called when the user accepts the input.
"""
- pass
+ return self.buffer.accept_handler
def __pt_container__(self) ->Container:
return self.window
@@ -286,7 +286,15 @@ class Button:
def _get_key_bindings(self) ->KeyBindings:
"""Key bindings for the Button."""
- pass
+ kb = KeyBindings()
+
+ @kb.add(' ')
+ @kb.add('enter')
+ def _(event):
+ if self.handler is not None:
+ self.handler()
+
+ return kb
def __pt_container__(self) ->Container:
return self.window
diff --git a/src/prompt_toolkit/widgets/toolbars.py b/src/prompt_toolkit/widgets/toolbars.py
index 69e19222..208772fd 100644
--- a/src/prompt_toolkit/widgets/toolbars.py
+++ b/src/prompt_toolkit/widgets/toolbars.py
@@ -111,17 +111,50 @@ class SearchToolbar:
class _CompletionsToolbarControl(UIControl):
- pass
+ def create_content(self, width: int, height: int) -> UIContent:
+ """Create the content for the completions toolbar."""
+ app = get_app()
+ if app.current_buffer.complete_state:
+ completions = app.current_buffer.complete_state.current_completions
+ index = app.current_buffer.complete_state.complete_index
+
+ # Format completions
+ formatted_completions = []
+ for i, completion in enumerate(completions):
+ if i == index:
+ formatted_completions.append(('class:completion-toolbar.completion.current', completion.display))
+ else:
+ formatted_completions.append(('class:completion-toolbar.completion', completion.display))
+
+ if i < len(completions) - 1:
+ formatted_completions.append(('class:completion-toolbar.arrow', ' > '))
+
+ return UIContent(
+ lambda i: formatted_completions,
+ line_count=1,
+ show_cursor=False
+ )
+ else:
+ return UIContent(lambda i: [], line_count=1)
+ def is_focusable(self) -> bool:
+ return False
-class CompletionsToolbar:
- def __init__(self) ->None:
- self.container = ConditionalContainer(content=Window(
- _CompletionsToolbarControl(), height=1, style=
- 'class:completion-toolbar'), filter=has_completions)
+class CompletionsToolbar:
- def __pt_container__(self) ->Container:
+ def __init__(self) -> None:
+ self.control = _CompletionsToolbarControl()
+ self.container = ConditionalContainer(
+ content=Window(
+ self.control,
+ height=1,
+ style='class:completion-toolbar'
+ ),
+ filter=has_completions
+ )
+
+ def __pt_container__(self) -> Container:
return self.container