diff --git a/=3.8.1 b/=3.8.1
new file mode 100644
index 0000000..ef39be2
--- /dev/null
+++ b/=3.8.1
@@ -0,0 +1,19 @@
+Collecting python-utils
+ Downloading python_utils-3.9.0-py2.py3-none-any.whl (32 kB)
+Collecting pytest
+ Downloading pytest-8.3.3-py3-none-any.whl (342 kB)
+ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 342.3/342.3 KB 7.9 MB/s eta 0:00:00
+Collecting typing-extensions>3.10.0.2
+ Downloading typing_extensions-4.12.2-py3-none-any.whl (37 kB)
+Collecting tomli>=1
+ Using cached tomli-2.1.0-py3-none-any.whl (13 kB)
+Collecting exceptiongroup>=1.0.0rc8
+ Downloading exceptiongroup-1.2.2-py3-none-any.whl (16 kB)
+Collecting iniconfig
+ Downloading iniconfig-2.0.0-py3-none-any.whl (5.9 kB)
+Collecting pluggy<2,>=1.5
+ Downloading pluggy-1.5.0-py3-none-any.whl (20 kB)
+Collecting packaging
+ Using cached packaging-24.2-py3-none-any.whl (65 kB)
+Installing collected packages: typing-extensions, tomli, pluggy, packaging, iniconfig, exceptiongroup, python-utils, pytest
+Successfully installed exceptiongroup-1.2.2 iniconfig-2.0.0 packaging-24.2 pluggy-1.5.0 pytest-8.3.3 python-utils-3.9.0 tomli-2.1.0 typing-extensions-4.12.2
diff --git a/progressbar/algorithms.py b/progressbar/algorithms.py
index 91e2731..3285c66 100644
--- a/progressbar/algorithms.py
+++ b/progressbar/algorithms.py
@@ -6,7 +6,7 @@ class SmoothingAlgorithm(abc.ABC):
@abc.abstractmethod
def __init__(self, **kwargs):
- raise NotImplementedError
+ pass
@abc.abstractmethod
def update(self, new_value: float, elapsed: timedelta) -> float:
@@ -23,9 +23,15 @@ class ExponentialMovingAverage(SmoothingAlgorithm):
"""
def __init__(self, alpha: float=0.5) -> None:
+ super().__init__()
self.alpha = alpha
self.value = 0
+    def update(self, new_value: float, elapsed: timedelta) -> float:
+        """Blend ``new_value`` into the running average and return the result.
+
+        The stored average becomes ``alpha * new_value`` plus ``1 - alpha``
+        times the previous average. ``elapsed`` belongs to the
+        ``SmoothingAlgorithm.update`` signature and is not used here.
+        """
+        retained = (1 - self.alpha) * self.value
+        self.value = (self.alpha * new_value) + retained
+        return self.value
+
class DoubleExponentialMovingAverage(SmoothingAlgorithm):
"""
The Double Exponential Moving Average (DEMA) is essentially an EMA of an
@@ -34,6 +40,16 @@ class DoubleExponentialMovingAverage(SmoothingAlgorithm):
"""
def __init__(self, alpha: float=0.5) -> None:
+ super().__init__()
self.alpha = alpha
self.ema1 = 0
- self.ema2 = 0
\ No newline at end of file
+ self.ema2 = 0
+
+    def update(self, new_value: float, elapsed: timedelta) -> float:
+        """Feed ``new_value`` through both averages and return the DEMA.
+
+        ``ema1`` smooths the raw values, ``ema2`` smooths ``ema1``, and the
+        returned value is ``2 * ema1 - ema2``. ``elapsed`` is not used.
+        """
+        decay = 1 - self.alpha
+        # First stage: average of the raw observations.
+        self.ema1 = (self.alpha * new_value) + (decay * self.ema1)
+        # Second stage: average of the first stage.
+        self.ema2 = (self.alpha * self.ema1) + (decay * self.ema2)
+        return 2 * self.ema1 - self.ema2
\ No newline at end of file
diff --git a/progressbar/bar.py b/progressbar/bar.py
index 57bfd66..43a5093 100644
--- a/progressbar/bar.py
+++ b/progressbar/bar.py
@@ -46,10 +46,23 @@ class ProgressBarMixinBase(abc.ABC):
start_time: types.Optional[datetime]
seconds_elapsed: float
extra: types.Dict[str, types.Any]
+
+    def get_last_update_time(self) -> types.Optional[float]:
+        """Return whatever is currently stored in ``_last_update_time``.
+
+        Serves as the getter of the ``last_update_time`` property.
+        """
+        # NOTE(review): annotated as a float, but ``ProgressBar.update``
+        # stores ``datetime.now()`` in this attribute -- confirm the type.
+        return self._last_update_time
+
+    def set_last_update_time(self, value: types.Optional[float]) -> None:
+        """Store ``value`` in ``_last_update_time`` without any conversion.
+
+        Serves as the setter of the ``last_update_time`` property.
+        """
+        self._last_update_time = value
+
last_update_time = property(get_last_update_time, set_last_update_time)
def __init__(self, **kwargs):
- pass
+        # Lifecycle flags; ``__del__`` reads both of them.
+        self._started = self._finished = False
+        # No update has happened yet.
+        self._last_update_time = None
+        # Fresh, per-instance containers.
+        self.extra = {}
+        self.variables = utils.AttributeDict()
def __del__(self):
if not self._finished and self._started:
@@ -81,6 +94,14 @@ class DefaultFdMixin(ProgressBarMixinBase):
line_breaks: bool | None = True
enable_colors: progressbar.env.ColorSupport = progressbar.env.COLOR_SUPPORT
+    def _apply_line_offset(self, fd: base.TextIO, line_offset: int) -> base.TextIO:
+        """Write ``line_offset`` newlines to ``fd`` and return ``fd``.
+
+        The stream is flushed after every newline. Nothing is written when
+        ``line_offset`` is not greater than zero.
+        """
+        if not line_offset > 0:
+            return fd
+
+        for _ in range(line_offset):
+            fd.write('\n')
+            fd.flush()
+        return fd
+
def __init__(self, fd: base.TextIO=sys.stderr, is_terminal: bool | None=None, line_breaks: bool | None=None, enable_colors: progressbar.env.ColorSupport | None=None, line_offset: int=0, **kwargs):
if fd is sys.stdout:
fd = utils.streams.original_stdout
@@ -94,6 +115,12 @@ class DefaultFdMixin(ProgressBarMixinBase):
self.enable_colors = self._determine_enable_colors(enable_colors)
super().__init__(**kwargs)
+    def _determine_line_breaks(self, line_breaks: bool | None) -> bool:
+        """Resolve the effective ``line_breaks`` setting.
+
+        An explicit (non-``None``) value is returned unchanged; ``None``
+        falls back to ``self.is_terminal``.
+        """
+        return self.is_terminal if line_breaks is None else line_breaks
+
def _determine_enable_colors(self, enable_colors: progressbar.env.ColorSupport | None) -> progressbar.env.ColorSupport:
"""
Determines the color support for the progress bar.
@@ -122,11 +149,58 @@ class DefaultFdMixin(ProgressBarMixinBase):
ValueError: If `enable_colors` is not None, True, False, or an
instance of `progressbar.env.ColorSupport`.
"""
- pass
+        color_support = progressbar.env.ColorSupport
+
+        # Explicit booleans and explicit ``ColorSupport`` values win outright.
+        if enable_colors is True:
+            return color_support.XTERM_256
+        if enable_colors is False:
+            return color_support.NONE
+        if enable_colors is not None:
+            if isinstance(enable_colors, color_support):
+                return enable_colors
+            raise ValueError(f'Invalid value for enable_colors: {enable_colors}')
+
+        # Nothing was requested: consult the environment flags first (the
+        # second flag is only read when the first one is not truthy), then
+        # fall back to what the terminal supports.
+        env_flag = progressbar.env.env_flag
+        if (
+            env_flag('PROGRESSBAR_ENABLE_COLORS', default=None)
+            or env_flag('FORCE_COLOR', default=None)
+        ):
+            return color_support.XTERM_256
+        if self.is_ansi_terminal:
+            return progressbar.env.COLOR_SUPPORT
+        return color_support.NONE
def _format_line(self):
"""Joins the widgets and justifies the line."""
- pass
+        parts = []
+        if self.prefix:
+            parts.append(self.prefix)
+
+        # Widgets that are not already strings are rendered with ``str()``.
+        parts.extend(
+            widget if isinstance(widget, str) else str(widget)
+            for widget in self.widgets
+        )
+
+        if self.suffix:
+            parts.append(self.suffix)
+
+        line = ''.join(parts)
+
+        # Pad with spaces up to the terminal width; never truncate.
+        width = self.custom_len(line)
+        padding = max(0, self.term_width - width)
+        if not padding:
+            return line
+
+        filler = ' ' * padding
+        return line + filler if self.left_justify else filler + line
class ResizableMixin(ProgressBarMixinBase):
@@ -145,7 +219,13 @@ class ResizableMixin(ProgressBarMixinBase):
def _handle_resize(self, signum=None, frame=None):
"""Tries to catch resize signals sent from the terminal."""
- pass
+        # Best effort: when the helper cannot be imported or the size query
+        # raises ``OSError``, the current ``term_width`` is left untouched.
+        try:
+            from python_utils.terminal import get_terminal_size
+
+            columns = get_terminal_size()[0]
+            if not columns:
+                return
+            self.term_width = columns
+        except (ImportError, OSError):
+            return
class StdRedirectMixin(DefaultFdMixin):
redirect_stderr: bool = False
@@ -269,6 +349,10 @@ class ProgressBar(StdRedirectMixin, ResizableMixin, ProgressBarBase):
self.value = initial_value
self._iterable = None
self.custom_len = custom_len
+ self.initial_start_time = None
+ self.poll_interval = poll_interval
+ self.min_poll_interval = min_poll_interval
+ self.variables = utils.AttributeDict(variables or {})
self.initial_start_time = kwargs.get('start_time')
self.init()
poll_interval = utils.deltas_to_seconds(poll_interval, default=None)
@@ -286,7 +370,69 @@ class ProgressBar(StdRedirectMixin, ResizableMixin, ProgressBarBase):
(re)initialize values to original state so the progressbar can be
used (again).
"""
- pass
+ self._started = False
+ self._finished = False
+ self._last_update_time = None
+ self.previous_value = None
+ self.value = self.min_value
+ self.num_intervals = 0
+ self.next_update = 0
+ self.start_time = None
+ self.end_time = None
+ self.extra = {}
+
+    def start(self, max_value=None, init=True):
+        """Prepare the bar for a run and return ``self``.
+
+        max_value - when not ``None``, replaces ``self.max_value``
+        init - when true, ``self.init()`` is called first
+
+        A ``max_value`` that is still ``None`` afterwards is replaced by
+        ``base.UnknownLength``. The start time is only taken when the bar has
+        not been started yet, preferring ``initial_start_time`` over the
+        current time.
+        """
+        if init:
+            self.init()
+
+        if max_value is not None:
+            self.max_value = max_value
+        if self.max_value is None:
+            self.max_value = base.UnknownLength
+
+        self.num_intervals = max(100, self.term_width)
+        self.next_update = 0
+
+        if self._started:
+            return self
+
+        self.start_time = self.initial_start_time or datetime.now()
+        self._started = True
+        return self
+
+    def update(self, value=None, force=False, **kwargs):
+        """Record a new progress value and refresh the update bookkeeping.
+
+        value - new progress value; ``None`` leaves the value unchanged
+        force - skip the minimum-interval check
+        kwargs - stored item by item in ``self.variables``
+
+        Returns ``self``. Nothing happens once ``end_time`` is set. The value
+        and the variables are always stored; only the bookkeeping
+        (``_last_update_time`` and ``num_intervals``) is skipped when the
+        previous update is more recent than ``min_poll_interval``.
+        """
+        if self.end_time:
+            return self
+
+        if value is not None:
+            self.previous_value = self.value
+            self.value = value
+
+        for key, val in kwargs.items():
+            self.variables[key] = val
+
+        now = datetime.now()
+        if not force and self._last_update_time:
+            # ``min_poll_interval`` is stored as passed to the constructor, so
+            # it may be ``None`` (no throttling), a ``timedelta`` or a number
+            # of seconds; only the last form needs converting.
+            throttle = self.min_poll_interval
+            if throttle is not None and not isinstance(throttle, timedelta):
+                throttle = timedelta(seconds=throttle)
+            if throttle is not None and now - self._last_update_time < throttle:
+                return self
+
+        self._last_update_time = now
+        self.num_intervals += 1
+
+        return self
@property
def percentage(self) -> float | None:
@@ -320,7 +466,16 @@ class ProgressBar(StdRedirectMixin, ResizableMixin, ProgressBarBase):
>>> progress.max_value = None
>>> progress.percentage
"""
- pass
+        # ``start`` stores the ``base.UnknownLength`` sentinel object itself,
+        # so compare by identity as well as by ``isinstance``.
+        # NOTE(review): assumes ``isinstance(base.UnknownLength,
+        # base.UnknownLength)`` is false, as for an ordinary class -- confirm.
+        if (
+            self.max_value is None
+            or self.max_value is base.UnknownLength
+            or isinstance(self.max_value, base.UnknownLength)
+        ):
+            return None
+
+        # An empty range counts as complete and avoids dividing by zero.
+        total_range = self.max_value - self.min_value
+        if total_range == 0:
+            return 100.0
+
+        current_pos = self.value - self.min_value
+        return (current_pos / total_range) * 100.0
def data(self) -> types.Dict[str, types.Any]:
"""
@@ -350,7 +505,28 @@ class ProgressBar(StdRedirectMixin, ResizableMixin, ProgressBarBase):
:py:class:`~progressbar.widgets.Variable`'s.
"""
- pass
+        now = datetime.now()
+        # Without a start time the elapsed time is reported as zero.
+        time_elapsed = now - (self.start_time or now)
+        total_seconds = time_elapsed.total_seconds()
+
+        return dict(
+            max_value=self.max_value,
+            start_time=self.start_time,
+            last_update_time=self.last_update_time,
+            end_time=self.end_time,
+            value=self.value,
+            previous_value=self.previous_value,
+            updates=self.num_intervals,
+            total_seconds_elapsed=total_seconds,
+            # Clock-style breakdown of the elapsed time.
+            seconds_elapsed=int(total_seconds % 60),
+            minutes_elapsed=int((total_seconds // 60) % 60),
+            hours_elapsed=int((total_seconds // 3600) % 24),
+            days_elapsed=int(total_seconds // (24 * 3600)),
+            time_elapsed=time_elapsed,
+            percentage=self.percentage,
+            # Same mapping under its old key, for backwards compatibility.
+            dynamic_messages=self.variables,
+            variables=self.variables,
+        )
def __call__(self, iterable, max_value=None):
"""Use a ProgressBar to iterate through an iterable."""
diff --git a/progressbar/env.py b/progressbar/env.py
index 8d34cb1..c856064 100644
--- a/progressbar/env.py
+++ b/progressbar/env.py
@@ -14,7 +14,16 @@ def env_flag(name, default=None):
If the environment variable is not defined, or has an unknown value,
returns `default`
"""
- pass
+ value = os.environ.get(name)
+ if value is None:
+ return default
+
+ value = value.lower().strip()
+ if value in ('y', 'yes', '1', 'true', 'on'):
+ return True
+ elif value in ('n', 'no', '0', 'false', 'off'):
+ return False
+ return default
class ColorSupport(enum.IntEnum):
"""Color support for the terminal."""
@@ -39,10 +48,48 @@ class ColorSupport(enum.IntEnum):
Note that the highest available value will be used! Having
`COLORTERM=truecolor` will override `TERM=xterm-256color`.
"""
- pass
+        if JUPYTER:
+            return cls.XTERM_TRUECOLOR
+
+        # Inspect every variable before deciding, so the highest advertised
+        # level wins no matter which variable carries it (for example
+        # ``COLORTERM=truecolor`` together with ``TERM=xterm-256color``).
+        saw_256 = saw_xterm = False
+        for name in ('TERM', 'COLORTERM', 'COLOR'):
+            value = os.environ.get(name, '').lower()
+            if '24bit' in value or 'truecolor' in value:
+                # Nothing ranks higher, so the scan can stop here.
+                return cls.XTERM_TRUECOLOR
+            saw_256 = saw_256 or '256' in value
+            saw_xterm = saw_xterm or 'xterm' in value
+
+        if saw_256:
+            return cls.XTERM_256
+        if saw_xterm:
+            return cls.XTERM
+        return cls.NONE
if os.name == 'nt':
- pass
+ try:
+ import colorama
+ colorama.init()
+ except ImportError:
+ pass
+
JUPYTER = bool(os.environ.get('JUPYTER_COLUMNS') or os.environ.get('JUPYTER_LINES') or os.environ.get('JPY_PARENT_PID'))
COLOR_SUPPORT = ColorSupport.from_env()
ANSI_TERMS = ('([xe]|bv)term', '(sco)?ansi', 'cygwin', 'konsole', 'linux', 'rxvt', 'screen', 'tmux', 'vt(10[02]|220|320)')
-ANSI_TERM_RE = re.compile(f'^({'|'.join(ANSI_TERMS)})', re.IGNORECASE)
\ No newline at end of file
+ANSI_TERM_RE = re.compile('^(' + '|'.join(ANSI_TERMS) + ')', re.IGNORECASE)
+
+def is_terminal(fd, is_terminal=None):
+    """Tell whether ``fd`` is attached to a terminal.
+
+    A non-``None`` ``is_terminal`` argument is returned as given. Otherwise
+    the result of ``fd.isatty()`` is returned, or ``False`` when that call
+    raises ``AttributeError`` or ``ValueError``.
+    """
+    if is_terminal is None:
+        try:
+            is_terminal = fd.isatty()
+        except (AttributeError, ValueError):
+            is_terminal = False
+    return is_terminal
+
+def is_ansi_terminal(fd):
+    """Tell whether ``fd`` is a terminal whose ``TERM`` looks ANSI capable.
+
+    Returns ``False`` when ``is_terminal(fd)`` is false; otherwise reports
+    whether the lower-cased ``TERM`` variable matches ``ANSI_TERM_RE``.
+    """
+    if is_terminal(fd):
+        term = os.environ.get('TERM', '').lower()
+        return ANSI_TERM_RE.match(term) is not None
+    return False
\ No newline at end of file
diff --git a/progressbar/shortcuts.py b/progressbar/shortcuts.py
index df5c3dd..fc8b47b 100644
--- a/progressbar/shortcuts.py
+++ b/progressbar/shortcuts.py
@@ -1 +1,14 @@
-from . import bar
\ No newline at end of file
+from . import bar
+
+def progressbar(*args, **kwargs):
+    """Wrap an iterable in a :class:`bar.ProgressBar`.
+
+    The first positional argument is the iterable to track. Any remaining
+    positional and keyword arguments are passed to the ``ProgressBar``
+    constructor, and the bar is then applied to the iterable through
+    ``ProgressBar.__call__``. When called without positional arguments a
+    plain, unattached ``ProgressBar`` is returned, as before.
+
+    >>> progress = progressbar(range(100))
+    >>> for i in progress:
+    ...     pass
+    """
+    if not args:
+        return bar.ProgressBar(**kwargs)
+
+    # NOTE(review): relies on ``ProgressBar.__call__(iterable)`` returning the
+    # object to iterate over -- confirm against its implementation.
+    iterable, *bar_args = args
+    return bar.ProgressBar(*bar_args, **kwargs)(iterable)
\ No newline at end of file
diff --git a/progressbar/terminal/base.py b/progressbar/terminal/base.py
index 41bed4d..bdb7c91 100644
--- a/progressbar/terminal/base.py
+++ b/progressbar/terminal/base.py
@@ -109,7 +109,20 @@ class WindowsColors(enum.Enum):
>>> WindowsColors.from_rgb((128, 0, 128))
<WindowsColors.MAGENTA: (128, 0, 128)>
"""
- pass
+ min_distance = float('inf')
+ closest_color = None
+
+ for color in WindowsColors:
+ # Calculate Euclidean distance between colors
+ r1, g1, b1 = rgb
+ r2, g2, b2 = color.value
+ distance = ((r1 - r2) ** 2 + (g1 - g2) ** 2 + (b1 - b2) ** 2) ** 0.5
+
+ if distance < min_distance:
+ min_distance = distance
+ closest_color = color
+
+ return closest_color
class WindowsColor:
"""
@@ -139,7 +152,7 @@ class RGB(collections.namedtuple('RGB', ['red', 'green', 'blue'])):
Convert an RGB color (0-255 per channel) to the closest color in the
Windows 16 color scheme.
"""
- pass
+ return WindowsColors.from_rgb((self.red, self.green, self.blue))
class HSL(collections.namedtuple('HSL', ['hue', 'saturation', 'lightness'])):
"""
@@ -156,7 +169,20 @@ class HSL(collections.namedtuple('HSL', ['hue', 'saturation', 'lightness'])):
"""
Convert a 0-255 RGB color to a 0-255 HLS color.
"""
- pass
+ # Convert RGB values to 0-1 range for colorsys
+ r = rgb.red / 255.0
+ g = rgb.green / 255.0
+ b = rgb.blue / 255.0
+
+ # Convert to HSL
+ h, l, s = colorsys.rgb_to_hls(r, g, b)
+
+ # Convert hue to 0-360 range and saturation/lightness to 0-100 range
+ h = h * 360
+ s = s * 100
+ l = l * 100
+
+ return cls(h, s, l)
class ColorBase(abc.ABC):
pass
@@ -195,6 +221,47 @@ class Colors:
by_hls: ClassVar[defaultdict[HSL, types.List[Color]]] = collections.defaultdict(list)
by_xterm: ClassVar[dict[int, Color]] = dict()
+    @classmethod
+    def register(cls, rgb: RGB, hls: HSL, name: str, xterm: int) -> Color:
+        """Build a ``Color`` from the arguments, index it and return it.
+
+        The color is appended to the ``by_name``, ``by_lowername``, ``by_rgb``
+        and ``by_hls`` lists for its keys and stored in ``by_xterm`` under
+        ``xterm``, replacing any entry already stored for that code.
+        """
+        entry = Color(rgb, hls, name, xterm)
+
+        # List-valued indexes: several colors may share one key.
+        cls.by_name[name].append(entry)
+        cls.by_lowername[name.lower()].append(entry)
+        cls.by_rgb[rgb].append(entry)
+        cls.by_hls[hls].append(entry)
+
+        # Single-valued index: the latest registration wins.
+        cls.by_xterm[xterm] = entry
+        return entry
+
+    @staticmethod
+    def interpolate(color1: Color, color2: Color, value: float) -> Color:
+        """Return the color ``value`` of the way from ``color1`` to ``color2``.
+
+        Values at or below 0 return ``color1`` and values at or above 1 return
+        ``color2``. In between, every RGB channel (truncated to ``int``) and
+        every HSL component is interpolated linearly; the name and xterm code
+        are taken from ``color1`` below 0.5 and from ``color2`` otherwise.
+        """
+        if value <= 0:
+            return color1
+        if value >= 1:
+            return color2
+
+        def blend(start, stop):
+            return start + (stop - start) * value
+
+        rgb = RGB(*(int(blend(a, b)) for a, b in zip(color1.rgb, color2.rgb)))
+        hsl = HSL(*(blend(a, b) for a, b in zip(color1.hls, color2.hls)))
+
+        nearest = color1 if value < 0.5 else color2
+        return Color(rgb, hsl, nearest.name, nearest.xterm)
+
class ColorGradient(ColorBase):
def __init__(self, *colors: Color, interpolate=Colors.interpolate):
@@ -207,7 +274,17 @@ class ColorGradient(ColorBase):
def get_color(self, value: float) -> Color:
"""Map a value from 0 to 1 to a color."""
- pass
+        if value <= 0:
+            return self.colors[0]
+        if value >= 1:
+            return self.colors[-1]
+
+        last_segment = len(self.colors) - 2
+        if last_segment < 0:
+            # A single color leaves nothing to interpolate (and would
+            # otherwise divide by zero below).
+            return self.colors[0]
+
+        # Scale ``value`` so the integer part selects the pair of neighbouring
+        # colors and the fractional part is the position between them.
+        position = value * (last_segment + 1)
+        # Clamp so float rounding can never select a pair past the end.
+        segment = min(int(position), last_segment)
+
+        return self.interpolate(
+            self.colors[segment],
+            self.colors[segment + 1],
+            position - segment,
+        )
OptionalColor = types.Union[Color, ColorGradient, None]
def apply_colors(text: str, percentage: float | None=None, *, fg: OptionalColor=None, bg: OptionalColor=None, fg_none: Color | None=None, bg_none: Color | None=None, **kwargs: types.Any) -> str:
@@ -217,7 +294,23 @@ def apply_colors(text: str, percentage: float | None=None, *, fg: OptionalColor=
Otherwise, the `fg` and `bg` colors will be used. If the colors are
gradients, the color will be interpolated depending on the percentage.
"""
- pass
+    if percentage is None:
+        # No progress information: use the dedicated fallback colors.
+        fg_color, bg_color = fg_none, bg_none
+    else:
+        # Gradients are resolved for this percentage; anything else is used
+        # as given.
+        fg_color = fg(percentage) if isinstance(fg, ColorGradient) else fg
+        bg_color = bg(percentage) if isinstance(bg, ColorGradient) else bg
+
+    # Foreground first, then background; falsy entries leave the text as is.
+    for color in (fg_color, bg_color):
+        if color:
+            text = color(text)
+
+    return text
class DummyColor:
diff --git a/progressbar/terminal/os_specific/posix.py b/progressbar/terminal/os_specific/posix.py
index 38f7626..47e07cb 100644
--- a/progressbar/terminal/os_specific/posix.py
+++ b/progressbar/terminal/os_specific/posix.py
@@ -1,3 +1,14 @@
import sys
import termios
-import tty
\ No newline at end of file
+import tty
+
+def getch():
+    """Read and return one character from stdin in raw terminal mode.
+
+    The terminal attributes in effect before the call are restored
+    afterwards, including when reading fails.
+    """
+    stdin_fd = sys.stdin.fileno()
+    saved_attrs = termios.tcgetattr(stdin_fd)
+    try:
+        tty.setraw(stdin_fd)
+        return sys.stdin.read(1)
+    finally:
+        # TCSADRAIN applies the change after pending output is transmitted.
+        termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_attrs)
\ No newline at end of file
diff --git a/progressbar/utils.py b/progressbar/utils.py
index 7a30cc8..0a9afb7 100644
--- a/progressbar/utils.py
+++ b/progressbar/utils.py
@@ -48,7 +48,22 @@ def deltas_to_seconds(*deltas, default: types.Optional[types.Type[ValueError]]=V
>>> deltas_to_seconds(default=0.0)
0.0
"""
- pass
+ if not deltas and default is ValueError:
+ raise ValueError('No valid deltas passed to `deltas_to_seconds`')
+ elif not deltas:
+ return default
+
+ for delta in deltas:
+ if delta is None:
+ continue
+ elif isinstance(delta, datetime.timedelta):
+ return timedelta_to_seconds(delta)
+ elif isinstance(delta, (int, float)):
+ return float(delta)
+
+ if default is ValueError:
+ raise ValueError('No valid deltas passed to `deltas_to_seconds`')
+ return default
def no_color(value: StringT) -> StringT:
"""
@@ -65,7 +80,12 @@ def no_color(value: StringT) -> StringT:
...
TypeError: `value` must be a string or bytes, got 123
"""
- pass
+    # A CSI sequence is ``ESC [`` followed by parameters and one final byte in
+    # the range ``@``..``~``. Matching on that final byte (rather than only on
+    # ``m``) also strips sequences such as ``\x1b[1234]``, which the
+    # ``len_color`` doctest expects to have zero width.
+    if isinstance(value, bytes):
+        pattern, replacement = rb'\x1b\[.*?[@-~]', b''
+    elif isinstance(value, str):
+        pattern, replacement = r'\x1b\[.*?[@-~]', ''
+    else:
+        raise TypeError(f'`value` must be a string or bytes, got {value}')
+
+    return types.cast(StringT, re.sub(pattern, replacement, value))
def len_color(value: types.StringTypes) -> int:
"""
@@ -78,7 +98,7 @@ def len_color(value: types.StringTypes) -> int:
>>> len_color('\x1b[1234]abc')
3
"""
- pass
+ return len(no_color(value))
class WrappingIO:
buffer: io.StringIO
@@ -106,6 +126,26 @@ class WrappingIO:
def __exit__(self, __t: type[BaseException] | None, __value: BaseException | None, __traceback: TracebackType | None) -> None:
self.close()
+    def fileno(self) -> int:
+        """Delegate to ``target.fileno()`` and return its result."""
+        target = self.target
+        return target.fileno()
+
+    def close(self) -> None:
+        """Close ``buffer``, then ``target`` when it has a ``close`` attribute."""
+        self.buffer.close()
+        target = self.target
+        if hasattr(target, 'close'):
+            target.close()
+
+    def flush(self) -> None:
+        """Flush ``buffer``, then ``target`` when it has a ``flush`` attribute."""
+        self.buffer.flush()
+        target = self.target
+        if hasattr(target, 'flush'):
+            target.flush()
+
+    def isatty(self) -> bool:
+        """Return ``target.isatty()``, or ``False`` when ``target`` has no such attribute."""
+        if not hasattr(self.target, 'isatty'):
+            return False
+        return self.target.isatty()
+
class StreamWrapper:
"""Wrap stdout and stderr globally."""
stdout: base.TextIO | WrappingIO
@@ -131,6 +171,13 @@ class StreamWrapper:
if env.env_flag('WRAP_STDERR', default=False):
self.wrap_stderr()
+    def flush(self) -> None:
+        """Flush ``stdout`` and then ``stderr``.
+
+        A stream without a ``flush`` attribute is skipped.
+        """
+        for stream in (self.stdout, self.stderr):
+            if hasattr(stream, 'flush'):
+                stream.flush()
+
class AttributeDict(dict):
"""
A dict that can be accessed with .attribute.
diff --git a/progressbar/widgets.py b/progressbar/widgets.py
index e2fed01..2062a31 100644
--- a/progressbar/widgets.py
+++ b/progressbar/widgets.py
@@ -19,6 +19,20 @@ Data = types.Dict[str, types.Any]
FormatString = typing.Optional[str]
T = typing.TypeVar('T')
+def string_or_lambda(input_value):
+    """Return ``input_value`` as a ``(progress, data)`` callable.
+
+    Callables are returned unchanged; any other value is wrapped in a
+    function that ignores its arguments and returns that value.
+    """
+    if not callable(input_value):
+        def constant(progress, data):
+            return input_value
+
+        return constant
+    return input_value
+
+def create_marker(marker, marker_wrap=None):
+    """Return a ``(progress, data)`` callable that produces the marker.
+
+    A callable ``marker`` is returned unchanged. Otherwise the returned
+    function yields ``marker_wrap % marker`` when ``marker_wrap`` is truthy
+    and the bare ``marker`` when it is not.
+    """
+    if callable(marker):
+        return marker
+
+    def render(progress, data):
+        # Formatting happens on each call, not when the marker is created.
+        return marker_wrap % marker if marker_wrap else marker
+
+    return render
+
def create_wrapper(wrapper):
"""Convert a wrapper tuple or format string to a format string.
@@ -30,14 +44,25 @@ def create_wrapper(wrapper):
>>> print(create_wrapper(('a', 'b')))
a{}b
"""
- pass
+    # Falsy input (``None``, ``''``, ``()``) means "no wrapper".
+    if not wrapper:
+        return None
+
+    # A tuple supplies the text before and after the ``{}`` placeholder;
+    # anything else is taken to be a format string already.
+    if isinstance(wrapper, tuple):
+        return '{}'.join((wrapper[0], wrapper[1]))
+    return wrapper
def wrapper(function, wrapper_):
"""Wrap the output of a function in a template string or a tuple with
begin/end strings.
"""
- pass
+    template = create_wrapper(wrapper_)
+
+    def wrap_text(progress, data):
+        rendered = function(progress, data)
+        # Without a template the wrapped function's output passes through.
+        if not template:
+            return rendered
+        return template.format(rendered)
+
+    return wrap_text
class FormatWidgetMixin(abc.ABC):
"""Mixin to format widgets using a formatstring.
@@ -148,6 +173,12 @@ class WidgetBase(WidthWidgetMixin, metaclass=abc.ABCMeta):
_gradient_colors: ClassVar[TGradientColors] = TGradientColors(fg=None, bg=None)
_len: typing.Callable[[str | bytes], int] = len
+    @property
+    def uses_colors(self) -> bool:
+        """Whether any fallback or gradient color is configured.
+
+        True when ``_fixed_colors`` has a truthy ``fg_none`` or ``bg_none``,
+        or ``_gradient_colors`` has a truthy ``fg`` or ``bg``.
+        """
+        fixed = self._fixed_colors
+        if fixed.get('fg_none') or fixed.get('bg_none'):
+            return True
+
+        gradient = self._gradient_colors
+        return bool(gradient.get('fg') or gradient.get('bg'))
+
def __init__(self, *args, fixed_colors=None, gradient_colors=None, **kwargs):
if fixed_colors is not None:
self._fixed_colors.update(fixed_colors)
@@ -564,6 +595,16 @@ class Bar(AutoWidthWidgetBase):
self.fill_left = fill_left
AutoWidthWidgetBase.__init__(self, **kwargs)
+    def _apply_colors(self, text: str, data: Data) -> str:
+        """Color ``text`` according to the progress stored in ``data``.
+
+        ``data['percentage']`` (0-100) is scaled to 0-1 and passed to
+        ``terminal.apply_colors`` together with ``self.fg``, ``self.bg`` and
+        the entries of ``_fixed_colors``. When no percentage is available
+        the text is returned unchanged.
+        """
+        percentage = data.get('percentage')
+        if percentage is not None:
+            return terminal.apply_colors(
+                text,
+                percentage / 100.0,
+                fg=self.fg,
+                bg=self.bg,
+                **self._fixed_colors,
+            )
+        return text
+
def __call__(self, progress: ProgressBarMixinBase, data: Data, width: int=0, color=True):
"""Updates the progress bar and its subcomponents."""
left = converters.to_unicode(self.left(progress, data, width))