Reference (Gold): loguru
Pytest Summary for test tests
| status | count |
|---|---|
| passed | 1459 |
| skipped | 38 |
| failed | 41 |
| total | 1538 |
| collected | 1538 |
Failed pytests:
test_logger.yml::basic_logger_usage[method=trace]
/testbed/tests/typesafety/test_logger.yml:14: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:3: note: Revealed type is "Any" (diff) E main:4: note: Revealed type is "Any" (diff) E Expected: E main:3: note: Revealed type is "loguru.Logger" (diff) E main:4: note: Revealed type is "None" (diff) E Alignment of first line difference: E E: main:3: note: Revealed type is "loguru.Logger"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::basic_logger_usage[method=debug]
/testbed/tests/typesafety/test_logger.yml:14: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:3: note: Revealed type is "Any" (diff) E main:4: note: Revealed type is "Any" (diff) E Expected: E main:3: note: Revealed type is "loguru.Logger" (diff) E main:4: note: Revealed type is "None" (diff) E Alignment of first line difference: E E: main:3: note: Revealed type is "loguru.Logger"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::basic_logger_usage[method=info]
/testbed/tests/typesafety/test_logger.yml:14: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:3: note: Revealed type is "Any" (diff) E main:4: note: Revealed type is "Any" (diff) E Expected: E main:3: note: Revealed type is "loguru.Logger" (diff) E main:4: note: Revealed type is "None" (diff) E Alignment of first line difference: E E: main:3: note: Revealed type is "loguru.Logger"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::basic_logger_usage[method=success]
/testbed/tests/typesafety/test_logger.yml:14: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:3: note: Revealed type is "Any" (diff) E main:4: note: Revealed type is "Any" (diff) E Expected: E main:3: note: Revealed type is "loguru.Logger" (diff) E main:4: note: Revealed type is "None" (diff) E Alignment of first line difference: E E: main:3: note: Revealed type is "loguru.Logger"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::basic_logger_usage[method=warning]
/testbed/tests/typesafety/test_logger.yml:14: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:3: note: Revealed type is "Any" (diff) E main:4: note: Revealed type is "Any" (diff) E Expected: E main:3: note: Revealed type is "loguru.Logger" (diff) E main:4: note: Revealed type is "None" (diff) E Alignment of first line difference: E E: main:3: note: Revealed type is "loguru.Logger"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::basic_logger_usage[method=error]
/testbed/tests/typesafety/test_logger.yml:14: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:3: note: Revealed type is "Any" (diff) E main:4: note: Revealed type is "Any" (diff) E Expected: E main:3: note: Revealed type is "loguru.Logger" (diff) E main:4: note: Revealed type is "None" (diff) E Alignment of first line difference: E E: main:3: note: Revealed type is "loguru.Logger"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::basic_logger_usage[method=exception]
/testbed/tests/typesafety/test_logger.yml:14: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:3: note: Revealed type is "Any" (diff) E main:4: note: Revealed type is "Any" (diff) E Expected: E main:3: note: Revealed type is "loguru.Logger" (diff) E main:4: note: Revealed type is "None" (diff) E Alignment of first line difference: E E: main:3: note: Revealed type is "loguru.Logger"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::basic_logger_usage[method=critical]
/testbed/tests/typesafety/test_logger.yml:14: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:3: note: Revealed type is "Any" (diff) E main:4: note: Revealed type is "Any" (diff) E Expected: E main:3: note: Revealed type is "loguru.Logger" (diff) E main:4: note: Revealed type is "None" (diff) E Alignment of first line difference: E E: main:3: note: Revealed type is "loguru.Logger"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::using_log_function[level='INFO']
/testbed/tests/typesafety/test_logger.yml:24: E pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E (empty)
test_logger.yml::using_log_function[level=30]
/testbed/tests/typesafety/test_logger.yml:24: E pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E (empty)
test_logger.yml::using_logging_arguments
/testbed/tests/typesafety/test_logger.yml:29: E pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E (empty)
test_logger.yml::logging_non_string[message=123]
/testbed/tests/typesafety/test_logger.yml:38: E pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E (empty)
test_logger.yml::logging_non_string[message=dict(foo=456)]
/testbed/tests/typesafety/test_logger.yml:38: E pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E (empty)
test_logger.yml::logging_non_string[message=object()]
/testbed/tests/typesafety/test_logger.yml:38: E pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E (empty)
test_logger.yml::add_sink[sink=sys.stderr]
/testbed/tests/typesafety/test_logger.yml:55: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:2: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:2: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:6: note: Revealed type is "Any" (diff) E Expected: E main:6: note: Revealed type is "builtins.int" (diff) E Alignment of first line difference: E E: main:6: note: Revealed type is "builtins.int"... E A: main:2: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::add_sink[sink='test.txt']
/testbed/tests/typesafety/test_logger.yml:55: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:2: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:2: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:6: note: Revealed type is "Any" (diff) E Expected: E main:6: note: Revealed type is "builtins.int" (diff) E Alignment of first line difference: E E: main:6: note: Revealed type is "builtins.int"... E A: main:2: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::add_sink[sink=Path('file.log')]
/testbed/tests/typesafety/test_logger.yml:55: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:2: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:2: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:6: note: Revealed type is "Any" (diff) E Expected: E main:6: note: Revealed type is "builtins.int" (diff) E Alignment of first line difference: E E: main:6: note: Revealed type is "builtins.int"... E A: main:2: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::add_sink[sink=lambda m: None]
/testbed/tests/typesafety/test_logger.yml:55: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:2: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:2: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:6: note: Revealed type is "Any" (diff) E Expected: E main:6: note: Revealed type is "builtins.int" (diff) E Alignment of first line difference: E E: main:6: note: Revealed type is "builtins.int"... E A: main:2: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::add_sink[sink=StreamHandler()]
/testbed/tests/typesafety/test_logger.yml:55: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:2: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:2: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:6: note: Revealed type is "Any" (diff) E Expected: E main:6: note: Revealed type is "builtins.int" (diff) E Alignment of first line difference: E E: main:6: note: Revealed type is "builtins.int"... E A: main:2: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::basic_sink_options[format='{message}',filter='module',context='fork']
/testbed/tests/typesafety/test_logger.yml:67: E pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: E Actual: E main:3: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:3: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E (empty)
test_logger.yml::basic_sink_options[format=lambda r: '{message}\n',filter=lambda r: True,context=get_context('fork')]
/testbed/tests/typesafety/test_logger.yml:67: E pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: E Actual: E main:3: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:3: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E (empty)
test_logger.yml::file_sink_options
/testbed/tests/typesafety/test_logger.yml:85: E pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E (empty)
test_logger.yml::async_sink_options
/testbed/tests/typesafety/test_logger.yml:101: E pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E (empty)
test_logger.yml::remove_sink
/testbed/tests/typesafety/test_logger.yml:113: E pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E (empty)
test_logger.yml::await_completion
/testbed/tests/typesafety/test_logger.yml:123: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:5: note: Revealed type is "Any" (diff) E Expected: E main:5: note: Revealed type is "typing.Awaitable[None]" (diff) E Alignment of first line difference: E E: main:5: note: Revealed type is "typing.Awaitable[None]"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::catch_as_decorator_with_parentheses
/testbed/tests/typesafety/test_logger.yml:134: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:6: note: Revealed type is "Any" (diff) E Expected: E main:6: note: Revealed type is "def (a: builtins.int, b: builtins.int) -> builtins.int" (diff) E Alignment of first line difference: E E: main:6: note: Revealed type is "def (a: builtins.int, b: builtins.int) -... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::catch_as_decorator_without_parentheses
/testbed/tests/typesafety/test_logger.yml:145: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:6: note: Revealed type is "Any" (diff) E Expected: E main:6: note: Revealed type is "def (a: builtins.int, b: builtins.int) -> builtins.int" (diff) E Alignment of first line difference: E E: main:6: note: Revealed type is "def (a: builtins.int, b: builtins.int) -... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::catch_as_context_manager
/testbed/tests/typesafety/test_logger.yml:155: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:5: note: Revealed type is "Any" (diff) E Expected: E main:5: note: Revealed type is "loguru.Catcher" (diff) E Alignment of first line difference: E E: main:5: note: Revealed type is "loguru.Catcher"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::opt
/testbed/tests/typesafety/test_logger.yml:164: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:4: note: Revealed type is "Any" (diff) E Expected: E main:4: note: Revealed type is "loguru.Logger" (diff) E Alignment of first line difference: E E: main:4: note: Revealed type is "loguru.Logger"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::bind
/testbed/tests/typesafety/test_logger.yml:173: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:4: note: Revealed type is "Any" (diff) E Expected: E main:4: note: Revealed type is "loguru.Logger" (diff) E Alignment of first line difference: E E: main:4: note: Revealed type is "loguru.Logger"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::patch
/testbed/tests/typesafety/test_logger.yml:182: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:4: note: Revealed type is "Any" (diff) E Expected: E main:4: note: Revealed type is "loguru.Logger" (diff) E Alignment of first line difference: E E: main:4: note: Revealed type is "loguru.Logger"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::contextualize
/testbed/tests/typesafety/test_logger.yml:192: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:5: note: Revealed type is "Any" (diff) E Expected: E main:5: note: Revealed type is "loguru.Contextualizer" (diff) E Alignment of first line difference: E E: main:5: note: Revealed type is "loguru.Contextualizer"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::level_get
/testbed/tests/typesafety/test_logger.yml:202: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:4: note: Revealed type is "Any" (diff) E Expected: E main:4: note: Revealed type is "tuple[builtins.str, builtins.int, builtins.str, builtins.str, fallback=loguru.Level]" (diff) E Alignment of first line difference: E E: main:4: note: Revealed type is "tuple[builtins.str, builtins.int, builti... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::level_set
/testbed/tests/typesafety/test_logger.yml:222: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:4: note: Revealed type is "Any" (diff) E Expected: E main:4: note: Revealed type is "tuple[builtins.str, builtins.int, builtins.str, builtins.str, fallback=loguru.Level]" (diff) E Alignment of first line difference: E E: main:4: note: Revealed type is "tuple[builtins.str, builtins.int, builti... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::level_update
/testbed/tests/typesafety/test_logger.yml:242: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:4: note: Revealed type is "Any" (diff) E Expected: E main:4: note: Revealed type is "tuple[builtins.str, builtins.int, builtins.str, builtins.str, fallback=loguru.Level]" (diff) E Alignment of first line difference: E E: main:4: note: Revealed type is "tuple[builtins.str, builtins.int, builti... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::enable_and_disable_logger
/testbed/tests/typesafety/test_logger.yml:257: E pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E (empty)
test_logger.yml::configure
/testbed/tests/typesafety/test_logger.yml:272: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:9: note: Revealed type is "Any" (diff) E Expected: E main:9: note: Revealed type is "builtins.list[builtins.int]" (diff) E Alignment of first line difference: E E: main:9: note: Revealed type is "builtins.list[builtins.int]"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::parse
/testbed/tests/typesafety/test_logger.yml:282: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" [import] (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E main:5: note: Revealed type is "Any" (diff) E main:6: note: Revealed type is "Any" (diff) E Expected: E main:5: note: Revealed type is "typing.Generator[builtins.dict[builtins.str, Any], None, None]" (diff) E main:6: note: Revealed type is "builtins.dict[builtins.str, Any]" (diff) E Alignment of first line difference: E E: main:5: note: Revealed type is "typing.Generator[builtins.dict[builtins.... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::invalid_add_argument
/testbed/tests/typesafety/test_logger.yml:292: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E main:2: error: No overload variant of "add" of "Logger" matches argument types "Callable[[Any], None]", "int" (diff) E main:2: note: Possible overload variants: (diff) E main:2: note: def add(self, sink: TextIO | Writable | Callable[[Message], None] | Handler, *, level: str | int = ..., format: str | Callable[[Record], str] = ..., filter: str | Callable[[Record], bool] | dict[str | None, str | int | bool] | None = ..., colorize: bool | None = ..., serialize: bool = ..., backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., context: str | BaseContext | None = ..., catch: bool = ...) -> int (diff) E main:2: note: def add(self, sink: Callable[[Message], Awaitable[None]], *, level: str | int = ..., format: str | Callable[[Record], str] = ..., filter: str | Callable[[Record], bool] | dict[str | None, str | int | bool] | None = ..., colorize: bool | None = ..., serialize: bool = ..., backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., context: str | BaseContext | None = ..., catch: bool = ..., loop: AbstractEventLoop | None = ...) -> int (diff) E main:2: note: def add(self, sink: str | PathLike[str], *, level: str | int = ..., format: str | Callable[[Record], str] = ..., filter: str | Callable[[Record], bool] | dict[str | None, str | int | bool] | None = ..., colorize: bool | None = ..., serialize: bool = ..., backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., context: str | BaseContext | None = ..., catch: bool = ..., rotation: str | int | time | timedelta | Callable[[Message, TextIO], bool] | None = ..., retention: str | int | timedelta | Callable[[list[str]], None] | None = ..., compression: str | Callable[[str], None] | None = ..., delay: bool = ..., watch: bool = ..., mode: str = ..., buffering: int = ..., encoding: str = ..., **kwargs: Any) -> int (diff) E Alignment of first line difference: E E: main:2: error: No overload variant of "add" of "Logger" matches argument... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::invalid_logged_object_formatting
/testbed/tests/typesafety/test_logger.yml:334: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E main:2: error: No overload variant of "info" of "Logger" matches argument types "int", "int" (diff) E main:2: note: Possible overload variants: (diff) E main:2: note: def info(__self, str, /, *args: Any, **kwargs: Any) -> None (diff) E main:2: note: def info(__self, Any, /) -> None (diff) E Alignment of first line difference: E E: main:2: error: No overload variant of "info" of "Logger" matches argumen... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
test_logger.yml::invalid_configuration
/testbed/tests/typesafety/test_logger.yml:362: E pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: E Actual: E main:1: error: Cannot find implementation or library stub for module named "loguru" (diff) E main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff) E Expected: E main:4: error: Extra key "baz" for TypedDict "LevelConfig" (diff) E main:5: error: Argument "patcher" to "configure" of "Logger" has incompatible type "int"; expected "Callable[[Record], None] | None" (diff) E main:6: error: List item 0 has incompatible type "dict[str, str]"; expected "tuple[str | None, bool]" (diff) E main:7: error: Argument "extra" to "configure" of "Logger" has incompatible type "list[]"; expected "dict[Any, Any] | None" (diff) E Alignment of first line difference: E E: main:4: error: Extra key "baz" for TypedDict "LevelConfig"... E A: main:1: error: Cannot find implementation or library stub for module nam... E ^
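Every failure listed above reports the same mypy error: `Cannot find implementation or library stub for module named "loguru"`. mypy typically emits this message when it cannot locate the module at all in the environment it inspects. The following is a minimal sketch, not part of the test suite, for checking whether a package is visible to the current interpreter and whether it carries PEP 561 typing files. The helper name `describe_typing_support` is hypothetical, and the interpreter running the script is assumed to be the one mypy inspects, which may not hold in every setup.

```python
import importlib.util
from pathlib import Path


def describe_typing_support(package_name):
    """Report whether a top-level package is importable and looks PEP 561-typed.

    Hypothetical helper for illustration; pass a top-level name only.
    """
    spec = importlib.util.find_spec(package_name)
    if spec is None:
        # The interpreter cannot locate the package at all.
        return {"found": False, "py_typed": False, "stub": False}

    # Packages expose their directories here; plain modules expose none.
    roots = [Path(p) for p in (spec.submodule_search_locations or [])]
    return {
        "found": True,
        # PEP 561 marker file that tells type checkers to use inline types/stubs.
        "py_typed": any((root / "py.typed").is_file() for root in roots),
        # Stub file shipped alongside the package's __init__ module.
        "stub": any((root / "__init__.pyi").is_file() for root in roots),
    }


if __name__ == "__main__":
    print(describe_typing_support("loguru"))
```

If `found` is false for `loguru`, the package is probably not installed (or not on the path) for that interpreter, which would be consistent with the error text in the failures above.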
Patch diff
diff --git a/loguru/_asyncio_loop.py b/loguru/_asyncio_loop.py
index a1d4421..e981955 100644
--- a/loguru/_asyncio_loop.py
+++ b/loguru/_asyncio_loop.py
@@ -1,3 +1,27 @@
 import asyncio
 import sys
+
+
+def load_loop_functions():
+    if sys.version_info >= (3, 7):
+
+        def get_task_loop(task):
+            return task.get_loop()
+
+        get_running_loop = asyncio.get_running_loop
+
+    else:
+
+        def get_task_loop(task):
+            return task._loop
+
+        def get_running_loop():
+            loop = asyncio.get_event_loop()
+            if not loop.is_running():
+                raise RuntimeError("There is no running event loop")
+            return loop
+
+    return get_task_loop, get_running_loop
+
+
 get_task_loop, get_running_loop = load_loop_functions()
diff --git a/loguru/_better_exceptions.py b/loguru/_better_exceptions.py
index 6b15688..8327a13 100644
--- a/loguru/_better_exceptions.py
+++ b/loguru/_better_exceptions.py
@@ -9,46 +9,142 @@ import sys
 import sysconfig
 import tokenize
 import traceback
+
 if sys.version_info >= (3, 11):
+
+    def is_exception_group(exc):
+        return isinstance(exc, ExceptionGroup)
+
 else:
     try:
         from exceptiongroup import ExceptionGroup
     except ImportError:
+        def is_exception_group(exc):
+            return False
+
+    else:
+
+        def is_exception_group(exc):
+            return isinstance(exc, ExceptionGroup)
+
 class SyntaxHighlighter:
-    _default_style = {'comment': '\x1b[30m\x1b[1m{}\x1b[0m', 'keyword':
-        '\x1b[35m\x1b[1m{}\x1b[0m', 'builtin': '\x1b[1m{}\x1b[0m', 'string':
-        '\x1b[36m{}\x1b[0m', 'number': '\x1b[34m\x1b[1m{}\x1b[0m',
-        'operator': '\x1b[35m\x1b[1m{}\x1b[0m', 'punctuation':
-        '\x1b[1m{}\x1b[0m', 'constant': '\x1b[36m\x1b[1m{}\x1b[0m',
-        'identifier': '\x1b[1m{}\x1b[0m', 'other': '{}'}
+    _default_style = {
+        "comment": "\x1b[30m\x1b[1m{}\x1b[0m",
+        "keyword": "\x1b[35m\x1b[1m{}\x1b[0m",
+        "builtin": "\x1b[1m{}\x1b[0m",
+        "string": "\x1b[36m{}\x1b[0m",
+        "number": "\x1b[34m\x1b[1m{}\x1b[0m",
+        "operator": "\x1b[35m\x1b[1m{}\x1b[0m",
+        "punctuation": "\x1b[1m{}\x1b[0m",
+        "constant": "\x1b[36m\x1b[1m{}\x1b[0m",
+        "identifier": "\x1b[1m{}\x1b[0m",
+        "other": "{}",
+    }
+
     _builtins = set(dir(builtins))
-    _constants = {'True', 'False', 'None'}
-    _punctation = {'(', ')', '[', ']', '{', '}', ':', ',', ';'}
+    _constants = {"True", "False", "None"}
+    _punctation = {"(", ")", "[", "]", "{", "}", ":", ",", ";"}
     _strings = {tokenize.STRING}
     _fstring_middle = None
+
     if sys.version_info >= (3, 12):
-        _strings.update({tokenize.FSTRING_START, tokenize.FSTRING_MIDDLE,
-            tokenize.FSTRING_END})
+        _strings.update({tokenize.FSTRING_START, tokenize.FSTRING_MIDDLE, tokenize.FSTRING_END})
         _fstring_middle = tokenize.FSTRING_MIDDLE
     def __init__(self, style=None):
         self._style = style or self._default_style
+ def highlight(self, source):
+ style = self._style
+ row, column = 0, 0
+ output = ""
+
+ for token in self.tokenize(source):
+ type_, string, (start_row, start_column), (_, end_column), line = token
+
+ if type_ == self._fstring_middle:
+ # When an f-string contains "{{" or "}}", they appear as "{" or "}" in the "string"
+ # attribute of the token. However, they do not count in the column position.
+ end_column += string.count("{") + string.count("}")
+
+ if type_ == tokenize.NAME:
+ if string in self._constants:
+ color = style["constant"]
+ elif keyword.iskeyword(string):
+ color = style["keyword"]
+ elif string in self._builtins:
+ color = style["builtin"]
+ else:
+ color = style["identifier"]
+ elif type_ == tokenize.OP:
+ if string in self._punctation:
+ color = style["punctuation"]
+ else:
+ color = style["operator"]
+ elif type_ == tokenize.NUMBER:
+ color = style["number"]
+ elif type_ in self._strings:
+ color = style["string"]
+ elif type_ == tokenize.COMMENT:
+ color = style["comment"]
+ else:
+ color = style["other"]
+
+ if start_row != row:
+ source = source[column:]
+ row, column = start_row, 0
+
+ if type_ != tokenize.ENCODING:
+ output += line[column:start_column]
+ output += color.format(line[start_column:end_column])
+
+ column = end_column
+
+ output += source[column:]
+
+ return output
+
+ @staticmethod
+ def tokenize(source):
+ # Worth reading: https://www.asmeurer.com/brown-water-python/
+ source = source.encode("utf-8")
+ source = io.BytesIO(source)
+
+ try:
+ yield from tokenize.tokenize(source.readline)
+ except tokenize.TokenError:
+ return
+
class ExceptionFormatter:
- _default_theme = {'introduction': '\x1b[33m\x1b[1m{}\x1b[0m', 'cause':
- '\x1b[1m{}\x1b[0m', 'context': '\x1b[1m{}\x1b[0m', 'dirname':
- '\x1b[32m{}\x1b[0m', 'basename': '\x1b[32m\x1b[1m{}\x1b[0m', 'line':
- '\x1b[33m{}\x1b[0m', 'function': '\x1b[35m{}\x1b[0m',
- 'exception_type': '\x1b[31m\x1b[1m{}\x1b[0m', 'exception_value':
- '\x1b[1m{}\x1b[0m', 'arrows': '\x1b[36m{}\x1b[0m', 'value':
- '\x1b[36m\x1b[1m{}\x1b[0m'}
-
- def __init__(self, colorize=False, backtrace=False, diagnose=True,
- theme=None, style=None, max_length=128, encoding='ascii',
- hidden_frames_filename=None, prefix=''):
+ _default_theme = {
+ "introduction": "\x1b[33m\x1b[1m{}\x1b[0m",
+ "cause": "\x1b[1m{}\x1b[0m",
+ "context": "\x1b[1m{}\x1b[0m",
+ "dirname": "\x1b[32m{}\x1b[0m",
+ "basename": "\x1b[32m\x1b[1m{}\x1b[0m",
+ "line": "\x1b[33m{}\x1b[0m",
+ "function": "\x1b[35m{}\x1b[0m",
+ "exception_type": "\x1b[31m\x1b[1m{}\x1b[0m",
+ "exception_value": "\x1b[1m{}\x1b[0m",
+ "arrows": "\x1b[36m{}\x1b[0m",
+ "value": "\x1b[36m\x1b[1m{}\x1b[0m",
+ }
+
+ def __init__(
+ self,
+ colorize=False,
+ backtrace=False,
+ diagnose=True,
+ theme=None,
+ style=None,
+ max_length=128,
+ encoding="ascii",
+ hidden_frames_filename=None,
+ prefix="",
+ ):
self._colorize = colorize
self._diagnose = diagnose
self._theme = theme or self._default_theme
@@ -59,6 +155,374 @@ class ExceptionFormatter:
self._hidden_frames_filename = hidden_frames_filename
self._prefix = prefix
self._lib_dirs = self._get_lib_dirs()
- self._pipe_char = self._get_char('│', '|')
- self._cap_char = self._get_char('└', '->')
- self._catch_point_identifier = ' <Loguru catch point here>'
+ self._pipe_char = self._get_char("\u2502", "|")
+ self._cap_char = self._get_char("\u2514", "->")
+ self._catch_point_identifier = " <Loguru catch point here>"
+
+ @staticmethod
+ def _get_lib_dirs():
+ schemes = sysconfig.get_scheme_names()
+ names = ["stdlib", "platstdlib", "platlib", "purelib"]
+ paths = {sysconfig.get_path(name, scheme) for scheme in schemes for name in names}
+ return [os.path.abspath(path).lower() + os.sep for path in paths if path in sys.path]
+
+ @staticmethod
+ def _indent(text, count, *, prefix="| "):
+ if count == 0:
+ yield text
+ return
+ for line in text.splitlines(True):
+ indented = " " * count + prefix + line
+ yield indented.rstrip() + "\n"
+
+ def _get_char(self, char, default):
+ try:
+ char.encode(self._encoding)
+ except (UnicodeEncodeError, LookupError):
+ return default
+ else:
+ return char
+
+ def _is_file_mine(self, file):
+ filepath = os.path.abspath(file).lower()
+ if not filepath.endswith(".py"):
+ return False
+ return not any(filepath.startswith(d) for d in self._lib_dirs)
+
+ def _extract_frames(self, tb, is_first, *, limit=None, from_decorator=False):
+ frames, final_source = [], None
+
+ if tb is None or (limit is not None and limit <= 0):
+ return frames, final_source
+
+ def is_valid(frame):
+ return frame.f_code.co_filename != self._hidden_frames_filename
+
+ def get_info(frame, lineno):
+ filename = frame.f_code.co_filename
+ function = frame.f_code.co_name
+ source = linecache.getline(filename, lineno).strip()
+ return filename, lineno, function, source
+
+ infos = []
+
+ if is_valid(tb.tb_frame):
+ infos.append((get_info(tb.tb_frame, tb.tb_lineno), tb.tb_frame))
+
+ get_parent_only = from_decorator and not self._backtrace
+
+ if (self._backtrace and is_first) or get_parent_only:
+ frame = tb.tb_frame.f_back
+ while frame:
+ if is_valid(frame):
+ infos.insert(0, (get_info(frame, frame.f_lineno), frame))
+ if get_parent_only:
+ break
+ frame = frame.f_back
+
+ if infos and not get_parent_only:
+ (filename, lineno, function, source), frame = infos[-1]
+ function += self._catch_point_identifier
+ infos[-1] = ((filename, lineno, function, source), frame)
+
+ tb = tb.tb_next
+
+ while tb:
+ if is_valid(tb.tb_frame):
+ infos.append((get_info(tb.tb_frame, tb.tb_lineno), tb.tb_frame))
+ tb = tb.tb_next
+
+ if limit is not None:
+ infos = infos[-limit:]
+
+ for (filename, lineno, function, source), frame in infos:
+ final_source = source
+ if source:
+ colorize = self._colorize and self._is_file_mine(filename)
+ lines = []
+ if colorize:
+ lines.append(self._syntax_highlighter.highlight(source))
+ else:
+ lines.append(source)
+ if self._diagnose:
+ relevant_values = self._get_relevant_values(source, frame)
+ values = self._format_relevant_values(list(relevant_values), colorize)
+ lines += list(values)
+ source = "\n ".join(lines)
+ frames.append((filename, lineno, function, source))
+
+ return frames, final_source
+
+ def _get_relevant_values(self, source, frame):
+ value = None
+ pending = None
+ is_attribute = False
+ is_valid_value = False
+ is_assignment = True
+
+ for token in self._syntax_highlighter.tokenize(source):
+ type_, string, (_, col), *_ = token
+
+ if pending is not None:
+ # Keyword arguments are ignored
+ if type_ != tokenize.OP or string != "=" or is_assignment:
+ yield pending
+ pending = None
+
+ if type_ == tokenize.NAME and not keyword.iskeyword(string):
+ if not is_attribute:
+ for variables in (frame.f_locals, frame.f_globals):
+ try:
+ value = variables[string]
+ except KeyError:
+ continue
+ else:
+ is_valid_value = True
+ pending = (col, self._format_value(value))
+ break
+ elif is_valid_value:
+ try:
+ value = inspect.getattr_static(value, string)
+ except AttributeError:
+ is_valid_value = False
+ else:
+ yield (col, self._format_value(value))
+ elif type_ == tokenize.OP and string == ".":
+ is_attribute = True
+ is_assignment = False
+ elif type_ == tokenize.OP and string == ";":
+ is_assignment = True
+ is_attribute = False
+ is_valid_value = False
+ else:
+ is_attribute = False
+ is_valid_value = False
+ is_assignment = False
+
+ if pending is not None:
+ yield pending
+
+ def _format_relevant_values(self, relevant_values, colorize):
+ for i in reversed(range(len(relevant_values))):
+ col, value = relevant_values[i]
+ pipe_cols = [pcol for pcol, _ in relevant_values[:i]]
+ pre_line = ""
+ index = 0
+
+ for pc in pipe_cols:
+ pre_line += (" " * (pc - index)) + self._pipe_char
+ index = pc + 1
+
+ pre_line += " " * (col - index)
+ value_lines = value.split("\n")
+
+ for n, value_line in enumerate(value_lines):
+ if n == 0:
+ arrows = pre_line + self._cap_char + " "
+ else:
+ arrows = pre_line + " " * (len(self._cap_char) + 1)
+
+ if colorize:
+ arrows = self._theme["arrows"].format(arrows)
+ value_line = self._theme["value"].format(value_line)
+
+ yield arrows + value_line
+
+ def _format_value(self, v):
+ try:
+ v = repr(v)
+ except Exception:
+ v = "<unprintable %s object>" % type(v).__name__
+
+ max_length = self._max_length
+ if max_length is not None and len(v) > max_length:
+ v = v[: max_length - 3] + "..."
+ return v
+
+ def _format_locations(self, frames_lines, *, has_introduction):
+ prepend_with_new_line = has_introduction
+ regex = r'^ File "(?P<file>.*?)", line (?P<line>[^,]+)(?:, in (?P<function>.*))?\n'
+
+ for frame in frames_lines:
+ match = re.match(regex, frame)
+
+ if match:
+ file, line, function = match.group("file", "line", "function")
+
+ is_mine = self._is_file_mine(file)
+
+ if function is not None:
+ pattern = ' File "{}", line {}, in {}\n'
+ else:
+ pattern = ' File "{}", line {}\n'
+
+ if self._backtrace and function and function.endswith(self._catch_point_identifier):
+ function = function[: -len(self._catch_point_identifier)]
+ pattern = ">" + pattern[1:]
+
+ if self._colorize and is_mine:
+ dirname, basename = os.path.split(file)
+ if dirname:
+ dirname += os.sep
+ dirname = self._theme["dirname"].format(dirname)
+ basename = self._theme["basename"].format(basename)
+ file = dirname + basename
+ line = self._theme["line"].format(line)
+ function = self._theme["function"].format(function)
+
+ if self._diagnose and (is_mine or prepend_with_new_line):
+ pattern = "\n" + pattern
+
+ location = pattern.format(file, line, function)
+ frame = location + frame[match.end() :]
+ prepend_with_new_line = is_mine
+
+ yield frame
+
+ def _format_exception(
+ self, value, tb, *, seen=None, is_first=False, from_decorator=False, group_nesting=0
+ ):
+ # Implemented from built-in traceback module:
+ # https://github.com/python/cpython/blob/a5b76167/Lib/traceback.py#L468
+ exc_type, exc_value, exc_traceback = type(value), value, tb
+
+ if seen is None:
+ seen = set()
+
+ seen.add(id(exc_value))
+
+ if exc_value:
+ if exc_value.__cause__ is not None and id(exc_value.__cause__) not in seen:
+ yield from self._format_exception(
+ exc_value.__cause__,
+ exc_value.__cause__.__traceback__,
+ seen=seen,
+ group_nesting=group_nesting,
+ )
+ cause = "The above exception was the direct cause of the following exception:"
+ if self._colorize:
+ cause = self._theme["cause"].format(cause)
+ if self._diagnose:
+ yield from self._indent("\n\n" + cause + "\n\n\n", group_nesting)
+ else:
+ yield from self._indent("\n" + cause + "\n\n", group_nesting)
+
+ elif (
+ exc_value.__context__ is not None
+ and id(exc_value.__context__) not in seen
+ and not exc_value.__suppress_context__
+ ):
+ yield from self._format_exception(
+ exc_value.__context__,
+ exc_value.__context__.__traceback__,
+ seen=seen,
+ group_nesting=group_nesting,
+ )
+ context = "During handling of the above exception, another exception occurred:"
+ if self._colorize:
+ context = self._theme["context"].format(context)
+ if self._diagnose:
+ yield from self._indent("\n\n" + context + "\n\n\n", group_nesting)
+ else:
+ yield from self._indent("\n" + context + "\n\n", group_nesting)
+
+ is_grouped = is_exception_group(value)
+
+ if is_grouped and group_nesting == 0:
+ yield from self._format_exception(
+ value,
+ tb,
+ seen=seen,
+ group_nesting=1,
+ is_first=is_first,
+ from_decorator=from_decorator,
+ )
+ return
+
+ try:
+ traceback_limit = sys.tracebacklimit
+ except AttributeError:
+ traceback_limit = None
+
+ frames, final_source = self._extract_frames(
+ exc_traceback, is_first, limit=traceback_limit, from_decorator=from_decorator
+ )
+ exception_only = traceback.format_exception_only(exc_type, exc_value)
+
+ # Determining the correct index for the "Exception: message" part in the formatted exception
+ # is challenging. This is because it might be preceded by multiple lines specific to
+ # "SyntaxError" or followed by various notes. However, we can make an educated guess based
+ # on the indentation; the preliminary context for "SyntaxError" is always indented, while
+ # the Exception itself is not. This allows us to identify the correct index for the
+ # exception message.
+ for error_message_index, part in enumerate(exception_only): # noqa: B007
+ if not part.startswith(" "):
+ break
+
+ error_message = exception_only[error_message_index][:-1] # Remove last new line temporarily
+
+ if self._colorize:
+ if ":" in error_message:
+ exception_type, exception_value = error_message.split(":", 1)
+ exception_type = self._theme["exception_type"].format(exception_type)
+ exception_value = self._theme["exception_value"].format(exception_value)
+ error_message = exception_type + ":" + exception_value
+ else:
+ error_message = self._theme["exception_type"].format(error_message)
+
+ if self._diagnose and frames:
+ if issubclass(exc_type, AssertionError) and not str(exc_value) and final_source:
+ if self._colorize:
+ final_source = self._syntax_highlighter.highlight(final_source)
+ error_message += ": " + final_source
+
+ error_message = "\n" + error_message
+
+ exception_only[error_message_index] = error_message + "\n"
+
+ if is_first:
+ yield self._prefix
+
+ has_introduction = bool(frames)
+
+ if has_introduction:
+ if is_grouped:
+ introduction = "Exception Group Traceback (most recent call last):"
+ else:
+ introduction = "Traceback (most recent call last):"
+ if self._colorize:
+ introduction = self._theme["introduction"].format(introduction)
+ if group_nesting == 1: # Implies we're processing the root ExceptionGroup.
+ yield from self._indent(introduction + "\n", group_nesting, prefix="+ ")
+ else:
+ yield from self._indent(introduction + "\n", group_nesting)
+
+ frames_lines = traceback.format_list(frames) + exception_only
+ if self._colorize or self._backtrace or self._diagnose:
+ frames_lines = self._format_locations(frames_lines, has_introduction=has_introduction)
+
+ yield from self._indent("".join(frames_lines), group_nesting)
+
+ if is_grouped:
+ for n, exc in enumerate(value.exceptions, start=1):
+ ruler = "+" + (" %s " % ("..." if n > 15 else n)).center(35, "-")
+ yield from self._indent(ruler, group_nesting, prefix="+-" if n == 1 else " ")
+ if n > 15:
+ message = "and %d more exceptions\n" % (len(value.exceptions) - 15)
+ yield from self._indent(message, group_nesting + 1)
+ break
+ elif group_nesting == 10 and is_exception_group(exc):
+ message = "... (max_group_depth is 10)\n"
+ yield from self._indent(message, group_nesting + 1)
+ else:
+ yield from self._format_exception(
+ exc,
+ exc.__traceback__,
+ seen=seen,
+ group_nesting=group_nesting + 1,
+ )
+ if not is_exception_group(exc) or group_nesting == 10:
+ yield from self._indent("-" * 35, group_nesting + 1, prefix="+-")
+
+ def format_exception(self, type_, value, tb, *, from_decorator=False):
+ yield from self._format_exception(value, tb, is_first=True, from_decorator=from_decorator)
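The value truncation done by `_format_value` in the hunk above can be tried in isolation. This is a minimal standalone sketch, not the library's API: `format_value` and `Broken` are hypothetical names, and the default of 128 simply copies the `max_length` default shown in `__init__`.

```python
def format_value(value, max_length=128):
    # Fall back to a placeholder when repr() itself fails.
    try:
        text = repr(value)
    except Exception:
        text = "<unprintable %s object>" % type(value).__name__

    # Keep the result within max_length, ending with "..." when cut.
    if max_length is not None and len(text) > max_length:
        text = text[: max_length - 3] + "..."
    return text


class Broken:
    def __repr__(self):
        raise RuntimeError("repr failed")


short = format_value(42)
long_text = format_value("a" * 200)
broken = format_value(Broken())
```

Here `long_text` is cut to exactly 128 characters, and `broken` is the placeholder string rather than a propagated exception, so a misbehaving `__repr__` cannot interrupt traceback formatting.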
diff --git a/loguru/_colorama.py b/loguru/_colorama.py
index 82cda0b..b69c700 100644
--- a/loguru/_colorama.py
+++ b/loguru/_colorama.py
@@ -1,2 +1,66 @@
import os
import sys
+
+
+def should_colorize(stream):
+ if stream is None:
+ return False
+
+ if stream is sys.stdout or stream is sys.stderr:
+ try:
+ import ipykernel
+ import IPython
+
+ ipython = IPython.get_ipython()
+ is_jupyter_stream = isinstance(stream, ipykernel.iostream.OutStream)
+ is_jupyter_shell = isinstance(ipython, ipykernel.zmqshell.ZMQInteractiveShell)
+ except Exception:
+ pass
+ else:
+ if is_jupyter_stream and is_jupyter_shell:
+ return True
+
+ if stream is sys.__stdout__ or stream is sys.__stderr__:
+ if "CI" in os.environ and any(
+ ci in os.environ
+ for ci in ["TRAVIS", "CIRCLECI", "APPVEYOR", "GITLAB_CI", "GITHUB_ACTIONS"]
+ ):
+ return True
+ if "PYCHARM_HOSTED" in os.environ:
+ return True
+ if os.name == "nt" and "TERM" in os.environ:
+ return True
+
+ try:
+ return stream.isatty()
+ except Exception:
+ return False
+
+
+def should_wrap(stream):
+ if os.name != "nt":
+ return False
+
+ if stream is not sys.__stdout__ and stream is not sys.__stderr__:
+ return False
+
+ from colorama.win32 import winapi_test
+
+ if not winapi_test():
+ return False
+
+ try:
+ from colorama.winterm import enable_vt_processing
+ except ImportError:
+ return True
+
+ try:
+ return not enable_vt_processing(stream.fileno())
+ except Exception:
+ return True
+
+
+def wrap(stream):
+ from colorama import AnsiToWin32
+
+ return AnsiToWin32(stream, convert=True, strip=True, autoreset=False).stream
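The final fallback of `should_colorize` (ask the stream whether it is a TTY and treat any failure as "no") can be sketched on its own. The helper name `should_colorize_sketch` is hypothetical, and the sketch deliberately omits the Jupyter, CI, PyCharm and Windows branches shown in the diff.

```python
import io


def should_colorize_sketch(stream):
    # No stream at all: nothing to colorize.
    if stream is None:
        return False
    # Any error while probing the stream counts as "not a terminal".
    try:
        return stream.isatty()
    except Exception:
        return False


no_stream = should_colorize_sketch(None)
in_memory = should_colorize_sketch(io.StringIO())
not_a_stream = should_colorize_sketch(object())
```

All three calls return `False`: an in-memory buffer is not a terminal, and an object without `isatty` raises `AttributeError`, which is swallowed by the broad `except`.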
diff --git a/loguru/_colorizer.py b/loguru/_colorizer.py
index b5aa9a1..ed87cdb 100644
--- a/loguru/_colorizer.py
+++ b/loguru/_colorizer.py
@@ -25,6 +25,7 @@ class Fore:
CYAN = 36
WHITE = 37
RESET = 39
+
LIGHTBLACK_EX = 90
LIGHTRED_EX = 91
LIGHTGREEN_EX = 92
@@ -45,6 +46,7 @@ class Back:
CYAN = 46
WHITE = 47
RESET = 49
+
LIGHTBLACK_EX = 100
LIGHTRED_EX = 101
LIGHTGREEN_EX = 102
@@ -55,6 +57,10 @@ class Back:
LIGHTWHITE_EX = 107
+def ansi_escape(codes):
+ return {name: "\033[%dm" % code for name, code in codes.items()}
+
+
class TokenType:
TEXT = 1
ANSI = 2
@@ -63,66 +69,403 @@ class TokenType:
class AnsiParser:
- _style = ansi_escape({'b': Style.BOLD, 'd': Style.DIM, 'n': Style.
- NORMAL, 'h': Style.HIDE, 'i': Style.ITALIC, 'l': Style.BLINK, 's':
- Style.STRIKE, 'u': Style.UNDERLINE, 'v': Style.REVERSE, 'bold':
- Style.BOLD, 'dim': Style.DIM, 'normal': Style.NORMAL, 'hide': Style
- .HIDE, 'italic': Style.ITALIC, 'blink': Style.BLINK, 'strike':
- Style.STRIKE, 'underline': Style.UNDERLINE, 'reverse': Style.REVERSE})
- _foreground = ansi_escape({'k': Fore.BLACK, 'r': Fore.RED, 'g': Fore.
- GREEN, 'y': Fore.YELLOW, 'e': Fore.BLUE, 'm': Fore.MAGENTA, 'c':
- Fore.CYAN, 'w': Fore.WHITE, 'lk': Fore.LIGHTBLACK_EX, 'lr': Fore.
- LIGHTRED_EX, 'lg': Fore.LIGHTGREEN_EX, 'ly': Fore.LIGHTYELLOW_EX,
- 'le': Fore.LIGHTBLUE_EX, 'lm': Fore.LIGHTMAGENTA_EX, 'lc': Fore.
- LIGHTCYAN_EX, 'lw': Fore.LIGHTWHITE_EX, 'black': Fore.BLACK, 'red':
- Fore.RED, 'green': Fore.GREEN, 'yellow': Fore.YELLOW, 'blue': Fore.
- BLUE, 'magenta': Fore.MAGENTA, 'cyan': Fore.CYAN, 'white': Fore.
- WHITE, 'light-black': Fore.LIGHTBLACK_EX, 'light-red': Fore.
- LIGHTRED_EX, 'light-green': Fore.LIGHTGREEN_EX, 'light-yellow':
- Fore.LIGHTYELLOW_EX, 'light-blue': Fore.LIGHTBLUE_EX,
- 'light-magenta': Fore.LIGHTMAGENTA_EX, 'light-cyan': Fore.
- LIGHTCYAN_EX, 'light-white': Fore.LIGHTWHITE_EX})
- _background = ansi_escape({'K': Back.BLACK, 'R': Back.RED, 'G': Back.
- GREEN, 'Y': Back.YELLOW, 'E': Back.BLUE, 'M': Back.MAGENTA, 'C':
- Back.CYAN, 'W': Back.WHITE, 'LK': Back.LIGHTBLACK_EX, 'LR': Back.
- LIGHTRED_EX, 'LG': Back.LIGHTGREEN_EX, 'LY': Back.LIGHTYELLOW_EX,
- 'LE': Back.LIGHTBLUE_EX, 'LM': Back.LIGHTMAGENTA_EX, 'LC': Back.
- LIGHTCYAN_EX, 'LW': Back.LIGHTWHITE_EX, 'BLACK': Back.BLACK, 'RED':
- Back.RED, 'GREEN': Back.GREEN, 'YELLOW': Back.YELLOW, 'BLUE': Back.
- BLUE, 'MAGENTA': Back.MAGENTA, 'CYAN': Back.CYAN, 'WHITE': Back.
- WHITE, 'LIGHT-BLACK': Back.LIGHTBLACK_EX, 'LIGHT-RED': Back.
- LIGHTRED_EX, 'LIGHT-GREEN': Back.LIGHTGREEN_EX, 'LIGHT-YELLOW':
- Back.LIGHTYELLOW_EX, 'LIGHT-BLUE': Back.LIGHTBLUE_EX,
- 'LIGHT-MAGENTA': Back.LIGHTMAGENTA_EX, 'LIGHT-CYAN': Back.
- LIGHTCYAN_EX, 'LIGHT-WHITE': Back.LIGHTWHITE_EX})
- _regex_tag = re.compile('\\\\?</?((?:[fb]g\\s)?[^<>\\s]*)>')
+ _style = ansi_escape(
+ {
+ "b": Style.BOLD,
+ "d": Style.DIM,
+ "n": Style.NORMAL,
+ "h": Style.HIDE,
+ "i": Style.ITALIC,
+ "l": Style.BLINK,
+ "s": Style.STRIKE,
+ "u": Style.UNDERLINE,
+ "v": Style.REVERSE,
+ "bold": Style.BOLD,
+ "dim": Style.DIM,
+ "normal": Style.NORMAL,
+ "hide": Style.HIDE,
+ "italic": Style.ITALIC,
+ "blink": Style.BLINK,
+ "strike": Style.STRIKE,
+ "underline": Style.UNDERLINE,
+ "reverse": Style.REVERSE,
+ }
+ )
+
+ _foreground = ansi_escape(
+ {
+ "k": Fore.BLACK,
+ "r": Fore.RED,
+ "g": Fore.GREEN,
+ "y": Fore.YELLOW,
+ "e": Fore.BLUE,
+ "m": Fore.MAGENTA,
+ "c": Fore.CYAN,
+ "w": Fore.WHITE,
+ "lk": Fore.LIGHTBLACK_EX,
+ "lr": Fore.LIGHTRED_EX,
+ "lg": Fore.LIGHTGREEN_EX,
+ "ly": Fore.LIGHTYELLOW_EX,
+ "le": Fore.LIGHTBLUE_EX,
+ "lm": Fore.LIGHTMAGENTA_EX,
+ "lc": Fore.LIGHTCYAN_EX,
+ "lw": Fore.LIGHTWHITE_EX,
+ "black": Fore.BLACK,
+ "red": Fore.RED,
+ "green": Fore.GREEN,
+ "yellow": Fore.YELLOW,
+ "blue": Fore.BLUE,
+ "magenta": Fore.MAGENTA,
+ "cyan": Fore.CYAN,
+ "white": Fore.WHITE,
+ "light-black": Fore.LIGHTBLACK_EX,
+ "light-red": Fore.LIGHTRED_EX,
+ "light-green": Fore.LIGHTGREEN_EX,
+ "light-yellow": Fore.LIGHTYELLOW_EX,
+ "light-blue": Fore.LIGHTBLUE_EX,
+ "light-magenta": Fore.LIGHTMAGENTA_EX,
+ "light-cyan": Fore.LIGHTCYAN_EX,
+ "light-white": Fore.LIGHTWHITE_EX,
+ }
+ )
+
+ _background = ansi_escape(
+ {
+ "K": Back.BLACK,
+ "R": Back.RED,
+ "G": Back.GREEN,
+ "Y": Back.YELLOW,
+ "E": Back.BLUE,
+ "M": Back.MAGENTA,
+ "C": Back.CYAN,
+ "W": Back.WHITE,
+ "LK": Back.LIGHTBLACK_EX,
+ "LR": Back.LIGHTRED_EX,
+ "LG": Back.LIGHTGREEN_EX,
+ "LY": Back.LIGHTYELLOW_EX,
+ "LE": Back.LIGHTBLUE_EX,
+ "LM": Back.LIGHTMAGENTA_EX,
+ "LC": Back.LIGHTCYAN_EX,
+ "LW": Back.LIGHTWHITE_EX,
+ "BLACK": Back.BLACK,
+ "RED": Back.RED,
+ "GREEN": Back.GREEN,
+ "YELLOW": Back.YELLOW,
+ "BLUE": Back.BLUE,
+ "MAGENTA": Back.MAGENTA,
+ "CYAN": Back.CYAN,
+ "WHITE": Back.WHITE,
+ "LIGHT-BLACK": Back.LIGHTBLACK_EX,
+ "LIGHT-RED": Back.LIGHTRED_EX,
+ "LIGHT-GREEN": Back.LIGHTGREEN_EX,
+ "LIGHT-YELLOW": Back.LIGHTYELLOW_EX,
+ "LIGHT-BLUE": Back.LIGHTBLUE_EX,
+ "LIGHT-MAGENTA": Back.LIGHTMAGENTA_EX,
+ "LIGHT-CYAN": Back.LIGHTCYAN_EX,
+ "LIGHT-WHITE": Back.LIGHTWHITE_EX,
+ }
+ )
+
+ _regex_tag = re.compile(r"\\?</?((?:[fb]g\s)?[^<>\s]*)>")
def __init__(self):
self._tokens = []
self._tags = []
self._color_tokens = []
+ @staticmethod
+ def strip(tokens):
+ output = ""
+ for type_, value in tokens:
+ if type_ == TokenType.TEXT:
+ output += value
+ return output
+
+ @staticmethod
+ def colorize(tokens, ansi_level):
+ output = ""
+
+ for type_, value in tokens:
+ if type_ == TokenType.LEVEL:
+ if ansi_level is None:
+ raise ValueError(
+ "The '<level>' color tag is not allowed in this context, "
+ "it has not yet been associated to any color value."
+ )
+ value = ansi_level
+ output += value
+
+ return output
+
+ @staticmethod
+ def wrap(tokens, *, ansi_level, color_tokens):
+ output = ""
+
+ for type_, value in tokens:
+ if type_ == TokenType.LEVEL:
+ value = ansi_level
+ output += value
+ if type_ == TokenType.CLOSING:
+ for subtype, subvalue in color_tokens:
+ if subtype == TokenType.LEVEL:
+ subvalue = ansi_level
+ output += subvalue
+
+ return output
+
+ def feed(self, text, *, raw=False):
+ if raw:
+ self._tokens.append((TokenType.TEXT, text))
+ return
+
+ position = 0
+
+ for match in self._regex_tag.finditer(text):
+ markup, tag = match.group(0), match.group(1)
+
+ self._tokens.append((TokenType.TEXT, text[position : match.start()]))
+
+ position = match.end()
+
+ if markup[0] == "\\":
+ self._tokens.append((TokenType.TEXT, markup[1:]))
+ continue
+
+ if markup[1] == "/":
+ if self._tags and (tag == "" or tag == self._tags[-1]):
+ self._tags.pop()
+ self._color_tokens.pop()
+ self._tokens.append((TokenType.CLOSING, "\033[0m"))
+ self._tokens.extend(self._color_tokens)
+ continue
+ elif tag in self._tags:
+ raise ValueError('Closing tag "%s" violates nesting rules' % markup)
+ else:
+ raise ValueError('Closing tag "%s" has no corresponding opening tag' % markup)
+
+ if tag in {"lvl", "level"}:
+ token = (TokenType.LEVEL, None)
+ else:
+ ansi = self._get_ansicode(tag)
+
+ if ansi is None:
+ raise ValueError(
+ 'Tag "%s" does not correspond to any known color directive, '
+ "make sure you did not misspelled it (or prepend '\\' to escape it)"
+ % markup
+ )
+
+ token = (TokenType.ANSI, ansi)
+
+ self._tags.append(tag)
+ self._color_tokens.append(token)
+ self._tokens.append(token)
+
+ self._tokens.append((TokenType.TEXT, text[position:]))
+
+ def done(self, *, strict=True):
+ if strict and self._tags:
+ faulty_tag = self._tags.pop(0)
+ raise ValueError('Opening tag "<%s>" has no corresponding closing tag' % faulty_tag)
+ return self._tokens
+
+ def current_color_tokens(self):
+ return list(self._color_tokens)
+
+ def _get_ansicode(self, tag):
+ style = self._style
+ foreground = self._foreground
+ background = self._background
+
+ # Substitute on a direct match.
+ if tag in style:
+ return style[tag]
+ elif tag in foreground:
+ return foreground[tag]
+ elif tag in background:
+ return background[tag]
+
+ # An alternative syntax for setting the color (e.g. <fg red>, <bg red>).
+ elif tag.startswith("fg ") or tag.startswith("bg "):
+ st, color = tag[:2], tag[3:]
+ code = "38" if st == "fg" else "48"
+
+ if st == "fg" and color.lower() in foreground:
+ return foreground[color.lower()]
+ elif st == "bg" and color.upper() in background:
+ return background[color.upper()]
+ elif color.isdigit() and int(color) <= 255:
+ return "\033[%s;5;%sm" % (code, color)
+ elif re.match(r"#(?:[a-fA-F0-9]{3}){1,2}$", color):
+ hex_color = color[1:]
+ if len(hex_color) == 3:
+ hex_color *= 2
+ rgb = tuple(int(hex_color[i : i + 2], 16) for i in (0, 2, 4))
+ return "\033[%s;2;%s;%s;%sm" % ((code,) + rgb)
+ elif color.count(",") == 2:
+ colors = tuple(color.split(","))
+ if all(x.isdigit() and int(x) <= 255 for x in colors):
+ return "\033[%s;2;%s;%s;%sm" % ((code,) + colors)
+
+ return None
+
class ColoringMessage(str):
- __fields__ = '_messages',
+ __fields__ = ("_messages",)
def __format__(self, spec):
return next(self._messages).__format__(spec)
class ColoredMessage:
-
def __init__(self, tokens):
self.tokens = tokens
self.stripped = AnsiParser.strip(tokens)
+ def colorize(self, ansi_level):
+ return AnsiParser.colorize(self.tokens, ansi_level)
-class ColoredFormat:
+class ColoredFormat:
def __init__(self, tokens, messages_color_tokens):
self._tokens = tokens
self._messages_color_tokens = messages_color_tokens
+ def strip(self):
+ return AnsiParser.strip(self._tokens)
+
+ def colorize(self, ansi_level):
+ return AnsiParser.colorize(self._tokens, ansi_level)
+
+ def make_coloring_message(self, message, *, ansi_level, colored_message):
+ messages = [
+ message
+ if color_tokens is None
+ else AnsiParser.wrap(
+ colored_message.tokens, ansi_level=ansi_level, color_tokens=color_tokens
+ )
+ for color_tokens in self._messages_color_tokens
+ ]
+ coloring = ColoringMessage(message)
+ coloring._messages = iter(messages)
+ return coloring
+
class Colorizer:
- pass
+ @staticmethod
+ def prepare_format(string):
+ tokens, messages_color_tokens = Colorizer._parse_without_formatting(string)
+ return ColoredFormat(tokens, messages_color_tokens)
+
+ @staticmethod
+ def prepare_message(string, args=(), kwargs={}): # noqa: B006
+ tokens = Colorizer._parse_with_formatting(string, args, kwargs)
+ return ColoredMessage(tokens)
+
+ @staticmethod
+ def prepare_simple_message(string):
+ parser = AnsiParser()
+ parser.feed(string)
+ tokens = parser.done()
+ return ColoredMessage(tokens)
+
+ @staticmethod
+ def ansify(text):
+ parser = AnsiParser()
+ parser.feed(text.strip())
+ tokens = parser.done(strict=False)
+ return AnsiParser.colorize(tokens, None)
+
+ @staticmethod
+ def _parse_with_formatting(
+ string, args, kwargs, *, recursion_depth=2, auto_arg_index=0, recursive=False
+ ):
+ # This function re-implements Formatter._vformat()
+
+ if recursion_depth < 0:
+ raise ValueError("Max string recursion exceeded")
+
+ formatter = Formatter()
+ parser = AnsiParser()
+
+ for literal_text, field_name, format_spec, conversion in formatter.parse(string):
+ parser.feed(literal_text, raw=recursive)
+
+ if field_name is not None:
+ if field_name == "":
+ if auto_arg_index is False:
+ raise ValueError(
+ "cannot switch from manual field "
+ "specification to automatic field "
+ "numbering"
+ )
+ field_name = str(auto_arg_index)
+ auto_arg_index += 1
+ elif field_name.isdigit():
+ if auto_arg_index:
+ raise ValueError(
+ "cannot switch from manual field "
+ "specification to automatic field "
+ "numbering"
+ )
+ auto_arg_index = False
+
+ obj, _ = formatter.get_field(field_name, args, kwargs)
+ obj = formatter.convert_field(obj, conversion)
+
+ format_spec, auto_arg_index = Colorizer._parse_with_formatting(
+ format_spec,
+ args,
+ kwargs,
+ recursion_depth=recursion_depth - 1,
+ auto_arg_index=auto_arg_index,
+ recursive=True,
+ )
+
+ formatted = formatter.format_field(obj, format_spec)
+ parser.feed(formatted, raw=True)
+
+ tokens = parser.done()
+
+ if recursive:
+ return AnsiParser.strip(tokens), auto_arg_index
+
+ return tokens
+
+ @staticmethod
+ def _parse_without_formatting(string, *, recursion_depth=2, recursive=False):
+ if recursion_depth < 0:
+ raise ValueError("Max string recursion exceeded")
+
+ formatter = Formatter()
+ parser = AnsiParser()
+
+ messages_color_tokens = []
+
+ for literal_text, field_name, format_spec, conversion in formatter.parse(string):
+ if literal_text and literal_text[-1] in "{}":
+ literal_text += literal_text[-1]
+
+ parser.feed(literal_text, raw=recursive)
+
+ if field_name is not None:
+ if field_name == "message":
+ if recursive:
+ messages_color_tokens.append(None)
+ else:
+ color_tokens = parser.current_color_tokens()
+ messages_color_tokens.append(color_tokens)
+ field = "{%s" % field_name
+ if conversion:
+ field += "!%s" % conversion
+ if format_spec:
+ field += ":%s" % format_spec
+ field += "}"
+ parser.feed(field, raw=True)
+
+ _, color_tokens = Colorizer._parse_without_formatting(
+ format_spec, recursion_depth=recursion_depth - 1, recursive=True
+ )
+ messages_color_tokens.extend(color_tokens)
+
+ return parser.done(), messages_color_tokens
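The extended color syntax handled by `_get_ansicode` (`<fg N>` for the 256-color palette, `<bg R,G,B>` for true color) can be illustrated with a reduced sketch. The function `extended_color_code` is a hypothetical name; it covers only the numeric and comma-separated branches and leaves out named colors and hex values.

```python
def extended_color_code(tag):
    """Return the ANSI sequence for 'fg N' / 'bg R,G,B' tags, or None."""
    if not (tag.startswith("fg ") or tag.startswith("bg ")):
        return None

    kind, color = tag[:2], tag[3:]
    # 38 selects the foreground, 48 the background.
    code = "38" if kind == "fg" else "48"

    # 256-color palette index.
    if color.isdigit() and int(color) <= 255:
        return "\033[%s;5;%sm" % (code, color)

    # 24-bit color given as three comma-separated components.
    if color.count(",") == 2:
        parts = tuple(color.split(","))
        if all(p.isdigit() and int(p) <= 255 for p in parts):
            return "\033[%s;2;%s;%s;%sm" % ((code,) + parts)

    return None


palette = extended_color_code("fg 200")
true_color = extended_color_code("bg 1,2,3")
out_of_range = extended_color_code("fg 300")
```

Returning `None` for an unknown tag mirrors the diff, where `feed` turns that result into a `ValueError` about an unknown color directive.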
diff --git a/loguru/_contextvars.py b/loguru/_contextvars.py
index 69d6932..2e8bbb2 100644
--- a/loguru/_contextvars.py
+++ b/loguru/_contextvars.py
@@ -1,2 +1,15 @@
import sys
+
+
+def load_contextvar_class():
+    if sys.version_info >= (3, 7):
+        from contextvars import ContextVar
+    elif sys.version_info >= (3, 5, 3):
+        from aiocontextvars import ContextVar
+    else:
+        from contextvars import ContextVar
+
+    return ContextVar
+
+
ContextVar = load_contextvar_class()
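For readers unfamiliar with the class being loaded here, the following sketch shows how a `ContextVar` isolates values between contexts. It assumes Python 3.7 or later and uses the standard `contextvars` module directly; `request_id` and `set_and_read` are illustrative names, not part of loguru.

```python
from contextvars import ContextVar, copy_context

request_id = ContextVar("request_id", default=None)


def set_and_read(value):
    # The assignment is only visible in the context this function runs in.
    request_id.set(value)
    return request_id.get()


ctx = copy_context()
inside = ctx.run(set_and_read, "abc")
outside = request_id.get()
```

`inside` is `"abc"` while `outside` is still `None`, because the value was set in the copied context and never leaked into the caller's context.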
diff --git a/loguru/_ctime_functions.py b/loguru/_ctime_functions.py
index 0e6aed3..e232b42 100644
--- a/loguru/_ctime_functions.py
+++ b/loguru/_ctime_functions.py
@@ -1,2 +1,57 @@
import os
+
+
+def load_ctime_functions():
+    if os.name == "nt":
+        import win32_setctime
+
+        def get_ctime_windows(filepath):
+            return os.stat(filepath).st_ctime
+
+        def set_ctime_windows(filepath, timestamp):
+            if not win32_setctime.SUPPORTED:
+                return
+
+            try:
+                win32_setctime.setctime(filepath, timestamp)
+            except (OSError, ValueError):
+                pass
+
+        return get_ctime_windows, set_ctime_windows
+
+    elif hasattr(os.stat_result, "st_birthtime"):
+
+        def get_ctime_macos(filepath):
+            return os.stat(filepath).st_birthtime
+
+        def set_ctime_macos(filepath, timestamp):
+            pass
+
+        return get_ctime_macos, set_ctime_macos
+
+    elif hasattr(os, "getxattr") and hasattr(os, "setxattr"):
+
+        def get_ctime_linux(filepath):
+            try:
+                return float(os.getxattr(filepath, b"user.loguru_crtime"))
+            except OSError:
+                return os.stat(filepath).st_mtime
+
+        def set_ctime_linux(filepath, timestamp):
+            try:
+                os.setxattr(filepath, b"user.loguru_crtime", str(timestamp).encode("ascii"))
+            except OSError:
+                pass
+
+        return get_ctime_linux, set_ctime_linux
+
+    def get_ctime_fallback(filepath):
+        return os.stat(filepath).st_mtime
+
+    def set_ctime_fallback(filepath, timestamp):
+        pass
+
+    return get_ctime_fallback, set_ctime_fallback
+
+
get_ctime, set_ctime = load_ctime_functions()
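The last branch above falls back to the modification time when no creation time is available. A small sketch of that fallback, run against a temporary file, might look as follows; the timestamp value is arbitrary and chosen only for the demonstration.

```python
import os
import tempfile


def get_ctime_fallback(filepath):
    # Same idea as the final branch of the diff: use the modification
    # time when a creation time can be neither read nor stored.
    return os.stat(filepath).st_mtime


# Create a scratch file and give it a known modification time.
with tempfile.NamedTemporaryFile(delete=False) as handle:
    path = handle.name

known_timestamp = 1_000_000_000  # arbitrary value for the demo
os.utime(path, (known_timestamp, known_timestamp))

creation_time = get_ctime_fallback(path)
os.remove(path)
```

Because this fallback tracks the last write rather than the creation of the file, anything that compares it against a rotation schedule should expect it to move forward whenever the file is modified.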
diff --git a/loguru/_datetime.py b/loguru/_datetime.py
index 754c834..76626db 100644
--- a/loguru/_datetime.py
+++ b/loguru/_datetime.py
@@ -3,58 +3,103 @@ from calendar import day_abbr, day_name, month_abbr, month_name
from datetime import datetime as datetime_
from datetime import timedelta, timezone
from time import localtime, strftime
-tokens = (
- 'H{1,2}|h{1,2}|m{1,2}|s{1,2}|S+|YYYY|YY|M{1,4}|D{1,4}|Z{1,2}|zz|A|X|x|E|Q|dddd|ddd|d'
- )
-pattern = re.compile('(?:{0})|\\[(?:{0}|!UTC|)\\]'.format(tokens))
+tokens = r"H{1,2}|h{1,2}|m{1,2}|s{1,2}|S+|YYYY|YY|M{1,4}|D{1,4}|Z{1,2}|zz|A|X|x|E|Q|dddd|ddd|d"
-class datetime(datetime_):
+pattern = re.compile(r"(?:{0})|\[(?:{0}|!UTC|)\]".format(tokens))
+
+class datetime(datetime_): # noqa: N801
def __format__(self, spec):
- if spec.endswith('!UTC'):
+ if spec.endswith("!UTC"):
dt = self.astimezone(timezone.utc)
spec = spec[:-4]
else:
dt = self
+
if not spec:
- spec = '%Y-%m-%dT%H:%M:%S.%f%z'
- if '%' in spec:
+ spec = "%Y-%m-%dT%H:%M:%S.%f%z"
+
+ if "%" in spec:
return datetime_.__format__(dt, spec)
- if 'SSSSSSS' in spec:
+
+ if "SSSSSSS" in spec:
raise ValueError(
- "Invalid time format: the provided format string contains more than six successive 'S' characters. This may be due to an attempt to use nanosecond precision, which is not supported."
- )
- year, month, day, hour, minute, second, weekday, yearday, _ = (dt.
- timetuple())
+ "Invalid time format: the provided format string contains more than six successive "
+ "'S' characters. This may be due to an attempt to use nanosecond precision, which "
+ "is not supported."
+ )
+
+ year, month, day, hour, minute, second, weekday, yearday, _ = dt.timetuple()
microsecond = dt.microsecond
timestamp = dt.timestamp()
tzinfo = dt.tzinfo or timezone(timedelta(seconds=0))
offset = tzinfo.utcoffset(dt).total_seconds()
- sign = ('-', '+')[offset >= 0]
+ sign = ("-", "+")[offset >= 0]
(h, m), s = divmod(abs(offset // 60), 60), abs(offset) % 60
- rep = {'YYYY': '%04d' % year, 'YY': '%02d' % (year % 100), 'Q':
- '%d' % ((month - 1) // 3 + 1), 'MMMM': month_name[month], 'MMM':
- month_abbr[month], 'MM': '%02d' % month, 'M': '%d' % month,
- 'DDDD': '%03d' % yearday, 'DDD': '%d' % yearday, 'DD': '%02d' %
- day, 'D': '%d' % day, 'dddd': day_name[weekday], 'ddd':
- day_abbr[weekday], 'd': '%d' % weekday, 'E': '%d' % (weekday +
- 1), 'HH': '%02d' % hour, 'H': '%d' % hour, 'hh': '%02d' % ((
- hour - 1) % 12 + 1), 'h': '%d' % ((hour - 1) % 12 + 1), 'mm':
- '%02d' % minute, 'm': '%d' % minute, 'ss': '%02d' % second, 's':
- '%d' % second, 'S': '%d' % (microsecond // 100000), 'SS':
- '%02d' % (microsecond // 10000), 'SSS': '%03d' % (microsecond //
- 1000), 'SSSS': '%04d' % (microsecond // 100), 'SSSSS': '%05d' %
- (microsecond // 10), 'SSSSSS': '%06d' % microsecond, 'A': ('AM',
- 'PM')[hour // 12], 'Z': '%s%02d:%02d%s' % (sign, h, m, (
- ':%09.06f' % s)[:11 if s % 1 else 3] * (s > 0)), 'ZZ':
- '%s%02d%02d%s' % (sign, h, m, ('%09.06f' % s)[:10 if s % 1 else
- 2] * (s > 0)), 'zz': tzinfo.tzname(dt) or '', 'X': '%d' %
- timestamp, 'x': '%d' % (int(timestamp) * 1000000 + microsecond)}
+
+ rep = {
+ "YYYY": "%04d" % year,
+ "YY": "%02d" % (year % 100),
+ "Q": "%d" % ((month - 1) // 3 + 1),
+ "MMMM": month_name[month],
+ "MMM": month_abbr[month],
+ "MM": "%02d" % month,
+ "M": "%d" % month,
+ "DDDD": "%03d" % yearday,
+ "DDD": "%d" % yearday,
+ "DD": "%02d" % day,
+ "D": "%d" % day,
+ "dddd": day_name[weekday],
+ "ddd": day_abbr[weekday],
+ "d": "%d" % weekday,
+ "E": "%d" % (weekday + 1),
+ "HH": "%02d" % hour,
+ "H": "%d" % hour,
+ "hh": "%02d" % ((hour - 1) % 12 + 1),
+ "h": "%d" % ((hour - 1) % 12 + 1),
+ "mm": "%02d" % minute,
+ "m": "%d" % minute,
+ "ss": "%02d" % second,
+ "s": "%d" % second,
+ "S": "%d" % (microsecond // 100000),
+ "SS": "%02d" % (microsecond // 10000),
+ "SSS": "%03d" % (microsecond // 1000),
+ "SSSS": "%04d" % (microsecond // 100),
+ "SSSSS": "%05d" % (microsecond // 10),
+ "SSSSSS": "%06d" % microsecond,
+ "A": ("AM", "PM")[hour // 12],
+ "Z": "%s%02d:%02d%s" % (sign, h, m, (":%09.06f" % s)[: 11 if s % 1 else 3] * (s > 0)),
+ "ZZ": "%s%02d%02d%s" % (sign, h, m, ("%09.06f" % s)[: 10 if s % 1 else 2] * (s > 0)),
+ "zz": tzinfo.tzname(dt) or "",
+ "X": "%d" % timestamp,
+ "x": "%d" % (int(timestamp) * 1000000 + microsecond),
+ }
def get(m):
try:
return rep[m.group(0)]
except KeyError:
return m.group(0)[1:-1]
+
return pattern.sub(get, spec)
+
+
+def aware_now():
+    now = datetime_.now()
+    timestamp = now.timestamp()
+    local = localtime(timestamp)
+
+    try:
+        seconds = local.tm_gmtoff
+        zone = local.tm_zone
+    except AttributeError:
+        # Workaround for Python 3.5.
+        utc_naive = datetime_.fromtimestamp(timestamp, tz=timezone.utc).replace(tzinfo=None)
+        offset = datetime_.fromtimestamp(timestamp) - utc_naive
+        seconds = offset.total_seconds()
+        zone = strftime("%Z")
+
+    tzinfo = timezone(timedelta(seconds=seconds), zone)
+
+    return datetime.combine(now.date(), now.time().replace(tzinfo=tzinfo))
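The token substitution in `datetime.__format__` can be sketched in isolation. This is a reduced illustration, not the library's formatter: it covers only six tokens, the names `TOKENS`, `PATTERN` and `format_tokens` are hypothetical, and the `!UTC` handling is left out. It keeps the two ideas from the diff: one regex matches either a token or a bracketed token, and a lookup miss means the match was bracketed, so the brackets are stripped.

```python
import re
from datetime import datetime

# Reduced token set (assumption: the diff's full table has many more entries).
TOKENS = r"YYYY|MM|DD|HH|mm|ss"
PATTERN = re.compile(r"(?:{0})|\[(?:{0})\]".format(TOKENS))


def format_tokens(dt, spec):
    rep = {
        "YYYY": "%04d" % dt.year,
        "MM": "%02d" % dt.month,
        "DD": "%02d" % dt.day,
        "HH": "%02d" % dt.hour,
        "mm": "%02d" % dt.minute,
        "ss": "%02d" % dt.second,
    }

    def get(match):
        text = match.group(0)
        try:
            return rep[text]  # plain token: substitute its value
        except KeyError:
            return text[1:-1]  # bracketed token: emit it literally

    return PATTERN.sub(get, spec)


moment = datetime(2024, 3, 5, 7, 8, 9)
plain = format_tokens(moment, "YYYY-MM-DD HH:mm:ss")
escaped = format_tokens(moment, "[YYYY] is YYYY")
```

Here `plain` is `"2024-03-05 07:08:09"` and `escaped` is `"YYYY is 2024"`.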
diff --git a/loguru/_defaults.py b/loguru/_defaults.py
index 2e33c3d..d3d8de7 100644
--- a/loguru/_defaults.py
+++ b/loguru/_defaults.py
@@ -1,35 +1,74 @@
from os import environ
-LOGURU_AUTOINIT = env('LOGURU_AUTOINIT', bool, True)
-LOGURU_FORMAT = env('LOGURU_FORMAT', str,
- '<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>'
- )
-LOGURU_FILTER = env('LOGURU_FILTER', str, None)
-LOGURU_LEVEL = env('LOGURU_LEVEL', str, 'DEBUG')
-LOGURU_COLORIZE = env('LOGURU_COLORIZE', bool, None)
-LOGURU_SERIALIZE = env('LOGURU_SERIALIZE', bool, False)
-LOGURU_BACKTRACE = env('LOGURU_BACKTRACE', bool, True)
-LOGURU_DIAGNOSE = env('LOGURU_DIAGNOSE', bool, True)
-LOGURU_ENQUEUE = env('LOGURU_ENQUEUE', bool, False)
-LOGURU_CONTEXT = env('LOGURU_CONTEXT', str, None)
-LOGURU_CATCH = env('LOGURU_CATCH', bool, True)
-LOGURU_TRACE_NO = env('LOGURU_TRACE_NO', int, 5)
-LOGURU_TRACE_COLOR = env('LOGURU_TRACE_COLOR', str, '<cyan><bold>')
-LOGURU_TRACE_ICON = env('LOGURU_TRACE_ICON', str, '✏️')
-LOGURU_DEBUG_NO = env('LOGURU_DEBUG_NO', int, 10)
-LOGURU_DEBUG_COLOR = env('LOGURU_DEBUG_COLOR', str, '<blue><bold>')
-LOGURU_DEBUG_ICON = env('LOGURU_DEBUG_ICON', str, '🐞')
-LOGURU_INFO_NO = env('LOGURU_INFO_NO', int, 20)
-LOGURU_INFO_COLOR = env('LOGURU_INFO_COLOR', str, '<bold>')
-LOGURU_INFO_ICON = env('LOGURU_INFO_ICON', str, 'ℹ️')
-LOGURU_SUCCESS_NO = env('LOGURU_SUCCESS_NO', int, 25)
-LOGURU_SUCCESS_COLOR = env('LOGURU_SUCCESS_COLOR', str, '<green><bold>')
-LOGURU_SUCCESS_ICON = env('LOGURU_SUCCESS_ICON', str, '✅')
-LOGURU_WARNING_NO = env('LOGURU_WARNING_NO', int, 30)
-LOGURU_WARNING_COLOR = env('LOGURU_WARNING_COLOR', str, '<yellow><bold>')
-LOGURU_WARNING_ICON = env('LOGURU_WARNING_ICON', str, '⚠️')
-LOGURU_ERROR_NO = env('LOGURU_ERROR_NO', int, 40)
-LOGURU_ERROR_COLOR = env('LOGURU_ERROR_COLOR', str, '<red><bold>')
-LOGURU_ERROR_ICON = env('LOGURU_ERROR_ICON', str, '❌')
-LOGURU_CRITICAL_NO = env('LOGURU_CRITICAL_NO', int, 50)
-LOGURU_CRITICAL_COLOR = env('LOGURU_CRITICAL_COLOR', str, '<RED><bold>')
-LOGURU_CRITICAL_ICON = env('LOGURU_CRITICAL_ICON', str, '☠️')
+
+
+def env(key, type_, default=None):
+    if key not in environ:
+        return default
+
+    val = environ[key]
+
+    if type_ == str:
+        return val
+    elif type_ == bool:
+        if val.lower() in ["1", "true", "yes", "y", "ok", "on"]:
+            return True
+        if val.lower() in ["0", "false", "no", "n", "nok", "off"]:
+            return False
+        raise ValueError(
+            "Invalid environment variable '%s' (expected a boolean): '%s'" % (key, val)
+        )
+    elif type_ == int:
+        try:
+            return int(val)
+        except ValueError:
+            raise ValueError(
+                "Invalid environment variable '%s' (expected an integer): '%s'" % (key, val)
+            ) from None
+
+
+LOGURU_AUTOINIT = env("LOGURU_AUTOINIT", bool, True)
+
+LOGURU_FORMAT = env(
+ "LOGURU_FORMAT",
+ str,
+ "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
+ "<level>{level: <8}</level> | "
+ "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
+)
+LOGURU_FILTER = env("LOGURU_FILTER", str, None)
+LOGURU_LEVEL = env("LOGURU_LEVEL", str, "DEBUG")
+LOGURU_COLORIZE = env("LOGURU_COLORIZE", bool, None)
+LOGURU_SERIALIZE = env("LOGURU_SERIALIZE", bool, False)
+LOGURU_BACKTRACE = env("LOGURU_BACKTRACE", bool, True)
+LOGURU_DIAGNOSE = env("LOGURU_DIAGNOSE", bool, True)
+LOGURU_ENQUEUE = env("LOGURU_ENQUEUE", bool, False)
+LOGURU_CONTEXT = env("LOGURU_CONTEXT", str, None)
+LOGURU_CATCH = env("LOGURU_CATCH", bool, True)
+
+LOGURU_TRACE_NO = env("LOGURU_TRACE_NO", int, 5)
+LOGURU_TRACE_COLOR = env("LOGURU_TRACE_COLOR", str, "<cyan><bold>")
+LOGURU_TRACE_ICON = env("LOGURU_TRACE_ICON", str, "\u270F\uFE0F") # Pencil
+
+LOGURU_DEBUG_NO = env("LOGURU_DEBUG_NO", int, 10)
+LOGURU_DEBUG_COLOR = env("LOGURU_DEBUG_COLOR", str, "<blue><bold>")
+LOGURU_DEBUG_ICON = env("LOGURU_DEBUG_ICON", str, "\U0001F41E") # Lady Beetle
+
+LOGURU_INFO_NO = env("LOGURU_INFO_NO", int, 20)
+LOGURU_INFO_COLOR = env("LOGURU_INFO_COLOR", str, "<bold>")
+LOGURU_INFO_ICON = env("LOGURU_INFO_ICON", str, "\u2139\uFE0F") # Information
+
+LOGURU_SUCCESS_NO = env("LOGURU_SUCCESS_NO", int, 25)
+LOGURU_SUCCESS_COLOR = env("LOGURU_SUCCESS_COLOR", str, "<green><bold>")
+LOGURU_SUCCESS_ICON = env("LOGURU_SUCCESS_ICON", str, "\u2705") # White Heavy Check Mark
+
+LOGURU_WARNING_NO = env("LOGURU_WARNING_NO", int, 30)
+LOGURU_WARNING_COLOR = env("LOGURU_WARNING_COLOR", str, "<yellow><bold>")
+LOGURU_WARNING_ICON = env("LOGURU_WARNING_ICON", str, "\u26A0\uFE0F") # Warning
+
+LOGURU_ERROR_NO = env("LOGURU_ERROR_NO", int, 40)
+LOGURU_ERROR_COLOR = env("LOGURU_ERROR_COLOR", str, "<red><bold>")
+LOGURU_ERROR_ICON = env("LOGURU_ERROR_ICON", str, "\u274C") # Cross Mark
+
+LOGURU_CRITICAL_NO = env("LOGURU_CRITICAL_NO", int, 50)
+LOGURU_CRITICAL_COLOR = env("LOGURU_CRITICAL_COLOR", str, "<RED><bold>")
+LOGURU_CRITICAL_ICON = env("LOGURU_CRITICAL_ICON", str, "\u2620\uFE0F") # Skull and Crossbones
diff --git a/loguru/_error_interceptor.py b/loguru/_error_interceptor.py
index e9758e4..9f63d3d 100644
--- a/loguru/_error_interceptor.py
+++ b/loguru/_error_interceptor.py
@@ -3,7 +3,32 @@ import traceback
class ErrorInterceptor:
-
def __init__(self, should_catch, handler_id):
self._should_catch = should_catch
self._handler_id = handler_id
+
+ def should_catch(self):
+ return self._should_catch
+
+ def print(self, record=None, *, exception=None):
+ if not sys.stderr:
+ return
+
+ if exception is None:
+ type_, value, traceback_ = sys.exc_info()
+ else:
+ type_, value, traceback_ = (type(exception), exception, exception.__traceback__)
+
+ try:
+ sys.stderr.write("--- Logging error in Loguru Handler #%d ---\n" % self._handler_id)
+ try:
+ record_repr = str(record)
+ except Exception:
+ record_repr = "/!\\ Unprintable record /!\\"
+ sys.stderr.write("Record was: %s\n" % record_repr)
+ traceback.print_exception(type_, value, traceback_, None, sys.stderr)
+ sys.stderr.write("--- End of logging error ---\n")
+ except OSError:
+ pass
+ finally:
+ del type_, value, traceback_
diff --git a/loguru/_file_sink.py b/loguru/_file_sink.py
index 8053660..bdc6ccd 100644
--- a/loguru/_file_sink.py
+++ b/loguru/_file_sink.py
@@ -7,66 +7,146 @@ import shutil
import string
from functools import partial
from stat import ST_DEV, ST_INO
+
from . import _string_parsers as string_parsers
from ._ctime_functions import get_ctime, set_ctime
from ._datetime import aware_now
-class FileDateFormatter:
+def generate_rename_path(root, ext, creation_time):
+ creation_datetime = datetime.datetime.fromtimestamp(creation_time)
+ date = FileDateFormatter(creation_datetime)
+
+ renamed_path = "{}.{}{}".format(root, date, ext)
+ counter = 1
+
+ while os.path.exists(renamed_path):
+ counter += 1
+ renamed_path = "{}.{}.{}{}".format(root, date, counter, ext)
+
+ return renamed_path
+
+class FileDateFormatter:
def __init__(self, datetime=None):
self.datetime = datetime or aware_now()
def __format__(self, spec):
if not spec:
- spec = '%Y-%m-%d_%H-%M-%S_%f'
+ spec = "%Y-%m-%d_%H-%M-%S_%f"
return self.datetime.__format__(spec)
class Compression:
- pass
+ @staticmethod
+ def add_compress(path_in, path_out, opener, **kwargs):
+ with opener(path_out, **kwargs) as f_comp:
+ f_comp.add(path_in, os.path.basename(path_in))
+
+ @staticmethod
+ def write_compress(path_in, path_out, opener, **kwargs):
+ with opener(path_out, **kwargs) as f_comp:
+ f_comp.write(path_in, os.path.basename(path_in))
+
+ @staticmethod
+ def copy_compress(path_in, path_out, opener, **kwargs):
+ with open(path_in, "rb") as f_in:
+ with opener(path_out, **kwargs) as f_out:
+ shutil.copyfileobj(f_in, f_out)
+
+ @staticmethod
+ def compression(path_in, ext, compress_function):
+ path_out = "{}{}".format(path_in, ext)
+
+ if os.path.exists(path_out):
+ creation_time = get_ctime(path_out)
+ root, ext_before = os.path.splitext(path_in)
+ renamed_path = generate_rename_path(root, ext_before + ext, creation_time)
+ os.rename(path_out, renamed_path)
+ compress_function(path_in, path_out)
+ os.remove(path_in)
class Retention:
- pass
+ @staticmethod
+ def retention_count(logs, number):
+ def key_log(log):
+ return (-os.stat(log).st_mtime, log)
+
+ for log in sorted(logs, key=key_log)[number:]:
+ os.remove(log)
+
+ @staticmethod
+ def retention_age(logs, seconds):
+ t = datetime.datetime.now().timestamp()
+ for log in logs:
+ if os.stat(log).st_mtime <= t - seconds:
+ os.remove(log)
class Rotation:
+ @staticmethod
+ def forward_day(t):
+ return t + datetime.timedelta(days=1)
+ @staticmethod
+ def forward_weekday(t, weekday):
+ while True:
+ t += datetime.timedelta(days=1)
+ if t.weekday() == weekday:
+ return t
- class RotationTime:
+ @staticmethod
+ def forward_interval(t, interval):
+ return t + interval
+ @staticmethod
+ def rotation_size(message, file, size_limit):
+ file.seek(0, 2)
+ return file.tell() + len(message) > size_limit
+
+ class RotationTime:
def __init__(self, step_forward, time_init=None):
self._step_forward = step_forward
self._time_init = time_init
self._limit = None
def __call__(self, message, file):
- record_time = message.record['time']
+ record_time = message.record["time"]
+
if self._limit is None:
filepath = os.path.realpath(file.name)
creation_time = get_ctime(filepath)
set_ctime(filepath, creation_time)
- start_time = datetime.datetime.fromtimestamp(creation_time,
- tz=datetime.timezone.utc)
+ start_time = datetime.datetime.fromtimestamp(
+ creation_time, tz=datetime.timezone.utc
+ )
+
time_init = self._time_init
+
if time_init is None:
- limit = start_time.astimezone(record_time.tzinfo).replace(
- tzinfo=None)
+ limit = start_time.astimezone(record_time.tzinfo).replace(tzinfo=None)
limit = self._step_forward(limit)
else:
- tzinfo = (record_time.tzinfo if time_init.tzinfo is
- None else time_init.tzinfo)
- limit = start_time.astimezone(tzinfo).replace(hour=
- time_init.hour, minute=time_init.minute, second=
- time_init.second, microsecond=time_init.microsecond)
+ tzinfo = record_time.tzinfo if time_init.tzinfo is None else time_init.tzinfo
+ limit = start_time.astimezone(tzinfo).replace(
+ hour=time_init.hour,
+ minute=time_init.minute,
+ second=time_init.second,
+ microsecond=time_init.microsecond,
+ )
+
if limit <= start_time:
limit = self._step_forward(limit)
+
if time_init.tzinfo is None:
limit = limit.replace(tzinfo=None)
+
self._limit = limit
+
if self._limit.tzinfo is None:
record_time = record_time.replace(tzinfo=None)
+
if record_time >= self._limit:
while self._limit <= record_time:
self._limit = self._step_forward(self._limit)
@@ -75,25 +155,280 @@ class Rotation:
class FileSink:
-
- def __init__(self, path, *, rotation=None, retention=None, compression=
- None, delay=False, watch=False, mode='a', buffering=1, encoding=
- 'utf8', **kwargs):
+ def __init__(
+ self,
+ path,
+ *,
+ rotation=None,
+ retention=None,
+ compression=None,
+ delay=False,
+ watch=False,
+ mode="a",
+ buffering=1,
+ encoding="utf8",
+ **kwargs
+ ):
self.encoding = encoding
- self._kwargs = {**kwargs, 'mode': mode, 'buffering': buffering,
- 'encoding': self.encoding}
+
+ self._kwargs = {**kwargs, "mode": mode, "buffering": buffering, "encoding": self.encoding}
self._path = str(path)
+
self._glob_patterns = self._make_glob_patterns(self._path)
self._rotation_function = self._make_rotation_function(rotation)
self._retention_function = self._make_retention_function(retention)
- self._compression_function = self._make_compression_function(
- compression)
+ self._compression_function = self._make_compression_function(compression)
+
self._file = None
self._file_path = None
+
self._watch = watch
self._file_dev = -1
self._file_ino = -1
+
if not delay:
path = self._create_path()
self._create_dirs(path)
self._create_file(path)
+
+ def write(self, message):
+ if self._file is None:
+ path = self._create_path()
+ self._create_dirs(path)
+ self._create_file(path)
+
+ if self._watch:
+ self._reopen_if_needed()
+
+ if self._rotation_function is not None and self._rotation_function(message, self._file):
+ self._terminate_file(is_rotating=True)
+
+ self._file.write(message)
+
+ def stop(self):
+ if self._watch:
+ self._reopen_if_needed()
+
+ self._terminate_file(is_rotating=False)
+
+ def tasks_to_complete(self):
+ return []
+
+ def _create_path(self):
+ path = self._path.format_map({"time": FileDateFormatter()})
+ return os.path.abspath(path)
+
+ def _create_dirs(self, path):
+ dirname = os.path.dirname(path)
+ os.makedirs(dirname, exist_ok=True)
+
+ def _create_file(self, path):
+ self._file = open(path, **self._kwargs)
+ self._file_path = path
+
+ if self._watch:
+ fileno = self._file.fileno()
+ result = os.fstat(fileno)
+ self._file_dev = result[ST_DEV]
+ self._file_ino = result[ST_INO]
+
+ def _close_file(self):
+ self._file.flush()
+ self._file.close()
+
+ self._file = None
+ self._file_path = None
+ self._file_dev = -1
+ self._file_ino = -1
+
+ def _reopen_if_needed(self):
+ # Implemented based on standard library:
+ # https://github.com/python/cpython/blob/cb589d1b/Lib/logging/handlers.py#L486
+ if not self._file:
+ return
+
+ filepath = self._file_path
+
+ try:
+ result = os.stat(filepath)
+ except FileNotFoundError:
+ result = None
+
+ if not result or result[ST_DEV] != self._file_dev or result[ST_INO] != self._file_ino:
+ self._close_file()
+ self._create_dirs(filepath)
+ self._create_file(filepath)
+
+ def _terminate_file(self, *, is_rotating=False):
+ old_path = self._file_path
+
+ if self._file is not None:
+ self._close_file()
+
+ if is_rotating:
+ new_path = self._create_path()
+ self._create_dirs(new_path)
+
+ if new_path == old_path:
+ creation_time = get_ctime(old_path)
+ root, ext = os.path.splitext(old_path)
+ renamed_path = generate_rename_path(root, ext, creation_time)
+ os.rename(old_path, renamed_path)
+ old_path = renamed_path
+
+ if is_rotating or self._rotation_function is None:
+ if self._compression_function is not None and old_path is not None:
+ self._compression_function(old_path)
+
+ if self._retention_function is not None:
+ logs = {
+ file
+ for pattern in self._glob_patterns
+ for file in glob.glob(pattern)
+ if os.path.isfile(file)
+ }
+ self._retention_function(list(logs))
+
+ if is_rotating:
+ self._create_file(new_path)
+ set_ctime(new_path, datetime.datetime.now().timestamp())
+
+ @staticmethod
+ def _make_glob_patterns(path):
+ formatter = string.Formatter()
+ tokens = formatter.parse(path)
+ escaped = "".join(glob.escape(text) + "*" * (name is not None) for text, name, *_ in tokens)
+
+ root, ext = os.path.splitext(escaped)
+
+ if not ext:
+ return [escaped, escaped + ".*"]
+
+ return [escaped, escaped + ".*", root + ".*" + ext, root + ".*" + ext + ".*"]
+
+ @staticmethod
+ def _make_rotation_function(rotation):
+ if rotation is None:
+ return None
+ elif isinstance(rotation, str):
+ size = string_parsers.parse_size(rotation)
+ if size is not None:
+ return FileSink._make_rotation_function(size)
+ interval = string_parsers.parse_duration(rotation)
+ if interval is not None:
+ return FileSink._make_rotation_function(interval)
+ frequency = string_parsers.parse_frequency(rotation)
+ if frequency is not None:
+ return Rotation.RotationTime(frequency)
+ daytime = string_parsers.parse_daytime(rotation)
+ if daytime is not None:
+ day, time = daytime
+ if day is None:
+ return FileSink._make_rotation_function(time)
+ if time is None:
+ time = datetime.time(0, 0, 0)
+ step_forward = partial(Rotation.forward_weekday, weekday=day)
+ return Rotation.RotationTime(step_forward, time)
+ raise ValueError("Cannot parse rotation from: '%s'" % rotation)
+ elif isinstance(rotation, (numbers.Real, decimal.Decimal)):
+ return partial(Rotation.rotation_size, size_limit=rotation)
+ elif isinstance(rotation, datetime.time):
+ return Rotation.RotationTime(Rotation.forward_day, rotation)
+ elif isinstance(rotation, datetime.timedelta):
+ step_forward = partial(Rotation.forward_interval, interval=rotation)
+ return Rotation.RotationTime(step_forward)
+ elif callable(rotation):
+ return rotation
+ else:
+ raise TypeError(
+ "Cannot infer rotation for objects of type: '%s'" % type(rotation).__name__
+ )
+
+ @staticmethod
+ def _make_retention_function(retention):
+ if retention is None:
+ return None
+ elif isinstance(retention, str):
+ interval = string_parsers.parse_duration(retention)
+ if interval is None:
+ raise ValueError("Cannot parse retention from: '%s'" % retention)
+ return FileSink._make_retention_function(interval)
+ elif isinstance(retention, int):
+ return partial(Retention.retention_count, number=retention)
+ elif isinstance(retention, datetime.timedelta):
+ return partial(Retention.retention_age, seconds=retention.total_seconds())
+ elif callable(retention):
+ return retention
+ else:
+ raise TypeError(
+ "Cannot infer retention for objects of type: '%s'" % type(retention).__name__
+ )
+
+ @staticmethod
+ def _make_compression_function(compression):
+ if compression is None:
+ return None
+ elif isinstance(compression, str):
+ ext = compression.strip().lstrip(".")
+
+ if ext == "gz":
+ import gzip
+
+ compress = partial(Compression.copy_compress, opener=gzip.open, mode="wb")
+ elif ext == "bz2":
+ import bz2
+
+ compress = partial(Compression.copy_compress, opener=bz2.open, mode="wb")
+
+ elif ext == "xz":
+ import lzma
+
+ compress = partial(
+ Compression.copy_compress, opener=lzma.open, mode="wb", format=lzma.FORMAT_XZ
+ )
+
+ elif ext == "lzma":
+ import lzma
+
+ compress = partial(
+ Compression.copy_compress, opener=lzma.open, mode="wb", format=lzma.FORMAT_ALONE
+ )
+ elif ext == "tar":
+ import tarfile
+
+ compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:")
+ elif ext == "tar.gz":
+ import gzip
+ import tarfile
+
+ compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:gz")
+ elif ext == "tar.bz2":
+ import bz2
+ import tarfile
+
+ compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:bz2")
+
+ elif ext == "tar.xz":
+ import lzma
+ import tarfile
+
+ compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:xz")
+ elif ext == "zip":
+ import zipfile
+
+ compress = partial(
+ Compression.write_compress,
+ opener=zipfile.ZipFile,
+ mode="w",
+ compression=zipfile.ZIP_DEFLATED,
+ )
+ else:
+ raise ValueError("Invalid compression format: '%s'" % ext)
+
+ return partial(Compression.compression, ext="." + ext, compress_function=compress)
+ elif callable(compression):
+ return compression
+ else:
+ raise TypeError(
+ "Cannot infer compression for objects of type: '%s'" % type(compression).__name__
+ )
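To make the retention lookup in `_terminate_file` more concrete, the following standalone restatement of `_make_glob_patterns` shows which patterns a templated path expands to. The module-level name `make_glob_patterns` and the sample paths are hypothetical; the logic follows the diff.

```python
import glob
import os
import string


def make_glob_patterns(path):
    # Each "{field}" placeholder becomes "*"; literal text is glob-escaped.
    tokens = string.Formatter().parse(path)
    escaped = "".join(
        glob.escape(text) + "*" * (name is not None) for text, name, *_ in tokens
    )

    root, ext = os.path.splitext(escaped)

    if not ext:
        return [escaped, escaped + ".*"]

    # Also match files renamed on rotation (date inserted before the
    # extension) and files with a compression suffix appended.
    return [escaped, escaped + ".*", root + ".*" + ext, root + ".*" + ext + ".*"]


with_ext = make_glob_patterns("logs/app_{time}.log")
without_ext = make_glob_patterns("logs/app")
```

For the first path this yields `logs/app_*.log`, `logs/app_*.log.*`, `logs/app_*.*.log` and `logs/app_*.*.log.*`; the second yields `logs/app` and `logs/app.*`.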
diff --git a/loguru/_filters.py b/loguru/_filters.py
index e69de29..a192b26 100644
--- a/loguru/_filters.py
+++ b/loguru/_filters.py
@@ -0,0 +1,24 @@
+def filter_none(record):
+    return record["name"] is not None
+
+
+def filter_by_name(record, parent, length):
+    name = record["name"]
+    if name is None:
+        return False
+    return (name + ".")[:length] == parent
+
+
+def filter_by_level(record, level_per_module):
+    name = record["name"]
+
+    while True:
+        level = level_per_module.get(name, None)
+        if level is False:
+            return False
+        if level is not None:
+            return record["level"].no >= level
+        if not name:
+            return True
+        index = name.rfind(".")
+        name = name[:index] if index != -1 else ""
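The parent-module walk in `filter_by_level` is easier to follow with concrete inputs. The function is restated below so the snippet runs standalone; the `make_record` helper, the `levels` mapping and the module names are hypothetical, and `SimpleNamespace` stands in for the real level object, of which only `.no` is used.

```python
from types import SimpleNamespace


def filter_by_level(record, level_per_module):
    name = record["name"]

    while True:
        level = level_per_module.get(name, None)
        if level is False:
            return False  # module explicitly disabled
        if level is not None:
            return record["level"].no >= level
        if not name:
            return True  # reached the root without finding a rule
        index = name.rfind(".")
        name = name[:index] if index != -1 else ""


def make_record(name, no):
    # Hypothetical stand-in for a log record.
    return {"name": name, "level": SimpleNamespace(no=no)}


levels = {"app": 20, "app.db": False}

results = [
    filter_by_level(make_record("app.api", 30), levels),      # inherits "app": 30 >= 20
    filter_by_level(make_record("app.api", 10), levels),      # inherits "app": 10 < 20
    filter_by_level(make_record("app.db.pool", 50), levels),  # "app.db" is disabled
    filter_by_level(make_record("other", 5), levels),         # no matching rule
]
```

The four calls evaluate to `True`, `False`, `False` and `True`: the most specific matching ancestor decides, and a record with no matching rule is accepted.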
diff --git a/loguru/_get_frame.py b/loguru/_get_frame.py
index 2acdfbf..ee6eaab 100644
--- a/loguru/_get_frame.py
+++ b/loguru/_get_frame.py
@@ -1,3 +1,23 @@
import sys
from sys import exc_info
+
+
+def get_frame_fallback(n):
+    try:
+        raise Exception
+    except Exception:
+        frame = exc_info()[2].tb_frame.f_back
+        for _ in range(n):
+            frame = frame.f_back
+        return frame
+
+
+def load_get_frame_function():
+    if hasattr(sys, "_getframe"):
+        get_frame = sys._getframe
+    else:
+        get_frame = get_frame_fallback
+    return get_frame
+
+
get_frame = load_get_frame_function()
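A short check of what `get_frame_fallback` returns may help: the traceback of the deliberately raised exception points at the fallback's own frame, so one `f_back` step reaches the caller and each further step goes one level up. The function is restated so the snippet runs standalone; `inner` and `outer` are hypothetical callers.

```python
import sys


def get_frame_fallback(n):
    try:
        raise Exception
    except Exception:
        # tb_frame is this function's frame; f_back is the caller's frame.
        frame = sys.exc_info()[2].tb_frame.f_back
        for _ in range(n):
            frame = frame.f_back
        return frame


def inner():
    return (
        get_frame_fallback(0).f_code.co_name,
        get_frame_fallback(1).f_code.co_name,
    )


def outer():
    return inner()


names = outer()
```

`names` is `("inner", "outer")`, which matches the depth convention of `sys._getframe` as seen from the calling function.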
diff --git a/loguru/_handler.py b/loguru/_handler.py
index cbb5940..81a3dca 100644
--- a/loguru/_handler.py
+++ b/loguru/_handler.py
@@ -5,20 +5,48 @@ import os
import threading
from contextlib import contextmanager
from threading import Thread
+
from ._colorizer import Colorizer
from ._locks_machinery import create_handler_lock
+def prepare_colored_format(format_, ansi_level):
+ colored = Colorizer.prepare_format(format_)
+ return colored, colored.colorize(ansi_level)
+
+
+def prepare_stripped_format(format_):
+ colored = Colorizer.prepare_format(format_)
+ return colored.strip()
+
+
+def memoize(function):
+ return functools.lru_cache(maxsize=64)(function)
+
+
class Message(str):
- __slots__ = 'record',
+ __slots__ = ("record",)
class Handler:
-
- def __init__(self, *, sink, name, levelno, formatter,
- is_formatter_dynamic, filter_, colorize, serialize, enqueue,
- multiprocessing_context, error_interceptor, exception_formatter,
- id_, levels_ansi_codes):
+ def __init__(
+ self,
+ *,
+ sink,
+ name,
+ levelno,
+ formatter,
+ is_formatter_dynamic,
+ filter_,
+ colorize,
+ serialize,
+ enqueue,
+ multiprocessing_context,
+ error_interceptor,
+ exception_formatter,
+ id_,
+ levels_ansi_codes
+ ):
self._name = name
self._sink = sink
self._levelno = levelno
@@ -32,10 +60,12 @@ class Handler:
self._error_interceptor = error_interceptor
self._exception_formatter = exception_formatter
self._id = id_
- self._levels_ansi_codes = levels_ansi_codes
+ self._levels_ansi_codes = levels_ansi_codes # Warning, reference shared among handlers
+
self._decolorized_format = None
self._precolorized_formats = {}
self._memoize_dynamic_format = None
+
self._stopped = False
self._lock = create_handler_lock()
self._lock_acquired = threading.local()
@@ -45,16 +75,19 @@ class Handler:
self._confirmation_lock = None
self._owner_process_pid = None
self._thread = None
+
if self._is_formatter_dynamic:
if self._colorize:
self._memoize_dynamic_format = memoize(prepare_colored_format)
else:
self._memoize_dynamic_format = memoize(prepare_stripped_format)
- elif self._colorize:
- for level_name in self._levels_ansi_codes:
- self.update_format(level_name)
else:
- self._decolorized_format = self._formatter.strip()
+ if self._colorize:
+ for level_name in self._levels_ansi_codes:
+ self.update_format(level_name)
+ else:
+ self._decolorized_format = self._formatter.strip()
+
if self._enqueue:
if self._multiprocessing_context is None:
self._queue = multiprocessing.SimpleQueue()
@@ -62,34 +95,237 @@ class Handler:
self._confirmation_lock = multiprocessing.Lock()
else:
self._queue = self._multiprocessing_context.SimpleQueue()
- self._confirmation_event = self._multiprocessing_context.Event(
- )
+ self._confirmation_event = self._multiprocessing_context.Event()
self._confirmation_lock = self._multiprocessing_context.Lock()
self._queue_lock = create_handler_lock()
self._owner_process_pid = os.getpid()
- self._thread = Thread(target=self._queued_writer, daemon=True,
- name='loguru-writer-%d' % self._id)
+ self._thread = Thread(
+ target=self._queued_writer, daemon=True, name="loguru-writer-%d" % self._id
+ )
self._thread.start()
def __repr__(self):
- return '(id=%d, level=%d, sink=%s)' % (self._id, self._levelno,
- self._name)
+ return "(id=%d, level=%d, sink=%s)" % (self._id, self._levelno, self._name)
@contextmanager
def _protected_lock(self):
"""Acquire the lock, but fail fast if its already acquired by the current thread."""
- pass
+ if getattr(self._lock_acquired, "acquired", False):
+ raise RuntimeError(
+ "Could not acquire internal lock because it was already in use (deadlock avoided). "
+ "This likely happened because the logger was re-used inside a sink, a signal "
+ "handler or a '__del__' method. This is not permitted because the logger and its "
+ "handlers are not re-entrant."
+ )
+ self._lock_acquired.acquired = True
+ try:
+ with self._lock:
+ yield
+ finally:
+ self._lock_acquired.acquired = False
+
+ def emit(self, record, level_id, from_decorator, is_raw, colored_message):
+ try:
+ if self._levelno > record["level"].no:
+ return
+
+ if self._filter is not None:
+ if not self._filter(record):
+ return
+
+ if self._is_formatter_dynamic:
+ dynamic_format = self._formatter(record)
+
+ formatter_record = record.copy()
+
+ if not record["exception"]:
+ formatter_record["exception"] = ""
+ else:
+ type_, value, tb = record["exception"]
+ formatter = self._exception_formatter
+ lines = formatter.format_exception(type_, value, tb, from_decorator=from_decorator)
+ formatter_record["exception"] = "".join(lines)
+
+ if colored_message is not None and colored_message.stripped != record["message"]:
+ colored_message = None
+
+ if is_raw:
+ if colored_message is None or not self._colorize:
+ formatted = record["message"]
+ else:
+ ansi_level = self._levels_ansi_codes[level_id]
+ formatted = colored_message.colorize(ansi_level)
+ elif self._is_formatter_dynamic:
+ if not self._colorize:
+ precomputed_format = self._memoize_dynamic_format(dynamic_format)
+ formatted = precomputed_format.format_map(formatter_record)
+ elif colored_message is None:
+ ansi_level = self._levels_ansi_codes[level_id]
+ _, precomputed_format = self._memoize_dynamic_format(dynamic_format, ansi_level)
+ formatted = precomputed_format.format_map(formatter_record)
+ else:
+ ansi_level = self._levels_ansi_codes[level_id]
+ formatter, precomputed_format = self._memoize_dynamic_format(
+ dynamic_format, ansi_level
+ )
+ coloring_message = formatter.make_coloring_message(
+ record["message"], ansi_level=ansi_level, colored_message=colored_message
+ )
+ formatter_record["message"] = coloring_message
+ formatted = precomputed_format.format_map(formatter_record)
+
+ else:
+ if not self._colorize:
+ precomputed_format = self._decolorized_format
+ formatted = precomputed_format.format_map(formatter_record)
+ elif colored_message is None:
+ ansi_level = self._levels_ansi_codes[level_id]
+ precomputed_format = self._precolorized_formats[level_id]
+ formatted = precomputed_format.format_map(formatter_record)
+ else:
+ ansi_level = self._levels_ansi_codes[level_id]
+ precomputed_format = self._precolorized_formats[level_id]
+ coloring_message = self._formatter.make_coloring_message(
+ record["message"], ansi_level=ansi_level, colored_message=colored_message
+ )
+ formatter_record["message"] = coloring_message
+ formatted = precomputed_format.format_map(formatter_record)
+
+ if self._serialize:
+ formatted = self._serialize_record(formatted, record)
+
+ str_record = Message(formatted)
+ str_record.record = record
+
+ with self._protected_lock():
+ if self._stopped:
+ return
+ if self._enqueue:
+ self._queue.put(str_record)
+ else:
+ self._sink.write(str_record)
+ except Exception:
+ if not self._error_interceptor.should_catch():
+ raise
+ self._error_interceptor.print(record)
+
+ def stop(self):
+ with self._protected_lock():
+ self._stopped = True
+ if self._enqueue:
+ if self._owner_process_pid != os.getpid():
+ return
+ self._queue.put(None)
+ self._thread.join()
+ if hasattr(self._queue, "close"):
+ self._queue.close()
+
+ self._sink.stop()
+
+ def complete_queue(self):
+ if not self._enqueue:
+ return
+
+ with self._confirmation_lock:
+ self._queue.put(True)
+ self._confirmation_event.wait()
+ self._confirmation_event.clear()
+
+ def tasks_to_complete(self):
+ if self._enqueue and self._owner_process_pid != os.getpid():
+ return []
+ lock = self._queue_lock if self._enqueue else self._protected_lock()
+ with lock:
+ return self._sink.tasks_to_complete()
+
+ def update_format(self, level_id):
+ if not self._colorize or self._is_formatter_dynamic:
+ return
+ ansi_code = self._levels_ansi_codes[level_id]
+ self._precolorized_formats[level_id] = self._formatter.colorize(ansi_code)
+
+ @property
+ def levelno(self):
+ return self._levelno
+
+ @staticmethod
+ def _serialize_record(text, record):
+ exception = record["exception"]
+
+ if exception is not None:
+ exception = {
+ "type": None if exception.type is None else exception.type.__name__,
+ "value": exception.value,
+ "traceback": bool(exception.traceback),
+ }
+
+ serializable = {
+ "text": text,
+ "record": {
+ "elapsed": {
+ "repr": record["elapsed"],
+ "seconds": record["elapsed"].total_seconds(),
+ },
+ "exception": exception,
+ "extra": record["extra"],
+ "file": {"name": record["file"].name, "path": record["file"].path},
+ "function": record["function"],
+ "level": {
+ "icon": record["level"].icon,
+ "name": record["level"].name,
+ "no": record["level"].no,
+ },
+ "line": record["line"],
+ "message": record["message"],
+ "module": record["module"],
+ "name": record["name"],
+ "process": {"id": record["process"].id, "name": record["process"].name},
+ "thread": {"id": record["thread"].id, "name": record["thread"].name},
+ "time": {"repr": record["time"], "timestamp": record["time"].timestamp()},
+ },
+ }
+
+ return json.dumps(serializable, default=str, ensure_ascii=False) + "\n"
+
+ def _queued_writer(self):
+ message = None
+ queue = self._queue
+
+ # We need to use a lock to protect sink during fork.
+ # Particularly, writing to stderr may lead to deadlock in child process.
+ lock = self._queue_lock
+
+ while True:
+ try:
+ message = queue.get()
+ except Exception:
+ with lock:
+ self._error_interceptor.print(None)
+ continue
+
+ if message is None:
+ break
+
+ if message is True:
+ self._confirmation_event.set()
+ continue
+
+ with lock:
+ try:
+ self._sink.write(message)
+ except Exception:
+ self._error_interceptor.print(message.record)
def __getstate__(self):
state = self.__dict__.copy()
- state['_lock'] = None
- state['_lock_acquired'] = None
- state['_memoize_dynamic_format'] = None
+ state["_lock"] = None
+ state["_lock_acquired"] = None
+ state["_memoize_dynamic_format"] = None
if self._enqueue:
- state['_sink'] = None
- state['_thread'] = None
- state['_owner_process'] = None
- state['_queue_lock'] = None
+ state["_sink"] = None
+ state["_thread"] = None
+ state["_owner_process"] = None
+ state["_queue_lock"] = None
return state
def __setstate__(self, state):
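Note on the handler changes above: `stop()` puts `None` on the queue and `complete_queue()` puts `True`, and `_queued_writer` treats both as control markers rather than messages. The following is a minimal standalone sketch of that sentinel protocol, not the handler's actual code. It assumes a plain `queue.Queue` and a list as the sink, and the names `queued_writer`, `written` and `confirmed` are hypothetical.

```python
import queue
import threading


def queued_writer(q, write, confirmation_event):
    """Drain q until the None sentinel arrives; True requests a confirmation."""
    while True:
        message = q.get()
        if message is None:  # stop marker, as put by stop() in the diff
            break
        if message is True:  # flush marker, as put by complete_queue()
            confirmation_event.set()
            continue
        write(message)


written = []  # stand-in for a real sink
q = queue.Queue()  # simplification: an in-process queue
confirmed = threading.Event()
worker = threading.Thread(
    target=queued_writer, args=(q, written.append, confirmed), daemon=True
)
worker.start()

q.put("first")
q.put("second")
q.put(True)  # ask the worker to signal once earlier items are written
confirmed.wait()
confirmed.clear()
flushed = list(written)

q.put(None)  # ask the worker to exit
worker.join()
```

Because the queue is first-in, first-out and has a single consumer, the event can only be set after every message queued before the `True` marker has been written.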
diff --git a/loguru/_locks_machinery.py b/loguru/_locks_machinery.py
index a65aee1..6f02110 100644
--- a/loguru/_locks_machinery.py
+++ b/loguru/_locks_machinery.py
@@ -1,9 +1,50 @@
import os
import threading
import weakref
-if not hasattr(os, 'register_at_fork'):
+
+if not hasattr(os, "register_at_fork"):
+
+ def create_logger_lock():
+ return threading.Lock()
+
+ def create_handler_lock():
+ return threading.Lock()
+
else:
+ # While forking, we need to sanitize all locks to make sure the child process doesn't run into
+ # a deadlock (if a lock already acquired is inherited) and to protect sink from corrupted state.
+ # It's very important to acquire logger locks before handlers one to prevent possible deadlock
+ # while 'remove()' is called for example.
+
logger_locks = weakref.WeakSet()
handler_locks = weakref.WeakSet()
- os.register_at_fork(before=acquire_locks, after_in_parent=release_locks,
- after_in_child=release_locks)
+
+ def acquire_locks():
+ for lock in logger_locks:
+ lock.acquire()
+
+ for lock in handler_locks:
+ lock.acquire()
+
+ def release_locks():
+ for lock in logger_locks:
+ lock.release()
+
+ for lock in handler_locks:
+ lock.release()
+
+ os.register_at_fork(
+ before=acquire_locks,
+ after_in_parent=release_locks,
+ after_in_child=release_locks,
+ )
+
+ def create_logger_lock():
+ lock = threading.Lock()
+ logger_locks.add(lock)
+ return lock
+
+ def create_handler_lock():
+ lock = threading.Lock()
+ handler_locks.add(lock)
+ return lock
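Note on the lock machinery above: every lock is registered in a weak set, acquired just before a fork, and released afterwards in both parent and child, so the child never inherits a lock held by a thread that no longer exists. The following is a simplified sketch of that pattern under stated assumptions. It uses a single registry instead of the separate logger and handler sets, and the names `registered_locks`, `acquire_all`, `release_all` and `create_lock` are hypothetical.

```python
import os
import threading
import weakref

# Weak references let a lock disappear from the registry once its owner is gone.
registered_locks = weakref.WeakSet()


def acquire_all():
    for lock in registered_locks:
        lock.acquire()


def release_all():
    for lock in registered_locks:
        lock.release()


# os.register_at_fork is only available on platforms that support fork.
if hasattr(os, "register_at_fork"):
    os.register_at_fork(
        before=acquire_all,
        after_in_parent=release_all,
        after_in_child=release_all,
    )


def create_lock():
    lock = threading.Lock()
    registered_locks.add(lock)
    return lock


# Simulate the hooks by hand instead of actually forking.
lock = create_lock()
acquire_all()
held_during_fork = lock.locked()
release_all()
held_after_fork = lock.locked()
```

The diff keeps two registries so that logger locks are always acquired before handler locks. A single set, as in this sketch, gives no such ordering guarantee.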
diff --git a/loguru/_logger.py b/loguru/_logger.py
index 8812d5c..f750967 100644
--- a/loguru/_logger.py
+++ b/loguru/_logger.py
@@ -95,6 +95,7 @@ from multiprocessing import current_process, get_context
from multiprocessing.context import BaseContext
from os.path import basename, splitext
from threading import current_thread
+
from . import _asyncio_loop, _colorama, _defaults, _filters
from ._better_exceptions import ExceptionFormatter
from ._colorizer import Colorizer
@@ -107,49 +108,95 @@ from ._handler import Handler
from ._locks_machinery import create_logger_lock
from ._recattrs import RecordException, RecordFile, RecordLevel, RecordProcess, RecordThread
from ._simple_sinks import AsyncSink, CallableSink, StandardSink, StreamSink
+
if sys.version_info >= (3, 6):
from os import PathLike
else:
from pathlib import PurePath as PathLike
-Level = namedtuple('Level', ['name', 'no', 'color', 'icon'])
+
+
+Level = namedtuple("Level", ["name", "no", "color", "icon"])
+
start_time = aware_now()
-context = ContextVar('loguru_context', default={})
+context = ContextVar("loguru_context", default={})
-class Core:
+class Core:
def __init__(self):
- levels = [Level('TRACE', _defaults.LOGURU_TRACE_NO, _defaults.
- LOGURU_TRACE_COLOR, _defaults.LOGURU_TRACE_ICON), Level('DEBUG',
- _defaults.LOGURU_DEBUG_NO, _defaults.LOGURU_DEBUG_COLOR,
- _defaults.LOGURU_DEBUG_ICON), Level('INFO', _defaults.
- LOGURU_INFO_NO, _defaults.LOGURU_INFO_COLOR, _defaults.
- LOGURU_INFO_ICON), Level('SUCCESS', _defaults.LOGURU_SUCCESS_NO,
- _defaults.LOGURU_SUCCESS_COLOR, _defaults.LOGURU_SUCCESS_ICON),
- Level('WARNING', _defaults.LOGURU_WARNING_NO, _defaults.
- LOGURU_WARNING_COLOR, _defaults.LOGURU_WARNING_ICON), Level(
- 'ERROR', _defaults.LOGURU_ERROR_NO, _defaults.
- LOGURU_ERROR_COLOR, _defaults.LOGURU_ERROR_ICON), Level(
- 'CRITICAL', _defaults.LOGURU_CRITICAL_NO, _defaults.
- LOGURU_CRITICAL_COLOR, _defaults.LOGURU_CRITICAL_ICON)]
+ levels = [
+ Level(
+ "TRACE",
+ _defaults.LOGURU_TRACE_NO,
+ _defaults.LOGURU_TRACE_COLOR,
+ _defaults.LOGURU_TRACE_ICON,
+ ),
+ Level(
+ "DEBUG",
+ _defaults.LOGURU_DEBUG_NO,
+ _defaults.LOGURU_DEBUG_COLOR,
+ _defaults.LOGURU_DEBUG_ICON,
+ ),
+ Level(
+ "INFO",
+ _defaults.LOGURU_INFO_NO,
+ _defaults.LOGURU_INFO_COLOR,
+ _defaults.LOGURU_INFO_ICON,
+ ),
+ Level(
+ "SUCCESS",
+ _defaults.LOGURU_SUCCESS_NO,
+ _defaults.LOGURU_SUCCESS_COLOR,
+ _defaults.LOGURU_SUCCESS_ICON,
+ ),
+ Level(
+ "WARNING",
+ _defaults.LOGURU_WARNING_NO,
+ _defaults.LOGURU_WARNING_COLOR,
+ _defaults.LOGURU_WARNING_ICON,
+ ),
+ Level(
+ "ERROR",
+ _defaults.LOGURU_ERROR_NO,
+ _defaults.LOGURU_ERROR_COLOR,
+ _defaults.LOGURU_ERROR_ICON,
+ ),
+ Level(
+ "CRITICAL",
+ _defaults.LOGURU_CRITICAL_NO,
+ _defaults.LOGURU_CRITICAL_COLOR,
+ _defaults.LOGURU_CRITICAL_ICON,
+ ),
+ ]
self.levels = {level.name: level for level in levels}
- self.levels_ansi_codes = {**{name: Colorizer.ansify(level.color) for
- name, level in self.levels.items()}, None: ''}
- self.levels_lookup = {name: (name, name, level.no, level.icon) for
- name, level in self.levels.items()}
+ self.levels_ansi_codes = {
+ **{name: Colorizer.ansify(level.color) for name, level in self.levels.items()},
+ None: "",
+ }
+
+ # Cache used internally to quickly access level attributes based on their name or severity.
+ # It can also contain integers as keys, it serves to avoid calling "isinstance()" repeatedly
+ # when "logger.log()" is used.
+ self.levels_lookup = {
+ name: (name, name, level.no, level.icon) for name, level in self.levels.items()
+ }
+
self.handlers_count = 0
self.handlers = {}
+
self.extra = {}
self.patcher = None
- self.min_level = float('inf')
+
+ self.min_level = float("inf")
self.enabled = {}
self.activation_list = []
self.activation_none = True
+
self.lock = create_logger_lock()
def __getstate__(self):
state = self.__dict__.copy()
- state['lock'] = None
+ state["lock"] = None
return state
def __setstate__(self, state):
@@ -180,23 +227,30 @@ class Logger:
You should not instantiate a |Logger| by yourself, use ``from loguru import logger`` instead.
"""
- def __init__(self, core, exception, depth, record, lazy, colors, raw,
- capture, patchers, extra):
+ def __init__(self, core, exception, depth, record, lazy, colors, raw, capture, patchers, extra):
self._core = core
- self._options = (exception, depth, record, lazy, colors, raw,
- capture, patchers, extra)
+ self._options = (exception, depth, record, lazy, colors, raw, capture, patchers, extra)
def __repr__(self):
- return '<loguru.logger handlers=%r>' % list(self._core.handlers.
- values())
-
- def add(self, sink, *, level=_defaults.LOGURU_LEVEL, format=_defaults.
- LOGURU_FORMAT, filter=_defaults.LOGURU_FILTER, colorize=_defaults.
- LOGURU_COLORIZE, serialize=_defaults.LOGURU_SERIALIZE, backtrace=
- _defaults.LOGURU_BACKTRACE, diagnose=_defaults.LOGURU_DIAGNOSE,
- enqueue=_defaults.LOGURU_ENQUEUE, context=_defaults.LOGURU_CONTEXT,
- catch=_defaults.LOGURU_CATCH, **kwargs):
- """Add a handler sending log messages to a sink adequately configured.
+ return "<loguru.logger handlers=%r>" % list(self._core.handlers.values())
+
+ def add(
+ self,
+ sink,
+ *,
+ level=_defaults.LOGURU_LEVEL,
+ format=_defaults.LOGURU_FORMAT,
+ filter=_defaults.LOGURU_FILTER,
+ colorize=_defaults.LOGURU_COLORIZE,
+ serialize=_defaults.LOGURU_SERIALIZE,
+ backtrace=_defaults.LOGURU_BACKTRACE,
+ diagnose=_defaults.LOGURU_DIAGNOSE,
+ enqueue=_defaults.LOGURU_ENQUEUE,
+ context=_defaults.LOGURU_CONTEXT,
+ catch=_defaults.LOGURU_CATCH,
+ **kwargs
+ ):
+ r"""Add a handler sending log messages to a sink adequately configured.
Parameters
----------
@@ -334,7 +388,7 @@ class Logger:
If fine-grained control is needed, the ``format`` can also be a function which takes the
record as parameter and return the format template string. However, note that in such a
case, you should take care of appending the line ending and exception field to the returned
- format, while ``"\\n{exception}"`` is automatically appended for convenience if ``format`` is
+ format, while ``"\n{exception}"`` is automatically appended for convenience if ``format`` is
a string.
The ``filter`` attribute can be used to control which messages are effectively passed to the
@@ -604,9 +658,9 @@ class Logger:
Tags which are not recognized will raise an exception during parsing, to inform you about
possible misuse. If you wish to display a markup tag literally, you can escape it by
- prepending a ``\\`` like for example ``\\<blue>``. If, for some reason, you need to escape a
+ prepending a ``\`` like for example ``\<blue>``. If, for some reason, you need to escape a
string programmatically, note that the regex used internally to parse markup tags is
- ``r"\\\\?</?((?:[fb]g\\s)?[^<>\\s]*)>"``.
+ ``r"\\?</?((?:[fb]g\s)?[^<>\s]*)>"``.
Note that when logging a message with ``opt(colors=True)``, color tags present in the
formatting arguments (``args`` and ``kwargs``) are completely ignored. This is important if
@@ -722,7 +776,239 @@ class Logger:
>>> stream_object = RandomStream(seed=12345, threshold=0.25)
>>> logger.add(stream_object, level="INFO")
"""
- pass
+ with self._core.lock:
+ handler_id = self._core.handlers_count
+ self._core.handlers_count += 1
+
+ error_interceptor = ErrorInterceptor(catch, handler_id)
+
+ if colorize is None and serialize:
+ colorize = False
+
+ if isinstance(sink, (str, PathLike)):
+ path = sink
+ name = "'%s'" % path
+
+ if colorize is None:
+ colorize = False
+
+ wrapped_sink = FileSink(path, **kwargs)
+ kwargs = {}
+ encoding = wrapped_sink.encoding
+ terminator = "\n"
+ exception_prefix = ""
+ elif hasattr(sink, "write") and callable(sink.write):
+ name = getattr(sink, "name", None) or repr(sink)
+
+ if colorize is None:
+ colorize = _colorama.should_colorize(sink)
+
+ if colorize is True and _colorama.should_wrap(sink):
+ stream = _colorama.wrap(sink)
+ else:
+ stream = sink
+
+ wrapped_sink = StreamSink(stream)
+ encoding = getattr(sink, "encoding", None)
+ terminator = "\n"
+ exception_prefix = ""
+ elif isinstance(sink, logging.Handler):
+ name = repr(sink)
+
+ if colorize is None:
+ colorize = False
+
+ wrapped_sink = StandardSink(sink)
+ encoding = getattr(sink, "encoding", None)
+ terminator = ""
+ exception_prefix = "\n"
+ elif iscoroutinefunction(sink) or iscoroutinefunction(
+ getattr(sink, "__call__", None) # noqa: B004
+ ):
+ name = getattr(sink, "__name__", None) or repr(sink)
+
+ if colorize is None:
+ colorize = False
+
+ loop = kwargs.pop("loop", None)
+
+ # The worker thread needs an event loop, it can't create a new one internally because it
+ # has to be accessible by the user while calling "complete()", instead we use the global
+ # one when the sink is added. If "enqueue=False" the event loop is dynamically retrieved
+ # at each logging call, which is much more convenient. However, coroutine can't access
+ # running loop in Python 3.5.2 and earlier versions, see python/asyncio#452.
+ if enqueue and loop is None:
+ try:
+ loop = _asyncio_loop.get_running_loop()
+ except RuntimeError as e:
+ raise ValueError(
+ "An event loop is required to add a coroutine sink with `enqueue=True`, "
+ "but none has been passed as argument and none is currently running."
+ ) from e
+
+ coro = sink if iscoroutinefunction(sink) else sink.__call__
+ wrapped_sink = AsyncSink(coro, loop, error_interceptor)
+ encoding = "utf8"
+ terminator = "\n"
+ exception_prefix = ""
+ elif callable(sink):
+ name = getattr(sink, "__name__", None) or repr(sink)
+
+ if colorize is None:
+ colorize = False
+
+ wrapped_sink = CallableSink(sink)
+ encoding = "utf8"
+ terminator = "\n"
+ exception_prefix = ""
+ else:
+ raise TypeError("Cannot log to objects of type '%s'" % type(sink).__name__)
+
+ if kwargs:
+ raise TypeError("add() got an unexpected keyword argument '%s'" % next(iter(kwargs)))
+
+ if filter is None:
+ filter_func = None
+ elif filter == "":
+ filter_func = _filters.filter_none
+ elif isinstance(filter, str):
+ parent = filter + "."
+ length = len(parent)
+ filter_func = functools.partial(_filters.filter_by_name, parent=parent, length=length)
+ elif isinstance(filter, dict):
+ level_per_module = {}
+ for module, level_ in filter.items():
+ if module is not None and not isinstance(module, str):
+ raise TypeError(
+ "The filter dict contains an invalid module, "
+ "it should be a string (or None), not: '%s'" % type(module).__name__
+ )
+ if level_ is False:
+ levelno_ = False
+ elif level_ is True:
+ levelno_ = 0
+ elif isinstance(level_, str):
+ try:
+ levelno_ = self.level(level_).no
+ except ValueError:
+ raise ValueError(
+ "The filter dict contains a module '%s' associated to a level name "
+ "which does not exist: '%s'" % (module, level_)
+ ) from None
+ elif isinstance(level_, int):
+ levelno_ = level_
+ else:
+ raise TypeError(
+ "The filter dict contains a module '%s' associated to an invalid level, "
+ "it should be an integer, a string or a boolean, not: '%s'"
+ % (module, type(level_).__name__)
+ )
+ if levelno_ < 0:
+ raise ValueError(
+ "The filter dict contains a module '%s' associated to an invalid level, "
+ "it should be a positive integer, not: '%d'" % (module, levelno_)
+ )
+ level_per_module[module] = levelno_
+ filter_func = functools.partial(
+ _filters.filter_by_level, level_per_module=level_per_module
+ )
+ elif callable(filter):
+ if filter == builtins.filter:
+ raise ValueError(
+ "The built-in 'filter()' function cannot be used as a 'filter' parameter, "
+ "this is most likely a mistake (please double-check the arguments passed "
+ "to 'logger.add()')."
+ )
+ filter_func = filter
+ else:
+ raise TypeError(
+ "Invalid filter, it should be a function, a string or a dict, not: '%s'"
+ % type(filter).__name__
+ )
+
+ if isinstance(level, str):
+ levelno = self.level(level).no
+ elif isinstance(level, int):
+ levelno = level
+ else:
+ raise TypeError(
+ "Invalid level, it should be an integer or a string, not: '%s'"
+ % type(level).__name__
+ )
+
+ if levelno < 0:
+ raise ValueError(
+ "Invalid level value, it should be a positive integer, not: %d" % levelno
+ )
+
+ if isinstance(format, str):
+ try:
+ formatter = Colorizer.prepare_format(format + terminator + "{exception}")
+ except ValueError as e:
+ raise ValueError(
+ "Invalid format, color markups could not be parsed correctly"
+ ) from e
+ is_formatter_dynamic = False
+ elif callable(format):
+ if format == builtins.format:
+ raise ValueError(
+ "The built-in 'format()' function cannot be used as a 'format' parameter, "
+ "this is most likely a mistake (please double-check the arguments passed "
+ "to 'logger.add()')."
+ )
+ formatter = format
+ is_formatter_dynamic = True
+ else:
+ raise TypeError(
+ "Invalid format, it should be a string or a function, not: '%s'"
+ % type(format).__name__
+ )
+
+ if not isinstance(encoding, str):
+ encoding = "ascii"
+
+ if isinstance(context, str):
+ context = get_context(context)
+ elif context is not None and not isinstance(context, BaseContext):
+ raise TypeError(
+ "Invalid context, it should be a string or a multiprocessing context, "
+ "not: '%s'" % type(context).__name__
+ )
+
+ with self._core.lock:
+ exception_formatter = ExceptionFormatter(
+ colorize=colorize,
+ encoding=encoding,
+ diagnose=diagnose,
+ backtrace=backtrace,
+ hidden_frames_filename=self.catch.__code__.co_filename,
+ prefix=exception_prefix,
+ )
+
+ handler = Handler(
+ name=name,
+ sink=wrapped_sink,
+ levelno=levelno,
+ formatter=formatter,
+ is_formatter_dynamic=is_formatter_dynamic,
+ filter_=filter_func,
+ colorize=colorize,
+ serialize=serialize,
+ enqueue=enqueue,
+ multiprocessing_context=context,
+ id_=handler_id,
+ error_interceptor=error_interceptor,
+ exception_formatter=exception_formatter,
+ levels_ansi_codes=self._core.levels_ansi_codes,
+ )
+
+ handlers = self._core.handlers.copy()
+ handlers[handler_id] = handler
+
+ self._core.min_level = min(self._core.min_level, levelno)
+ self._core.handlers = handlers
+
+ return handler_id
def remove(self, handler_id=None):
"""Remove a previously added handler and stop sending logs to its sink.
@@ -746,7 +1032,32 @@ class Logger:
>>> logger.remove(i)
>>> logger.info("No longer logging")
"""
- pass
+ if not (handler_id is None or isinstance(handler_id, int)):
+ raise TypeError(
+ "Invalid handler id, it should be an integer as returned "
+ "by the 'add()' method (or None), not: '%s'" % type(handler_id).__name__
+ )
+
+ with self._core.lock:
+ handlers = self._core.handlers.copy()
+
+ if handler_id is not None and handler_id not in handlers:
+ raise ValueError("There is no existing handler with id %d" % handler_id) from None
+
+ if handler_id is None:
+ handler_ids = list(handlers.keys())
+ else:
+ handler_ids = [handler_id]
+
+ for handler_id in handler_ids:
+ handler = handlers.pop(handler_id)
+
+ # This needs to be done first in case "stop()" raises an exception
+ levelnos = (h.levelno for h in handlers.values())
+ self._core.min_level = min(levelnos, default=float("inf"))
+ self._core.handlers = handlers
+
+ handler.stop()
def complete(self):
"""Wait for the end of enqueued messages and asynchronous tasks scheduled by handlers.
@@ -798,12 +1109,34 @@ class Logger:
>>> process.join()
Message sent from the child
"""
- pass
-
- def catch(self, exception=Exception, *, level='ERROR', reraise=False,
- onerror=None, exclude=None, default=None, message=
- "An error has been caught in function '{record[function]}', process '{record[process].name}' ({record[process].id}), thread '{record[thread].name}' ({record[thread].id}):"
- ):
+ tasks = []
+
+ with self._core.lock:
+ handlers = self._core.handlers.copy()
+ for handler in handlers.values():
+ handler.complete_queue()
+ tasks.extend(handler.tasks_to_complete())
+
+ class AwaitableCompleter:
+ def __await__(self):
+ for task in tasks:
+ yield from task.__await__()
+
+ return AwaitableCompleter()
+
+ def catch(
+ self,
+ exception=Exception,
+ *,
+ level="ERROR",
+ reraise=False,
+ onerror=None,
+ exclude=None,
+ default=None,
+ message="An error has been caught in function '{record[function]}', "
+ "process '{record[process].name}' ({record[process].id}), "
+ "thread '{record[thread].name}' ({record[thread].id}):"
+ ):
"""Return a decorator to automatically log possibly caught error in wrapped function.
This is useful to ensure unexpected exceptions are logged, the entire program can be
@@ -876,11 +1209,92 @@ class Logger:
... def main():
... 1 / 0
"""
- pass
+ if callable(exception) and (
+ not isclass(exception) or not issubclass(exception, BaseException)
+ ):
+ return self.catch()(exception)
+
+ logger = self
+
+ class Catcher:
+ def __init__(self, from_decorator):
+ self._from_decorator = from_decorator
+
+ def __enter__(self):
+ return None
+
+ def __exit__(self, type_, value, traceback_):
+ if type_ is None:
+ return
+
+ if not issubclass(type_, exception):
+ return False
+
+ if exclude is not None and issubclass(type_, exclude):
+ return False
+
+ from_decorator = self._from_decorator
+ _, depth, _, *options = logger._options
+
+ if from_decorator:
+ depth += 1
+
+ catch_options = [(type_, value, traceback_), depth, True] + options
+ logger._log(level, from_decorator, catch_options, message, (), {})
+
+ if onerror is not None:
+ onerror(value)
+
+ return not reraise
+
+ def __call__(self, function):
+ if isclass(function):
+ raise TypeError(
+ "Invalid object decorated with 'catch()', it must be a function, "
+ "not a class (tried to wrap '%s')" % function.__name__
+ )
- def opt(self, *, exception=None, record=False, lazy=False, colors=False,
- raw=False, capture=True, depth=0, ansi=False):
- """Parametrize a logging call to slightly change generated log message.
+ catcher = Catcher(True)
+
+ if iscoroutinefunction(function):
+
+ async def catch_wrapper(*args, **kwargs):
+ with catcher:
+ return await function(*args, **kwargs)
+ return default
+
+ elif isgeneratorfunction(function):
+
+ def catch_wrapper(*args, **kwargs):
+ with catcher:
+ return (yield from function(*args, **kwargs))
+ return default
+
+ else:
+
+ def catch_wrapper(*args, **kwargs):
+ with catcher:
+ return function(*args, **kwargs)
+ return default
+
+ functools.update_wrapper(catch_wrapper, function)
+ return catch_wrapper
+
+ return Catcher(False)
+
+ def opt(
+ self,
+ *,
+ exception=None,
+ record=False,
+ lazy=False,
+ colors=False,
+ raw=False,
+ capture=True,
+ depth=0,
+ ansi=False
+ ):
+ r"""Parametrize a logging call to slightly change generated log message.
Note that it's not possible to chain |opt| calls, the last one takes precedence over the
others as it will "reset" the options to their default values.
@@ -942,7 +1356,7 @@ class Logger:
>>> logger.opt(colors=True).warning("We got a <red>BIG</red> problem")
[18:11:30] WARNING in '<module>' - We got a BIG problem
- >>> logger.opt(raw=True).debug("No formatting\\n")
+ >>> logger.opt(raw=True).debug("No formatting\n")
No formatting
>>> logger.opt(capture=False).info("Displayed but not captured: {value}", value=123)
@@ -957,9 +1371,18 @@ class Logger:
>>> func()
[18:11:54] DEBUG in 'func' - Get parent context
"""
- pass
-
- def bind(__self, **kwargs):
+ if ansi:
+ colors = True
+ warnings.warn(
+ "The 'ansi' parameter is deprecated, please use 'colors' instead",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+
+ args = self._options[-2:]
+ return Logger(self._core, exception, depth, record, lazy, colors, raw, capture, *args)
+
+ def bind(__self, **kwargs): # noqa: N805
"""Bind attributes to the ``extra`` dict of each logged message record.
This is used to add custom context to each logging call.
@@ -992,10 +1415,11 @@ class Logger:
>>> instance_2.call("Second instance")
127.0.0.1 - Second instance
"""
- pass
+ *options, extra = __self._options
+ return Logger(__self._core, *options, {**extra, **kwargs})
@contextlib.contextmanager
- def contextualize(__self, **kwargs):
+ def contextualize(__self, **kwargs): # noqa: N805
"""Bind attributes to the context-local ``extra`` dict while inside the ``with`` block.
Contrary to |bind| there is no ``logger`` returned, the ``extra`` dict is modified in-place
@@ -1029,7 +1453,15 @@ class Logger:
>>> logger.info("Done.")
Done. | {}
"""
- pass
+ with __self._core.lock:
+ new_context = {**context.get(), **kwargs}
+ token = context.set(new_context)
+
+ try:
+ yield
+ finally:
+ with __self._core.lock:
+ context.reset(token)
def patch(self, patcher):
"""Attach a function to modify the record dict created by each logging call.
@@ -1077,7 +1509,8 @@ class Logger:
... level, message = record["level"], record["message"]
... logger.patch(lambda r: r.update(record)).log(level, message)
"""
- pass
+ *options, patchers, extra = self._options
+ return Logger(self._core, *options, patchers + [patcher], extra)
def level(self, name, no=None, color=None, icon=None):
"""Add, update or retrieve a logging level.
@@ -1132,7 +1565,55 @@ class Logger:
>>> logger.warning("Updated!")
30 /!\\ Updated!
"""
- pass
+ if not isinstance(name, str):
+ raise TypeError(
+ "Invalid level name, it should be a string, not: '%s'" % type(name).__name__
+ )
+
+ if no is color is icon is None:
+ try:
+ return self._core.levels[name]
+ except KeyError:
+ raise ValueError("Level '%s' does not exist" % name) from None
+
+ if name not in self._core.levels:
+ if no is None:
+ raise ValueError(
+ "Level '%s' does not exist, you have to create it by specifying a level no"
+ % name
+ )
+ else:
+ old_color, old_icon = "", " "
+ elif no is not None:
+ raise TypeError("Level '%s' already exists, you can't update its severity no" % name)
+ else:
+ _, no, old_color, old_icon = self.level(name)
+
+ if color is None:
+ color = old_color
+
+ if icon is None:
+ icon = old_icon
+
+ if not isinstance(no, int):
+ raise TypeError(
+ "Invalid level no, it should be an integer, not: '%s'" % type(no).__name__
+ )
+
+ if no < 0:
+ raise ValueError("Invalid level no, it should be a positive integer, not: %d" % no)
+
+ ansi = Colorizer.ansify(color)
+ level = Level(name, no, color, icon)
+
+ with self._core.lock:
+ self._core.levels[name] = level
+ self._core.levels_ansi_codes[name] = ansi
+ self._core.levels_lookup[name] = (name, name, no, icon)
+ for handler in self._core.handlers.values():
+ handler.update_format(name)
+
+ return level
def disable(self, name):
"""Disable logging of messages coming from ``name`` module and its children.
@@ -1156,7 +1637,7 @@ class Logger:
>>> logger.disable("my_library")
>>> logger.info("While publishing a library, don't forget to disable logging")
"""
- pass
+ self._change_activation(name, False)
def enable(self, name):
"""Enable logging of messages coming from ``name`` module and its children.
@@ -1180,10 +1661,9 @@ class Logger:
>>> logger.info("Re-enabled, messages are logged.")
[22:46:12] Re-enabled, messages are logged.
"""
- pass
+ self._change_activation(name, True)
- def configure(self, *, handlers=None, levels=None, extra=None, patcher=
- None, activation=None):
+ def configure(self, *, handlers=None, levels=None, extra=None, patcher=None, activation=None):
"""Configure the core logger.
It should be noted that ``extra`` values set using this function are available across all
@@ -1246,10 +1726,75 @@ class Logger:
>>> logger.bind(context="bar").info("Suppress global context")
>>> # => "bar - Suppress global context"
"""
- pass
+ if handlers is not None:
+ self.remove()
+ else:
+ handlers = []
+
+ if levels is not None:
+ for params in levels:
+ self.level(**params)
+
+ if patcher is not None:
+ with self._core.lock:
+ self._core.patcher = patcher
+
+ if extra is not None:
+ with self._core.lock:
+ self._core.extra.clear()
+ self._core.extra.update(extra)
+
+ if activation is not None:
+ for name, state in activation:
+ if state:
+ self.enable(name)
+ else:
+ self.disable(name)
+
+ return [self.add(**params) for params in handlers]
+
+ def _change_activation(self, name, status):
+ if not (name is None or isinstance(name, str)):
+ raise TypeError(
+ "Invalid name, it should be a string (or None), not: '%s'" % type(name).__name__
+ )
+
+ with self._core.lock:
+ enabled = self._core.enabled.copy()
+
+ if name is None:
+ for n in enabled:
+ if n is None:
+ enabled[n] = status
+ self._core.activation_none = status
+ self._core.enabled = enabled
+ return
+
+ if name != "":
+ name += "."
+
+ activation_list = [
+ (n, s) for n, s in self._core.activation_list if n[: len(name)] != name
+ ]
+
+ parent_status = next((s for n, s in activation_list if name[: len(n)] == n), None)
+ if parent_status != status and not (name == "" and status is True):
+ activation_list.append((name, status))
+
+ def modules_depth(x):
+ return x[0].count(".")
+
+ activation_list.sort(key=modules_depth, reverse=True)
+
+ for n in enabled:
+ if n is not None and (n + ".")[: len(name)] == name:
+ enabled[n] = status
+
+ self._core.activation_list = activation_list
+ self._core.enabled = enabled
@staticmethod
- def parse(file, pattern, *, cast={}, chunk=2 ** 16):
+ def parse(file, pattern, *, cast={}, chunk=2**16): # noqa: B006
"""Parse raw logs and extract each entry as a |dict|.
The logging format has to be specified as the regex ``pattern``, it will then be
@@ -1296,43 +1841,232 @@ class Logger:
... for log in logger.parse(file, reg, cast=cast):
... print(log["date"], log["something_else"])
"""
- pass
-
- def trace(__self, __message, *args, **kwargs):
- """Log ``message.format(*args, **kwargs)`` with severity ``'TRACE'``."""
- pass
+ if isinstance(file, (str, PathLike)):
+ should_close = True
+ fileobj = open(str(file))
+ elif hasattr(file, "read") and callable(file.read):
+ should_close = False
+ fileobj = file
+ else:
+ raise TypeError(
+ "Invalid file, it should be a string path or a file object, not: '%s'"
+ % type(file).__name__
+ )
+
+ if isinstance(cast, dict):
+
+ def cast_function(groups):
+ for key, converter in cast.items():
+ if key in groups:
+ groups[key] = converter(groups[key])
+
+ elif callable(cast):
+ cast_function = cast
+ else:
+ raise TypeError(
+ "Invalid cast, it should be a function or a dict, not: '%s'" % type(cast).__name__
+ )
+
+ try:
+ regex = re.compile(pattern)
+ except TypeError:
+ raise TypeError(
+ "Invalid pattern, it should be a string or a compiled regex, not: '%s'"
+ % type(pattern).__name__
+ ) from None
+
+ matches = Logger._find_iter(fileobj, regex, chunk)
+
+ for match in matches:
+ groups = match.groupdict()
+ cast_function(groups)
+ yield groups
+
+ if should_close:
+ fileobj.close()
- def debug(__self, __message, *args, **kwargs):
- """Log ``message.format(*args, **kwargs)`` with severity ``'DEBUG'``."""
- pass
-
- def info(__self, __message, *args, **kwargs):
- """Log ``message.format(*args, **kwargs)`` with severity ``'INFO'``."""
- pass
-
- def success(__self, __message, *args, **kwargs):
- """Log ``message.format(*args, **kwargs)`` with severity ``'SUCCESS'``."""
- pass
-
- def warning(__self, __message, *args, **kwargs):
- """Log ``message.format(*args, **kwargs)`` with severity ``'WARNING'``."""
- pass
-
- def error(__self, __message, *args, **kwargs):
- """Log ``message.format(*args, **kwargs)`` with severity ``'ERROR'``."""
- pass
-
- def critical(__self, __message, *args, **kwargs):
- """Log ``message.format(*args, **kwargs)`` with severity ``'CRITICAL'``."""
- pass
-
- def exception(__self, __message, *args, **kwargs):
- """Convenience method for logging an ``'ERROR'`` with exception information."""
- pass
-
- def log(__self, __level, __message, *args, **kwargs):
- """Log ``message.format(*args, **kwargs)`` with severity ``level``."""
- pass
+ @staticmethod
+ def _find_iter(fileobj, regex, chunk):
+ buffer = fileobj.read(0)
+
+ while 1:
+ text = fileobj.read(chunk)
+ buffer += text
+ matches = list(regex.finditer(buffer))
+
+ if not text:
+ yield from matches
+ break
+
+ if len(matches) > 1:
+ end = matches[-2].end()
+ buffer = buffer[end:]
+ yield from matches[:-1]
+
+    def _log(self, level, from_decorator, options, message, args, kwargs):
+        core = self._core
+
+        if not core.handlers:
+            return
+
+        try:
+            level_id, level_name, level_no, level_icon = core.levels_lookup[level]
+        except (KeyError, TypeError):
+            if isinstance(level, str):
+                raise ValueError("Level '%s' does not exist" % level) from None
+            if not isinstance(level, int):
+                raise TypeError(
+                    "Invalid level, it should be an integer or a string, not: '%s'"
+                    % type(level).__name__
+                ) from None
+            if level < 0:
+                raise ValueError(
+                    "Invalid level value, it should be a positive integer, not: %d" % level
+                ) from None
+            cache = (None, "Level %d" % level, level, " ")
+            level_id, level_name, level_no, level_icon = cache
+            core.levels_lookup[level] = cache
+
+        if level_no < core.min_level:
+            return
+
+        (exception, depth, record, lazy, colors, raw, capture, patchers, extra) = options
+
+        frame = get_frame(depth + 2)
+
+        try:
+            name = frame.f_globals["__name__"]
+        except KeyError:
+            name = None
+
+        try:
+            if not core.enabled[name]:
+                return
+        except KeyError:
+            enabled = core.enabled
+            if name is None:
+                status = core.activation_none
+                enabled[name] = status
+                if not status:
+                    return
+            else:
+                dotted_name = name + "."
+                for dotted_module_name, status in core.activation_list:
+                    if dotted_name[: len(dotted_module_name)] == dotted_module_name:
+                        if status:
+                            break
+                        enabled[name] = False
+                        return
+                enabled[name] = True
+
+        current_datetime = aware_now()
+
+        code = frame.f_code
+        file_path = code.co_filename
+        file_name = basename(file_path)
+        thread = current_thread()
+        process = current_process()
+        elapsed = current_datetime - start_time
+
+        if exception:
+            if isinstance(exception, BaseException):
+                type_, value, traceback = (type(exception), exception, exception.__traceback__)
+            elif isinstance(exception, tuple):
+                type_, value, traceback = exception
+            else:
+                type_, value, traceback = sys.exc_info()
+            exception = RecordException(type_, value, traceback)
+        else:
+            exception = None
+
+        log_record = {
+            "elapsed": elapsed,
+            "exception": exception,
+            "extra": {**core.extra, **context.get(), **extra},
+            "file": RecordFile(file_name, file_path),
+            "function": code.co_name,
+            "level": RecordLevel(level_name, level_no, level_icon),
+            "line": frame.f_lineno,
+            "message": str(message),
+            "module": splitext(file_name)[0],
+            "name": name,
+            "process": RecordProcess(process.ident, process.name),
+            "thread": RecordThread(thread.ident, thread.name),
+            "time": current_datetime,
+        }
+
+        if lazy:
+            args = [arg() for arg in args]
+            kwargs = {key: value() for key, value in kwargs.items()}
+
+        if capture and kwargs:
+            log_record["extra"].update(kwargs)
+
+        if record:
+            if "record" in kwargs:
+                raise TypeError(
+                    "The message can't be formatted: 'record' shall not be used as a keyword "
+                    "argument while logger has been configured with '.opt(record=True)'"
+                )
+            kwargs.update(record=log_record)
+
+        if colors:
+            if args or kwargs:
+                colored_message = Colorizer.prepare_message(message, args, kwargs)
+            else:
+                colored_message = Colorizer.prepare_simple_message(str(message))
+            log_record["message"] = colored_message.stripped
+        elif args or kwargs:
+            colored_message = None
+            log_record["message"] = message.format(*args, **kwargs)
+        else:
+            colored_message = None
+
+        if core.patcher:
+            core.patcher(log_record)
+
+        for patcher in patchers:
+            patcher(log_record)
+
+        for handler in core.handlers.values():
+            handler.emit(log_record, level_id, from_decorator, raw, colored_message)
+
+    def trace(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``'TRACE'``."""
+        __self._log("TRACE", False, __self._options, __message, args, kwargs)
+
+    def debug(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``'DEBUG'``."""
+        __self._log("DEBUG", False, __self._options, __message, args, kwargs)
+
+    def info(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``'INFO'``."""
+        __self._log("INFO", False, __self._options, __message, args, kwargs)
+
+    def success(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``'SUCCESS'``."""
+        __self._log("SUCCESS", False, __self._options, __message, args, kwargs)
+
+    def warning(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``'WARNING'``."""
+        __self._log("WARNING", False, __self._options, __message, args, kwargs)
+
+    def error(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``'ERROR'``."""
+        __self._log("ERROR", False, __self._options, __message, args, kwargs)
+
+    def critical(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``'CRITICAL'``."""
+        __self._log("CRITICAL", False, __self._options, __message, args, kwargs)
+
+    def exception(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Convenience method for logging an ``'ERROR'`` with exception information."""
+        options = (True,) + __self._options[1:]
+        __self._log("ERROR", False, options, __message, args, kwargs)
+
+    def log(__self, __level, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``level``."""
+        __self._log(__level, False, __self._options, __message, args, kwargs)
     def start(self, *args, **kwargs):
         """Deprecated function to |add| a new handler.
@@ -1343,7 +2077,12 @@ class Logger:
           ``start()`` will be removed in Loguru 1.0.0, it is replaced by ``add()`` which is a less
           confusing name.
         """
-        pass
+        warnings.warn(
+            "The 'start()' method is deprecated, please use 'add()' instead",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return self.add(*args, **kwargs)
     def stop(self, *args, **kwargs):
         """Deprecated function to |remove| an existing handler.
@@ -1354,4 +2093,9 @@ class Logger:
           ``stop()`` will be removed in Loguru 1.0.0, it is replaced by ``remove()`` which is a less
           confusing name.
         """
-        pass
+        warnings.warn(
+            "The 'stop()' method is deprecated, please use 'remove()' instead",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return self.remove(*args, **kwargs)
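
The `_find_iter` helper added to `loguru/_logger.py` above reads the file in chunks and holds back the last match of each pass, since the next chunk may still extend it. Below is a minimal stand-alone sketch of that loop, assuming a text-mode file object. The function name `find_iter`, the sample log text and the pattern are illustrative and not part of the patch.

```python
import io
import re


def find_iter(fileobj, regex, chunk):
    """Yield regex matches from a file object read ``chunk`` characters at a time."""
    buffer = fileobj.read(0)  # Empty str or bytes, depending on the file mode.

    while True:
        text = fileobj.read(chunk)
        buffer += text
        matches = list(regex.finditer(buffer))

        if not text:
            # End of file: no further data can extend the remaining matches.
            yield from matches
            break

        if len(matches) > 1:
            # Emit all matches but the last one, and keep the text that follows
            # the second-to-last match so the last one is re-evaluated later.
            end = matches[-2].end()
            buffer = buffer[end:]
            yield from matches[:-1]


# Hypothetical input: three log lines scanned with a deliberately small chunk size.
log = io.StringIO("10:00 INFO started\n10:01 ERROR failed\n10:02 INFO done\n")
pattern = re.compile(r"(?P<time>\d+:\d+) (?P<level>\w+) (?P<message>.*)\n")
records = [match.groupdict() for match in find_iter(log, pattern, chunk=8)]
```

With a chunk size smaller than a line, each record is still reported exactly once, because a match is only emitted after a later match has been found or the file is exhausted.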
diff --git a/loguru/_recattrs.py b/loguru/_recattrs.py
index d344c9f..b09426e 100644
--- a/loguru/_recattrs.py
+++ b/loguru/_recattrs.py
@@ -3,7 +3,7 @@ from collections import namedtuple
class RecordLevel:
-    __slots__ = 'name', 'no', 'icon'
+    __slots__ = ("name", "no", "icon")
     def __init__(self, name, no, icon):
         self.name = name
@@ -11,66 +11,80 @@ class RecordLevel:
         self.icon = icon
     def __repr__(self):
-        return '(name=%r, no=%r, icon=%r)' % (self.name, self.no, self.icon)
+        return "(name=%r, no=%r, icon=%r)" % (self.name, self.no, self.icon)
     def __format__(self, spec):
         return self.name.__format__(spec)
 class RecordFile:
-    __slots__ = 'name', 'path'
+    __slots__ = ("name", "path")
     def __init__(self, name, path):
         self.name = name
         self.path = path
     def __repr__(self):
-        return '(name=%r, path=%r)' % (self.name, self.path)
+        return "(name=%r, path=%r)" % (self.name, self.path)
     def __format__(self, spec):
         return self.name.__format__(spec)
 class RecordThread:
-    __slots__ = 'id', 'name'
+    __slots__ = ("id", "name")
     def __init__(self, id_, name):
         self.id = id_
         self.name = name
     def __repr__(self):
-        return '(id=%r, name=%r)' % (self.id, self.name)
+        return "(id=%r, name=%r)" % (self.id, self.name)
     def __format__(self, spec):
         return self.id.__format__(spec)
 class RecordProcess:
-    __slots__ = 'id', 'name'
+    __slots__ = ("id", "name")
     def __init__(self, id_, name):
         self.id = id_
         self.name = name
     def __repr__(self):
-        return '(id=%r, name=%r)' % (self.id, self.name)
+        return "(id=%r, name=%r)" % (self.id, self.name)
     def __format__(self, spec):
         return self.id.__format__(spec)
-class RecordException(namedtuple('RecordException', ('type', 'value',
-    'traceback'))):
-
+class RecordException(namedtuple("RecordException", ("type", "value", "traceback"))):
     def __repr__(self):
-        return '(type=%r, value=%r, traceback=%r)' % (self.type, self.value,
-            self.traceback)
+        return "(type=%r, value=%r, traceback=%r)" % (self.type, self.value, self.traceback)
     def __reduce__(self):
+        # The traceback is not picklable, therefore it needs to be removed. Additionally, there's a
+        # possibility that the exception value is not picklable either. In such cases, we also need
+        # to remove it. This is done for user convenience, aiming to prevent error logging caused by
+        # custom exceptions from third-party libraries. If the serialization succeeds, we can reuse
+        # the pickled value later for optimization (so that it's not pickled twice). It's important
+        # to note that custom exceptions might not necessarily raise a PickleError, hence the
+        # generic Exception catch.
         try:
             pickled_value = pickle.dumps(self.value)
         except Exception:
-            return RecordException, (self.type, None, None)
+            return (RecordException, (self.type, None, None))
+        else:
+            return (RecordException._from_pickled_value, (self.type, pickled_value, None))
+
+    @classmethod
+    def _from_pickled_value(cls, type_, pickled_value, traceback_):
+        try:
+            # It's safe to use "pickle.loads()" in this case because the pickled value is generated
+            # by the same code and is not coming from an untrusted source.
+            value = pickle.loads(pickled_value)
+        except Exception:
+            return cls(type_, None, traceback_)
         else:
-            return RecordException._from_pickled_value, (self.type,
-                pickled_value, None)
+            return cls(type_, value, traceback_)
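
The `__reduce__` logic in the `loguru/_recattrs.py` hunk above always drops the traceback, and drops the exception value only when that value cannot be pickled. The sketch below reproduces this behaviour outside loguru under an illustrative class name (`SketchRecordException`). The `capture` helper and the sample exceptions are hypothetical and exist only for demonstration.

```python
import pickle
from collections import namedtuple


class SketchRecordException(namedtuple("SketchRecordException", ("type", "value", "traceback"))):
    def __reduce__(self):
        # Tracebacks cannot be pickled, so the traceback is always replaced by None.
        try:
            pickled_value = pickle.dumps(self.value)
        except Exception:
            # The value itself is not picklable: keep only the exception type.
            return (SketchRecordException, (self.type, None, None))
        else:
            # Reuse the bytes produced above instead of pickling the value twice.
            return (SketchRecordException._from_pickled_value, (self.type, pickled_value, None))

    @classmethod
    def _from_pickled_value(cls, type_, pickled_value, traceback_):
        try:
            value = pickle.loads(pickled_value)
        except Exception:
            return cls(type_, None, traceback_)
        else:
            return cls(type_, value, traceback_)


def capture(error):
    """Raise and catch ``error`` so that it carries a real traceback."""
    try:
        raise error
    except Exception as exc:
        return SketchRecordException(type(exc), exc, exc.__traceback__)


# A picklable exception survives the round trip without its traceback.
restored = pickle.loads(pickle.dumps(capture(ValueError("boom"))))

# An exception carrying a lambda cannot be pickled, so only its type survives.
fallback = pickle.loads(pickle.dumps(capture(ValueError(lambda: None))))
```

Catching the generic `Exception` mirrors the comment in the patch: an unpicklable value does not necessarily raise a `PickleError`.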
diff --git a/loguru/_simple_sinks.py b/loguru/_simple_sinks.py
index a21a218..068f1e1 100644
--- a/loguru/_simple_sinks.py
+++ b/loguru/_simple_sinks.py
@@ -1,36 +1,112 @@
import asyncio
import logging
import weakref
+
from ._asyncio_loop import get_running_loop, get_task_loop
class StreamSink:
-
     def __init__(self, stream):
         self._stream = stream
-        self._flushable = callable(getattr(stream, 'flush', None))
-        self._stoppable = callable(getattr(stream, 'stop', None))
-        self._completable = asyncio.iscoroutinefunction(getattr(stream,
-            'complete', None))
+        self._flushable = callable(getattr(stream, "flush", None))
+        self._stoppable = callable(getattr(stream, "stop", None))
+        self._completable = asyncio.iscoroutinefunction(getattr(stream, "complete", None))
+    def write(self, message):
+        self._stream.write(message)
+        if self._flushable:
+            self._stream.flush()
-class StandardSink:
+    def stop(self):
+        if self._stoppable:
+            self._stream.stop()
+    def tasks_to_complete(self):
+        if not self._completable:
+            return []
+        return [self._stream.complete()]
+
+
+class StandardSink:
     def __init__(self, handler):
         self._handler = handler
+    def write(self, message):
+        record = message.record
+        message = str(message)
+        exc = record["exception"]
+        record = logging.getLogger().makeRecord(
+            record["name"],
+            record["level"].no,
+            record["file"].path,
+            record["line"],
+            message,
+            (),
+            (exc.type, exc.value, exc.traceback) if exc else None,
+            record["function"],
+            {"extra": record["extra"]},
+        )
+        if exc:
+            record.exc_text = "\n"
+        self._handler.handle(record)
+
+    def stop(self):
+        self._handler.close()
+
+    def tasks_to_complete(self):
+        return []
-class AsyncSink:
+class AsyncSink:
     def __init__(self, function, loop, error_interceptor):
         self._function = function
         self._loop = loop
         self._error_interceptor = error_interceptor
         self._tasks = weakref.WeakSet()
+    def write(self, message):
+        try:
+            loop = self._loop or get_running_loop()
+        except RuntimeError:
+            return
+
+        coroutine = self._function(message)
+        task = loop.create_task(coroutine)
+
+        def check_exception(future):
+            if future.cancelled() or future.exception() is None:
+                return
+            if not self._error_interceptor.should_catch():
+                raise future.exception()
+            self._error_interceptor.print(message.record, exception=future.exception())
+
+        task.add_done_callback(check_exception)
+        self._tasks.add(task)
+
+    def stop(self):
+        for task in self._tasks:
+            task.cancel()
+
+    def tasks_to_complete(self):
+        # To avoid errors due to "self._tasks" being mutated while iterated, the
+        # "tasks_to_complete()" method must be protected by the same lock as "write()" (which
+        # happens to be the handler lock). However, the tasks must not be awaited while the lock is
+        # acquired as this could lead to a deadlock. Therefore, we first need to collect the tasks
+        # to complete, then return them so that they can be awaited outside of the lock.
+        return [self._complete_task(task) for task in self._tasks]
+
+    async def _complete_task(self, task):
+        loop = get_running_loop()
+        if get_task_loop(task) is not loop:
+            return
+        try:
+            await task
+        except Exception:
+            pass  # Handled in "check_exception()"
+
     def __getstate__(self):
         state = self.__dict__.copy()
-        state['_tasks'] = None
+        state["_tasks"] = None
         return state
     def __setstate__(self, state):
@@ -39,6 +115,14 @@ class AsyncSink:
class CallableSink:
-
     def __init__(self, function):
         self._function = function
+
+    def write(self, message):
+        self._function(message)
+
+    def stop(self):
+        pass
+
+    def tasks_to_complete(self):
+        return []
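
Every sink class in the `loguru/_simple_sinks.py` diff above exposes the same three methods: `write()`, `stop()` and `tasks_to_complete()`. `StreamSink` additionally probes its stream for optional `flush`, `stop` and `complete` members. The sketch below imitates that probing outside loguru. `StreamSinkSketch`, `RecordingStream` and `drain` are hypothetical names, and `inspect.iscoroutinefunction` is swapped in for the `asyncio.iscoroutinefunction` call used in the patch.

```python
import asyncio
import inspect
import io


class StreamSinkSketch:
    def __init__(self, stream):
        self._stream = stream
        # Optional capabilities are detected once, at construction time.
        self._flushable = callable(getattr(stream, "flush", None))
        self._stoppable = callable(getattr(stream, "stop", None))
        self._completable = inspect.iscoroutinefunction(getattr(stream, "complete", None))

    def write(self, message):
        self._stream.write(message)
        if self._flushable:
            self._stream.flush()

    def stop(self):
        if self._stoppable:
            self._stream.stop()

    def tasks_to_complete(self):
        # Return awaitables instead of awaiting them, so the caller decides when to wait.
        if not self._completable:
            return []
        return [self._stream.complete()]


class RecordingStream:
    """Hypothetical stream with write/stop/complete but no flush()."""

    def __init__(self):
        self.messages = []
        self.stopped = False
        self.completed = False

    def write(self, message):
        self.messages.append(message)

    def stop(self):
        self.stopped = True

    async def complete(self):
        self.completed = True


async def drain(sink):
    for task in sink.tasks_to_complete():
        await task


stream = RecordingStream()
sink = StreamSinkSketch(stream)
sink.write("hello\n")
asyncio.run(drain(sink))
sink.stop()

# A plain io.StringIO has flush() but neither stop() nor an async complete().
plain = StreamSinkSketch(io.StringIO())
plain.write("ignored\n")
plain.stop()
```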
diff --git a/loguru/_string_parsers.py b/loguru/_string_parsers.py
index 520211d..da00904 100644
--- a/loguru/_string_parsers.py
+++ b/loguru/_string_parsers.py
@@ -3,4 +3,185 @@ import re
class Frequencies:
-    pass
+    @staticmethod
+    def hourly(t):
+        dt = t + datetime.timedelta(hours=1)
+        return dt.replace(minute=0, second=0, microsecond=0)
+
+    @staticmethod
+    def daily(t):
+        dt = t + datetime.timedelta(days=1)
+        return dt.replace(hour=0, minute=0, second=0, microsecond=0)
+
+    @staticmethod
+    def weekly(t):
+        dt = t + datetime.timedelta(days=7 - t.weekday())
+        return dt.replace(hour=0, minute=0, second=0, microsecond=0)
+
+    @staticmethod
+    def monthly(t):
+        if t.month == 12:
+            y, m = t.year + 1, 1
+        else:
+            y, m = t.year, t.month + 1
+        return t.replace(year=y, month=m, day=1, hour=0, minute=0, second=0, microsecond=0)
+
+    @staticmethod
+    def yearly(t):
+        y = t.year + 1
+        return t.replace(year=y, month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
+
+
+def parse_size(size):
+    size = size.strip()
+    reg = re.compile(r"([e\+\-\.\d]+)\s*([kmgtpezy])?(i)?(b)", flags=re.I)
+
+    match = reg.fullmatch(size)
+
+    if not match:
+        return None
+
+    s, u, i, b = match.groups()
+
+    try:
+        s = float(s)
+    except ValueError as e:
+        raise ValueError("Invalid float value while parsing size: '%s'" % s) from e
+
+    u = "kmgtpezy".index(u.lower()) + 1 if u else 0
+    i = 1024 if i else 1000
+    b = {"b": 8, "B": 1}[b] if b else 1
+    size = s * i**u / b
+
+    return size
+
+
+def parse_duration(duration):
+    duration = duration.strip()
+    reg = r"(?:([e\+\-\.\d]+)\s*([a-z]+)[\s\,]*)"
+
+    units = [
+        ("y|years?", 31536000),
+        ("months?", 2628000),
+        ("w|weeks?", 604800),
+        ("d|days?", 86400),
+        ("h|hours?", 3600),
+        ("min(?:ute)?s?", 60),
+        ("s|sec(?:ond)?s?", 1),
+        ("ms|milliseconds?", 0.001),
+        ("us|microseconds?", 0.000001),
+    ]
+
+    if not re.fullmatch(reg + "+", duration, flags=re.I):
+        return None
+
+    seconds = 0
+
+    for value, unit in re.findall(reg, duration, flags=re.I):
+        try:
+            value = float(value)
+        except ValueError as e:
+            raise ValueError("Invalid float value while parsing duration: '%s'" % value) from e
+
+        try:
+            unit = next(u for r, u in units if re.fullmatch(r, unit, flags=re.I))
+        except StopIteration:
+            raise ValueError("Invalid unit value while parsing duration: '%s'" % unit) from None
+
+        seconds += value * unit
+
+    return datetime.timedelta(seconds=seconds)
+
+
+def parse_frequency(frequency):
+    frequencies = {
+        "hourly": Frequencies.hourly,
+        "daily": Frequencies.daily,
+        "weekly": Frequencies.weekly,
+        "monthly": Frequencies.monthly,
+        "yearly": Frequencies.yearly,
+    }
+    frequency = frequency.strip().lower()
+    return frequencies.get(frequency, None)
+
+
+def parse_day(day):
+    days = {
+        "monday": 0,
+        "tuesday": 1,
+        "wednesday": 2,
+        "thursday": 3,
+        "friday": 4,
+        "saturday": 5,
+        "sunday": 6,
+    }
+    day = day.strip().lower()
+    if day in days:
+        return days[day]
+    elif day.startswith("w") and day[1:].isdigit():
+        day = int(day[1:])
+        if not 0 <= day < 7:
+            raise ValueError("Invalid weekday value while parsing day (expected [0-6]): '%d'" % day)
+    else:
+        day = None
+
+    return day
+
+
+def parse_time(time):
+    time = time.strip()
+    reg = re.compile(r"^[\d\.\:]+\s*(?:[ap]m)?$", flags=re.I)
+
+    if not reg.match(time):
+        return None
+
+    formats = [
+        "%H",
+        "%H:%M",
+        "%H:%M:%S",
+        "%H:%M:%S.%f",
+        "%I %p",
+        "%I:%M %S",
+        "%I:%M:%S %p",
+        "%I:%M:%S.%f %p",
+    ]
+
+    for format_ in formats:
+        try:
+            dt = datetime.datetime.strptime(time, format_)
+        except ValueError:
+            pass
+        else:
+            return dt.time()
+
+    raise ValueError("Unrecognized format while parsing time: '%s'" % time)
+
+
+def parse_daytime(daytime):
+    daytime = daytime.strip()
+    reg = re.compile(r"^(.*?)\s+at\s+(.*)$", flags=re.I)
+
+    match = reg.match(daytime)
+    if match:
+        day, time = match.groups()
+    else:
+        day = time = daytime
+
+    try:
+        day = parse_day(day)
+        if match and day is None:
+            raise ValueError
+    except ValueError as e:
+        raise ValueError("Invalid day while parsing daytime: '%s'" % day) from e
+
+    try:
+        time = parse_time(time)
+        if match and time is None:
+            raise ValueError
+    except ValueError as e:
+        raise ValueError("Invalid time while parsing daytime: '%s'" % time) from e
+
+    if day is None and time is None:
+        return None
+
+    return day, time
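
The arithmetic in `parse_size` from the `loguru/_string_parsers.py` diff above comes down to `number * base**exponent / divisor`. The base is 1024 when an `i` follows the prefix and 1000 otherwise, and the divisor is 8 for a lowercase `b` (bits) and 1 for an uppercase `B` (bytes). The worked sketch below uses illustrative names (`size_in_bytes`, `SIZE_PATTERN`). Unlike the patch, it lets a malformed number raise the plain `float()` error rather than re-wrapping it.

```python
import re

# Same pattern as in the patch: number, optional SI prefix, optional "i", then "b" or "B".
SIZE_PATTERN = re.compile(r"([e\+\-\.\d]+)\s*([kmgtpezy])?(i)?(b)", flags=re.I)


def size_in_bytes(text):
    """Return the size described by ``text`` in bytes, or None if it is not a size."""
    match = SIZE_PATTERN.fullmatch(text.strip())
    if match is None:
        return None

    number, prefix, binary, unit = match.groups()
    exponent = "kmgtpezy".index(prefix.lower()) + 1 if prefix else 0
    base = 1024 if binary else 1000
    divisor = 8 if unit == "b" else 1  # Lowercase "b" means bits.
    return float(number) * base**exponent / divisor


examples = {
    "5 MB": size_in_bytes("5 MB"),            # 5 * 1000**2
    "1 KiB": size_in_bytes("1 KiB"),          # 1 * 1024**1
    "1.5 GiB": size_in_bytes("1.5 GiB"),      # 1.5 * 1024**3
    "16 b": size_in_bytes("16 b"),            # 16 bits = 2 bytes
    "10 apples": size_in_bytes("10 apples"),  # Not a size at all.
}
```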