
Reference (Gold): loguru

Pytest Summary for tests

status count
passed 1459
skipped 38
failed 41
total 1538
collected 1538

Failed tests:

test_logger.yml::basic_logger_usage[method=trace]

/testbed/tests/typesafety/test_logger.yml:14: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:3: note: Revealed type is "Any"          (diff)
E     main:4: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:3: note: Revealed type is "loguru.Logger" (diff)
E     main:4: note: Revealed type is "None"         (diff)
E   Alignment of first line difference:
E     E: main:3: note: Revealed type is "loguru.Logger"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^
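
The expected output above implies what the underlying yml case checks: with the loguru stubs resolvable, `reveal_type` on the logger object yields `loguru.Logger` and a logging call returns `None`. A hedged reconstruction of such a case, in pytest-mypy-plugins style (the actual `tests/typesafety/test_logger.yml` is not reproduced in this report, so the layout and parametrization names are assumptions):

```yaml
# Hypothetical sketch of the parametrized basic_logger_usage case;
# "N:" comments assert mypy's expected notes per pytest-mypy-plugins.
- case: basic_logger_usage
  parametrized:
    - method: trace
    - method: debug
    - method: info
  main: |
    from loguru import logger

    reveal_type(logger)  # N: Revealed type is "loguru.Logger"
    reveal_type(logger.{{ method }}("hello"))  # N: Revealed type is "None"
```

When the `loguru` module cannot be resolved, both revealed types degrade to `Any`, which matches the "Actual" output in every failure above.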

test_logger.yml::basic_logger_usage[method=debug]

/testbed/tests/typesafety/test_logger.yml:14: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:3: note: Revealed type is "Any"          (diff)
E     main:4: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:3: note: Revealed type is "loguru.Logger" (diff)
E     main:4: note: Revealed type is "None"         (diff)
E   Alignment of first line difference:
E     E: main:3: note: Revealed type is "loguru.Logger"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::basic_logger_usage[method=info]

/testbed/tests/typesafety/test_logger.yml:14: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:3: note: Revealed type is "Any"          (diff)
E     main:4: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:3: note: Revealed type is "loguru.Logger" (diff)
E     main:4: note: Revealed type is "None"         (diff)
E   Alignment of first line difference:
E     E: main:3: note: Revealed type is "loguru.Logger"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::basic_logger_usage[method=success]

/testbed/tests/typesafety/test_logger.yml:14: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:3: note: Revealed type is "Any"          (diff)
E     main:4: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:3: note: Revealed type is "loguru.Logger" (diff)
E     main:4: note: Revealed type is "None"         (diff)
E   Alignment of first line difference:
E     E: main:3: note: Revealed type is "loguru.Logger"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::basic_logger_usage[method=warning]

/testbed/tests/typesafety/test_logger.yml:14: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:3: note: Revealed type is "Any"          (diff)
E     main:4: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:3: note: Revealed type is "loguru.Logger" (diff)
E     main:4: note: Revealed type is "None"         (diff)
E   Alignment of first line difference:
E     E: main:3: note: Revealed type is "loguru.Logger"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::basic_logger_usage[method=error]

/testbed/tests/typesafety/test_logger.yml:14: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:3: note: Revealed type is "Any"          (diff)
E     main:4: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:3: note: Revealed type is "loguru.Logger" (diff)
E     main:4: note: Revealed type is "None"         (diff)
E   Alignment of first line difference:
E     E: main:3: note: Revealed type is "loguru.Logger"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::basic_logger_usage[method=exception]

/testbed/tests/typesafety/test_logger.yml:14: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:3: note: Revealed type is "Any"          (diff)
E     main:4: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:3: note: Revealed type is "loguru.Logger" (diff)
E     main:4: note: Revealed type is "None"         (diff)
E   Alignment of first line difference:
E     E: main:3: note: Revealed type is "loguru.Logger"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::basic_logger_usage[method=critical]

/testbed/tests/typesafety/test_logger.yml:14: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:3: note: Revealed type is "Any"          (diff)
E     main:4: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:3: note: Revealed type is "loguru.Logger" (diff)
E     main:4: note: Revealed type is "None"         (diff)
E   Alignment of first line difference:
E     E: main:3: note: Revealed type is "loguru.Logger"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::using_log_function[level='INFO']

/testbed/tests/typesafety/test_logger.yml:24: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     (empty)

test_logger.yml::using_log_function[level=30]

/testbed/tests/typesafety/test_logger.yml:24: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     (empty)

test_logger.yml::using_logging_arguments

/testbed/tests/typesafety/test_logger.yml:29: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     (empty)

test_logger.yml::logging_non_string[message=123]

/testbed/tests/typesafety/test_logger.yml:38: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     (empty)

test_logger.yml::logging_non_string[message=dict(foo=456)]

/testbed/tests/typesafety/test_logger.yml:38: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     (empty)

test_logger.yml::logging_non_string[message=object()]

/testbed/tests/typesafety/test_logger.yml:38: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     (empty)

test_logger.yml::add_sink[sink=sys.stderr]

/testbed/tests/typesafety/test_logger.yml:55: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:2: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:2: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:6: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:6: note: Revealed type is "builtins.int" (diff)
E   Alignment of first line difference:
E     E: main:6: note: Revealed type is "builtins.int"...
E     A: main:2: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::add_sink[sink='test.txt']

/testbed/tests/typesafety/test_logger.yml:55: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:2: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:2: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:6: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:6: note: Revealed type is "builtins.int" (diff)
E   Alignment of first line difference:
E     E: main:6: note: Revealed type is "builtins.int"...
E     A: main:2: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::add_sink[sink=Path('file.log')]

/testbed/tests/typesafety/test_logger.yml:55: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:2: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:2: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:6: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:6: note: Revealed type is "builtins.int" (diff)
E   Alignment of first line difference:
E     E: main:6: note: Revealed type is "builtins.int"...
E     A: main:2: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::add_sink[sink=lambda m: None]

/testbed/tests/typesafety/test_logger.yml:55: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:2: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:2: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:6: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:6: note: Revealed type is "builtins.int" (diff)
E   Alignment of first line difference:
E     E: main:6: note: Revealed type is "builtins.int"...
E     A: main:2: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::add_sink[sink=StreamHandler()]

/testbed/tests/typesafety/test_logger.yml:55: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:2: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:2: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:6: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:6: note: Revealed type is "builtins.int" (diff)
E   Alignment of first line difference:
E     E: main:6: note: Revealed type is "builtins.int"...
E     A: main:2: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::basic_sink_options[format='{message}',filter='module',context='fork']

/testbed/tests/typesafety/test_logger.yml:67: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: 
E   Actual:
E     main:3: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:3: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     (empty)

test_logger.yml::basic_sink_options[format=lambda r: '{message}\n',filter=lambda r: True,context=get_context('fork')]

/testbed/tests/typesafety/test_logger.yml:67: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: 
E   Actual:
E     main:3: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:3: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     (empty)

test_logger.yml::file_sink_options

/testbed/tests/typesafety/test_logger.yml:85: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     (empty)

test_logger.yml::async_sink_options

/testbed/tests/typesafety/test_logger.yml:101: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     (empty)

test_logger.yml::remove_sink

/testbed/tests/typesafety/test_logger.yml:113: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     (empty)

test_logger.yml::await_completion

/testbed/tests/typesafety/test_logger.yml:123: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:5: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:5: note: Revealed type is "typing.Awaitable[None]" (diff)
E   Alignment of first line difference:
E     E: main:5: note: Revealed type is "typing.Awaitable[None]"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::catch_as_decorator_with_parentheses

/testbed/tests/typesafety/test_logger.yml:134: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:6: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:6: note: Revealed type is "def (a: builtins.int, b: builtins.int) -> builtins.int" (diff)
E   Alignment of first line difference:
E     E: main:6: note: Revealed type is "def (a: builtins.int, b: builtins.int) -...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::catch_as_decorator_without_parentheses

/testbed/tests/typesafety/test_logger.yml:145: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:6: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:6: note: Revealed type is "def (a: builtins.int, b: builtins.int) -> builtins.int" (diff)
E   Alignment of first line difference:
E     E: main:6: note: Revealed type is "def (a: builtins.int, b: builtins.int) -...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::catch_as_context_manager

/testbed/tests/typesafety/test_logger.yml:155: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:5: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:5: note: Revealed type is "loguru.Catcher" (diff)
E   Alignment of first line difference:
E     E: main:5: note: Revealed type is "loguru.Catcher"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::opt

/testbed/tests/typesafety/test_logger.yml:164: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:4: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:4: note: Revealed type is "loguru.Logger" (diff)
E   Alignment of first line difference:
E     E: main:4: note: Revealed type is "loguru.Logger"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::bind

/testbed/tests/typesafety/test_logger.yml:173: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:4: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:4: note: Revealed type is "loguru.Logger" (diff)
E   Alignment of first line difference:
E     E: main:4: note: Revealed type is "loguru.Logger"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::patch

/testbed/tests/typesafety/test_logger.yml:182: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:4: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:4: note: Revealed type is "loguru.Logger" (diff)
E   Alignment of first line difference:
E     E: main:4: note: Revealed type is "loguru.Logger"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::contextualize

/testbed/tests/typesafety/test_logger.yml:192: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:5: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:5: note: Revealed type is "loguru.Contextualizer" (diff)
E   Alignment of first line difference:
E     E: main:5: note: Revealed type is "loguru.Contextualizer"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::level_get

/testbed/tests/typesafety/test_logger.yml:202: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:4: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:4: note: Revealed type is "tuple[builtins.str, builtins.int, builtins.str, builtins.str, fallback=loguru.Level]" (diff)
E   Alignment of first line difference:
E     E: main:4: note: Revealed type is "tuple[builtins.str, builtins.int, builti...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::level_set

/testbed/tests/typesafety/test_logger.yml:222: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:4: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:4: note: Revealed type is "tuple[builtins.str, builtins.int, builtins.str, builtins.str, fallback=loguru.Level]" (diff)
E   Alignment of first line difference:
E     E: main:4: note: Revealed type is "tuple[builtins.str, builtins.int, builti...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::level_update

/testbed/tests/typesafety/test_logger.yml:242: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:4: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:4: note: Revealed type is "tuple[builtins.str, builtins.int, builtins.str, builtins.str, fallback=loguru.Level]" (diff)
E   Alignment of first line difference:
E     E: main:4: note: Revealed type is "tuple[builtins.str, builtins.int, builti...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::enable_and_disable_logger

/testbed/tests/typesafety/test_logger.yml:257: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Output is not expected: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     (empty)

test_logger.yml::configure

/testbed/tests/typesafety/test_logger.yml:272: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:9: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:9: note: Revealed type is "builtins.list[builtins.int]" (diff)
E   Alignment of first line difference:
E     E: main:9: note: Revealed type is "builtins.list[builtins.int]"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::parse

/testbed/tests/typesafety/test_logger.yml:282: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru"  [import] (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E     main:5: note: Revealed type is "Any"          (diff)
E     main:6: note: Revealed type is "Any"          (diff)
E   Expected:
E     main:5: note: Revealed type is "typing.Generator[builtins.dict[builtins.str, Any], None, None]" (diff)
E     main:6: note: Revealed type is "builtins.dict[builtins.str, Any]" (diff)
E   Alignment of first line difference:
E     E: main:5: note: Revealed type is "typing.Generator[builtins.dict[builtins....
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::invalid_add_argument

/testbed/tests/typesafety/test_logger.yml:292: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru" (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     main:2: error: No overload variant of "add" of "Logger" matches argument types "Callable[[Any], None]", "int" (diff)
E     main:2: note: Possible overload variants:     (diff)
E     main:2: note:     def add(self, sink: TextIO | Writable | Callable[[Message], None] | Handler, *, level: str | int = ..., format: str | Callable[[Record], str] = ..., filter: str | Callable[[Record], bool] | dict[str | None, str | int | bool] | None = ..., colorize: bool | None = ..., serialize: bool = ..., backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., context: str | BaseContext | None = ..., catch: bool = ...) -> int (diff)
E     main:2: note:     def add(self, sink: Callable[[Message], Awaitable[None]], *, level: str | int = ..., format: str | Callable[[Record], str] = ..., filter: str | Callable[[Record], bool] | dict[str | None, str | int | bool] | None = ..., colorize: bool | None = ..., serialize: bool = ..., backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., context: str | BaseContext | None = ..., catch: bool = ..., loop: AbstractEventLoop | None = ...) -> int (diff)
E     main:2: note:     def add(self, sink: str | PathLike[str], *, level: str | int = ..., format: str | Callable[[Record], str] = ..., filter: str | Callable[[Record], bool] | dict[str | None, str | int | bool] | None = ..., colorize: bool | None = ..., serialize: bool = ..., backtrace: bool = ..., diagnose: bool = ..., enqueue: bool = ..., context: str | BaseContext | None = ..., catch: bool = ..., rotation: str | int | time | timedelta | Callable[[Message, TextIO], bool] | None = ..., retention: str | int | timedelta | Callable[[list[str]], None] | None = ..., compression: str | Callable[[str], None] | None = ..., delay: bool = ..., watch: bool = ..., mode: str = ..., buffering: int = ..., encoding: str = ..., **kwargs: Any) -> int (diff)
E   Alignment of first line difference:
E     E: main:2: error: No overload variant of "add" of "Logger" matches argument...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::invalid_logged_object_formatting

/testbed/tests/typesafety/test_logger.yml:334: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru" (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     main:2: error: No overload variant of "info" of "Logger" matches argument types "int", "int" (diff)
E     main:2: note: Possible overload variants:     (diff)
E     main:2: note:     def info(__self, str, /, *args: Any, **kwargs: Any) -> None (diff)
E     main:2: note:     def info(__self, Any, /) -> None (diff)
E   Alignment of first line difference:
E     E: main:2: error: No overload variant of "info" of "Logger" matches argumen...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^

test_logger.yml::invalid_configuration

/testbed/tests/typesafety/test_logger.yml:362: 
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output: 
E   Actual:
E     main:1: error: Cannot find implementation or library stub for module named "loguru" (diff)
E     main:1: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports (diff)
E   Expected:
E     main:4: error: Extra key "baz" for TypedDict "LevelConfig" (diff)
E     main:5: error: Argument "patcher" to "configure" of "Logger" has incompatible type "int"; expected "Callable[[Record], None] | None" (diff)
E     main:6: error: List item 0 has incompatible type "dict[str, str]"; expected "tuple[str | None, bool]" (diff)
E     main:7: error: Argument "extra" to "configure" of "Logger" has incompatible type "list[]"; expected "dict[Any, Any] | None" (diff)
E   Alignment of first line difference:
E     E: main:4: error: Extra key "baz" for TypedDict "LevelConfig"...
E     A: main:1: error: Cannot find implementation or library stub for module nam...
E             ^
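
All 41 failures share the same first diagnostic: mypy cannot find an implementation or stub for `loguru`, so the module is typed as `Any`, every `reveal_type` degrades to `Any`, and every expected overload error disappears. The standard remedy mypy documents is to make the package (with its stubs or `py.typed` marker) importable in the environment mypy runs in. For contrast, a sketch of the per-module silencing option from mypy's config syntax, which would be the wrong fix here:

```ini
; mypy.ini fragment -- shown for contrast only: this hides the
; missing-import error but still leaves loguru typed as Any, so the
; reveal_type and overload assertions above would keep failing.
[mypy-loguru.*]
ignore_missing_imports = True
```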

Patch diff

diff --git a/loguru/_asyncio_loop.py b/loguru/_asyncio_loop.py
index a1d4421..e981955 100644
--- a/loguru/_asyncio_loop.py
+++ b/loguru/_asyncio_loop.py
@@ -1,3 +1,27 @@
 import asyncio
 import sys
+
+
+def load_loop_functions():
+    if sys.version_info >= (3, 7):
+
+        def get_task_loop(task):
+            return task.get_loop()
+
+        get_running_loop = asyncio.get_running_loop
+
+    else:
+
+        def get_task_loop(task):
+            return task._loop
+
+        def get_running_loop():
+            loop = asyncio.get_event_loop()
+            if not loop.is_running():
+                raise RuntimeError("There is no running event loop")
+            return loop
+
+    return get_task_loop, get_running_loop
+
+
 get_task_loop, get_running_loop = load_loop_functions()
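
The hunk above restores the version-gated loop helpers. A minimal, self-contained sketch of the same logic (no loguru import needed): on Python 3.7+ the stdlib functions are used directly, and the fallback branch only matters on older interpreters.

```python
# Standalone sketch of the helpers restored in loguru/_asyncio_loop.py.
import asyncio
import sys


def load_loop_functions():
    if sys.version_info >= (3, 7):

        def get_task_loop(task):
            return task.get_loop()

        get_running_loop = asyncio.get_running_loop

    else:

        def get_task_loop(task):
            return task._loop  # private attribute; pre-3.7 only

        def get_running_loop():
            loop = asyncio.get_event_loop()
            if not loop.is_running():
                raise RuntimeError("There is no running event loop")
            return loop

    return get_task_loop, get_running_loop


get_task_loop, get_running_loop = load_loop_functions()


async def check():
    # Inside a running loop, the current task's loop is the running loop.
    return get_task_loop(asyncio.current_task()) is get_running_loop()
```

On both branches, calling `get_running_loop()` outside of any event loop raises `RuntimeError`, which is the behavior the enqueue machinery relies on.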
diff --git a/loguru/_better_exceptions.py b/loguru/_better_exceptions.py
index 6b15688..8327a13 100644
--- a/loguru/_better_exceptions.py
+++ b/loguru/_better_exceptions.py
@@ -9,46 +9,142 @@ import sys
 import sysconfig
 import tokenize
 import traceback
+
 if sys.version_info >= (3, 11):
+
+    def is_exception_group(exc):
+        return isinstance(exc, ExceptionGroup)
+
 else:
     try:
         from exceptiongroup import ExceptionGroup
     except ImportError:

+        def is_exception_group(exc):
+            return False
+
+    else:
+
+        def is_exception_group(exc):
+            return isinstance(exc, ExceptionGroup)
+

 class SyntaxHighlighter:
-    _default_style = {'comment': '\x1b[30m\x1b[1m{}\x1b[0m', 'keyword':
-        '\x1b[35m\x1b[1m{}\x1b[0m', 'builtin': '\x1b[1m{}\x1b[0m', 'string':
-        '\x1b[36m{}\x1b[0m', 'number': '\x1b[34m\x1b[1m{}\x1b[0m',
-        'operator': '\x1b[35m\x1b[1m{}\x1b[0m', 'punctuation':
-        '\x1b[1m{}\x1b[0m', 'constant': '\x1b[36m\x1b[1m{}\x1b[0m',
-        'identifier': '\x1b[1m{}\x1b[0m', 'other': '{}'}
+    _default_style = {
+        "comment": "\x1b[30m\x1b[1m{}\x1b[0m",
+        "keyword": "\x1b[35m\x1b[1m{}\x1b[0m",
+        "builtin": "\x1b[1m{}\x1b[0m",
+        "string": "\x1b[36m{}\x1b[0m",
+        "number": "\x1b[34m\x1b[1m{}\x1b[0m",
+        "operator": "\x1b[35m\x1b[1m{}\x1b[0m",
+        "punctuation": "\x1b[1m{}\x1b[0m",
+        "constant": "\x1b[36m\x1b[1m{}\x1b[0m",
+        "identifier": "\x1b[1m{}\x1b[0m",
+        "other": "{}",
+    }
+
     _builtins = set(dir(builtins))
-    _constants = {'True', 'False', 'None'}
-    _punctation = {'(', ')', '[', ']', '{', '}', ':', ',', ';'}
+    _constants = {"True", "False", "None"}
+    _punctation = {"(", ")", "[", "]", "{", "}", ":", ",", ";"}
     _strings = {tokenize.STRING}
     _fstring_middle = None
+
     if sys.version_info >= (3, 12):
-        _strings.update({tokenize.FSTRING_START, tokenize.FSTRING_MIDDLE,
-            tokenize.FSTRING_END})
+        _strings.update({tokenize.FSTRING_START, tokenize.FSTRING_MIDDLE, tokenize.FSTRING_END})
         _fstring_middle = tokenize.FSTRING_MIDDLE

     def __init__(self, style=None):
         self._style = style or self._default_style

+    def highlight(self, source):
+        style = self._style
+        row, column = 0, 0
+        output = ""
+
+        for token in self.tokenize(source):
+            type_, string, (start_row, start_column), (_, end_column), line = token
+
+            if type_ == self._fstring_middle:
+                # When an f-string contains "{{" or "}}", they appear as "{" or "}" in the "string"
+                # attribute of the token. However, they do not count in the column position.
+                end_column += string.count("{") + string.count("}")
+
+            if type_ == tokenize.NAME:
+                if string in self._constants:
+                    color = style["constant"]
+                elif keyword.iskeyword(string):
+                    color = style["keyword"]
+                elif string in self._builtins:
+                    color = style["builtin"]
+                else:
+                    color = style["identifier"]
+            elif type_ == tokenize.OP:
+                if string in self._punctation:
+                    color = style["punctuation"]
+                else:
+                    color = style["operator"]
+            elif type_ == tokenize.NUMBER:
+                color = style["number"]
+            elif type_ in self._strings:
+                color = style["string"]
+            elif type_ == tokenize.COMMENT:
+                color = style["comment"]
+            else:
+                color = style["other"]
+
+            if start_row != row:
+                source = source[column:]
+                row, column = start_row, 0
+
+            if type_ != tokenize.ENCODING:
+                output += line[column:start_column]
+                output += color.format(line[start_column:end_column])
+
+            column = end_column
+
+        output += source[column:]
+
+        return output
+
+    @staticmethod
+    def tokenize(source):
+        # Worth reading: https://www.asmeurer.com/brown-water-python/
+        source = source.encode("utf-8")
+        source = io.BytesIO(source)
+
+        try:
+            yield from tokenize.tokenize(source.readline)
+        except tokenize.TokenError:
+            return
+

 class ExceptionFormatter:
-    _default_theme = {'introduction': '\x1b[33m\x1b[1m{}\x1b[0m', 'cause':
-        '\x1b[1m{}\x1b[0m', 'context': '\x1b[1m{}\x1b[0m', 'dirname':
-        '\x1b[32m{}\x1b[0m', 'basename': '\x1b[32m\x1b[1m{}\x1b[0m', 'line':
-        '\x1b[33m{}\x1b[0m', 'function': '\x1b[35m{}\x1b[0m',
-        'exception_type': '\x1b[31m\x1b[1m{}\x1b[0m', 'exception_value':
-        '\x1b[1m{}\x1b[0m', 'arrows': '\x1b[36m{}\x1b[0m', 'value':
-        '\x1b[36m\x1b[1m{}\x1b[0m'}
-
-    def __init__(self, colorize=False, backtrace=False, diagnose=True,
-        theme=None, style=None, max_length=128, encoding='ascii',
-        hidden_frames_filename=None, prefix=''):
+    _default_theme = {
+        "introduction": "\x1b[33m\x1b[1m{}\x1b[0m",
+        "cause": "\x1b[1m{}\x1b[0m",
+        "context": "\x1b[1m{}\x1b[0m",
+        "dirname": "\x1b[32m{}\x1b[0m",
+        "basename": "\x1b[32m\x1b[1m{}\x1b[0m",
+        "line": "\x1b[33m{}\x1b[0m",
+        "function": "\x1b[35m{}\x1b[0m",
+        "exception_type": "\x1b[31m\x1b[1m{}\x1b[0m",
+        "exception_value": "\x1b[1m{}\x1b[0m",
+        "arrows": "\x1b[36m{}\x1b[0m",
+        "value": "\x1b[36m\x1b[1m{}\x1b[0m",
+    }
+
+    def __init__(
+        self,
+        colorize=False,
+        backtrace=False,
+        diagnose=True,
+        theme=None,
+        style=None,
+        max_length=128,
+        encoding="ascii",
+        hidden_frames_filename=None,
+        prefix="",
+    ):
         self._colorize = colorize
         self._diagnose = diagnose
         self._theme = theme or self._default_theme
@@ -59,6 +155,374 @@ class ExceptionFormatter:
         self._hidden_frames_filename = hidden_frames_filename
         self._prefix = prefix
         self._lib_dirs = self._get_lib_dirs()
-        self._pipe_char = self._get_char('│', '|')
-        self._cap_char = self._get_char('└', '->')
-        self._catch_point_identifier = ' <Loguru catch point here>'
+        self._pipe_char = self._get_char("\u2502", "|")
+        self._cap_char = self._get_char("\u2514", "->")
+        self._catch_point_identifier = " <Loguru catch point here>"
+
+    @staticmethod
+    def _get_lib_dirs():
+        schemes = sysconfig.get_scheme_names()
+        names = ["stdlib", "platstdlib", "platlib", "purelib"]
+        paths = {sysconfig.get_path(name, scheme) for scheme in schemes for name in names}
+        return [os.path.abspath(path).lower() + os.sep for path in paths if path in sys.path]
+
+    @staticmethod
+    def _indent(text, count, *, prefix="| "):
+        if count == 0:
+            yield text
+            return
+        for line in text.splitlines(True):
+            indented = "  " * count + prefix + line
+            yield indented.rstrip() + "\n"
+
+    def _get_char(self, char, default):
+        try:
+            char.encode(self._encoding)
+        except (UnicodeEncodeError, LookupError):
+            return default
+        else:
+            return char
+
+    def _is_file_mine(self, file):
+        filepath = os.path.abspath(file).lower()
+        if not filepath.endswith(".py"):
+            return False
+        return not any(filepath.startswith(d) for d in self._lib_dirs)
+
+    def _extract_frames(self, tb, is_first, *, limit=None, from_decorator=False):
+        frames, final_source = [], None
+
+        if tb is None or (limit is not None and limit <= 0):
+            return frames, final_source
+
+        def is_valid(frame):
+            return frame.f_code.co_filename != self._hidden_frames_filename
+
+        def get_info(frame, lineno):
+            filename = frame.f_code.co_filename
+            function = frame.f_code.co_name
+            source = linecache.getline(filename, lineno).strip()
+            return filename, lineno, function, source
+
+        infos = []
+
+        if is_valid(tb.tb_frame):
+            infos.append((get_info(tb.tb_frame, tb.tb_lineno), tb.tb_frame))
+
+        get_parent_only = from_decorator and not self._backtrace
+
+        if (self._backtrace and is_first) or get_parent_only:
+            frame = tb.tb_frame.f_back
+            while frame:
+                if is_valid(frame):
+                    infos.insert(0, (get_info(frame, frame.f_lineno), frame))
+                    if get_parent_only:
+                        break
+                frame = frame.f_back
+
+            if infos and not get_parent_only:
+                (filename, lineno, function, source), frame = infos[-1]
+                function += self._catch_point_identifier
+                infos[-1] = ((filename, lineno, function, source), frame)
+
+        tb = tb.tb_next
+
+        while tb:
+            if is_valid(tb.tb_frame):
+                infos.append((get_info(tb.tb_frame, tb.tb_lineno), tb.tb_frame))
+            tb = tb.tb_next
+
+        if limit is not None:
+            infos = infos[-limit:]
+
+        for (filename, lineno, function, source), frame in infos:
+            final_source = source
+            if source:
+                colorize = self._colorize and self._is_file_mine(filename)
+                lines = []
+                if colorize:
+                    lines.append(self._syntax_highlighter.highlight(source))
+                else:
+                    lines.append(source)
+                if self._diagnose:
+                    relevant_values = self._get_relevant_values(source, frame)
+                    values = self._format_relevant_values(list(relevant_values), colorize)
+                    lines += list(values)
+                source = "\n    ".join(lines)
+            frames.append((filename, lineno, function, source))
+
+        return frames, final_source
+
+    def _get_relevant_values(self, source, frame):
+        value = None
+        pending = None
+        is_attribute = False
+        is_valid_value = False
+        is_assignment = True
+
+        for token in self._syntax_highlighter.tokenize(source):
+            type_, string, (_, col), *_ = token
+
+            if pending is not None:
+                # Keyword arguments are ignored
+                if type_ != tokenize.OP or string != "=" or is_assignment:
+                    yield pending
+                pending = None
+
+            if type_ == tokenize.NAME and not keyword.iskeyword(string):
+                if not is_attribute:
+                    for variables in (frame.f_locals, frame.f_globals):
+                        try:
+                            value = variables[string]
+                        except KeyError:
+                            continue
+                        else:
+                            is_valid_value = True
+                            pending = (col, self._format_value(value))
+                            break
+                elif is_valid_value:
+                    try:
+                        value = inspect.getattr_static(value, string)
+                    except AttributeError:
+                        is_valid_value = False
+                    else:
+                        yield (col, self._format_value(value))
+            elif type_ == tokenize.OP and string == ".":
+                is_attribute = True
+                is_assignment = False
+            elif type_ == tokenize.OP and string == ";":
+                is_assignment = True
+                is_attribute = False
+                is_valid_value = False
+            else:
+                is_attribute = False
+                is_valid_value = False
+                is_assignment = False
+
+        if pending is not None:
+            yield pending
+
+    def _format_relevant_values(self, relevant_values, colorize):
+        for i in reversed(range(len(relevant_values))):
+            col, value = relevant_values[i]
+            pipe_cols = [pcol for pcol, _ in relevant_values[:i]]
+            pre_line = ""
+            index = 0
+
+            for pc in pipe_cols:
+                pre_line += (" " * (pc - index)) + self._pipe_char
+                index = pc + 1
+
+            pre_line += " " * (col - index)
+            value_lines = value.split("\n")
+
+            for n, value_line in enumerate(value_lines):
+                if n == 0:
+                    arrows = pre_line + self._cap_char + " "
+                else:
+                    arrows = pre_line + " " * (len(self._cap_char) + 1)
+
+                if colorize:
+                    arrows = self._theme["arrows"].format(arrows)
+                    value_line = self._theme["value"].format(value_line)
+
+                yield arrows + value_line
+
+    def _format_value(self, v):
+        try:
+            v = repr(v)
+        except Exception:
+            v = "<unprintable %s object>" % type(v).__name__
+
+        max_length = self._max_length
+        if max_length is not None and len(v) > max_length:
+            v = v[: max_length - 3] + "..."
+        return v
+
+    def _format_locations(self, frames_lines, *, has_introduction):
+        prepend_with_new_line = has_introduction
+        regex = r'^  File "(?P<file>.*?)", line (?P<line>[^,]+)(?:, in (?P<function>.*))?\n'
+
+        for frame in frames_lines:
+            match = re.match(regex, frame)
+
+            if match:
+                file, line, function = match.group("file", "line", "function")
+
+                is_mine = self._is_file_mine(file)
+
+                if function is not None:
+                    pattern = '  File "{}", line {}, in {}\n'
+                else:
+                    pattern = '  File "{}", line {}\n'
+
+                if self._backtrace and function and function.endswith(self._catch_point_identifier):
+                    function = function[: -len(self._catch_point_identifier)]
+                    pattern = ">" + pattern[1:]
+
+                if self._colorize and is_mine:
+                    dirname, basename = os.path.split(file)
+                    if dirname:
+                        dirname += os.sep
+                    dirname = self._theme["dirname"].format(dirname)
+                    basename = self._theme["basename"].format(basename)
+                    file = dirname + basename
+                    line = self._theme["line"].format(line)
+                    function = self._theme["function"].format(function)
+
+                if self._diagnose and (is_mine or prepend_with_new_line):
+                    pattern = "\n" + pattern
+
+                location = pattern.format(file, line, function)
+                frame = location + frame[match.end() :]
+                prepend_with_new_line = is_mine
+
+            yield frame
+
+    def _format_exception(
+        self, value, tb, *, seen=None, is_first=False, from_decorator=False, group_nesting=0
+    ):
+        # Implemented from built-in traceback module:
+        # https://github.com/python/cpython/blob/a5b76167/Lib/traceback.py#L468
+        exc_type, exc_value, exc_traceback = type(value), value, tb
+
+        if seen is None:
+            seen = set()
+
+        seen.add(id(exc_value))
+
+        if exc_value:
+            if exc_value.__cause__ is not None and id(exc_value.__cause__) not in seen:
+                yield from self._format_exception(
+                    exc_value.__cause__,
+                    exc_value.__cause__.__traceback__,
+                    seen=seen,
+                    group_nesting=group_nesting,
+                )
+                cause = "The above exception was the direct cause of the following exception:"
+                if self._colorize:
+                    cause = self._theme["cause"].format(cause)
+                if self._diagnose:
+                    yield from self._indent("\n\n" + cause + "\n\n\n", group_nesting)
+                else:
+                    yield from self._indent("\n" + cause + "\n\n", group_nesting)
+
+            elif (
+                exc_value.__context__ is not None
+                and id(exc_value.__context__) not in seen
+                and not exc_value.__suppress_context__
+            ):
+                yield from self._format_exception(
+                    exc_value.__context__,
+                    exc_value.__context__.__traceback__,
+                    seen=seen,
+                    group_nesting=group_nesting,
+                )
+                context = "During handling of the above exception, another exception occurred:"
+                if self._colorize:
+                    context = self._theme["context"].format(context)
+                if self._diagnose:
+                    yield from self._indent("\n\n" + context + "\n\n\n", group_nesting)
+                else:
+                    yield from self._indent("\n" + context + "\n\n", group_nesting)
+
+        is_grouped = is_exception_group(value)
+
+        if is_grouped and group_nesting == 0:
+            yield from self._format_exception(
+                value,
+                tb,
+                seen=seen,
+                group_nesting=1,
+                is_first=is_first,
+                from_decorator=from_decorator,
+            )
+            return
+
+        try:
+            traceback_limit = sys.tracebacklimit
+        except AttributeError:
+            traceback_limit = None
+
+        frames, final_source = self._extract_frames(
+            exc_traceback, is_first, limit=traceback_limit, from_decorator=from_decorator
+        )
+        exception_only = traceback.format_exception_only(exc_type, exc_value)
+
+        # Determining the correct index for the "Exception: message" part in the formatted exception
+        # is challenging. This is because it might be preceded by multiple lines specific to
+        # "SyntaxError" or followed by various notes. However, we can make an educated guess based
+        # on the indentation; the preliminary context for "SyntaxError" is always indented, while
+        # the Exception itself is not. This allows us to identify the correct index for the
+        # exception message.
+        for error_message_index, part in enumerate(exception_only):  # noqa: B007
+            if not part.startswith(" "):
+                break
+
+        error_message = exception_only[error_message_index][:-1]  # Remove last new line temporarily
+
+        if self._colorize:
+            if ":" in error_message:
+                exception_type, exception_value = error_message.split(":", 1)
+                exception_type = self._theme["exception_type"].format(exception_type)
+                exception_value = self._theme["exception_value"].format(exception_value)
+                error_message = exception_type + ":" + exception_value
+            else:
+                error_message = self._theme["exception_type"].format(error_message)
+
+        if self._diagnose and frames:
+            if issubclass(exc_type, AssertionError) and not str(exc_value) and final_source:
+                if self._colorize:
+                    final_source = self._syntax_highlighter.highlight(final_source)
+                error_message += ": " + final_source
+
+            error_message = "\n" + error_message
+
+        exception_only[error_message_index] = error_message + "\n"
+
+        if is_first:
+            yield self._prefix
+
+        has_introduction = bool(frames)
+
+        if has_introduction:
+            if is_grouped:
+                introduction = "Exception Group Traceback (most recent call last):"
+            else:
+                introduction = "Traceback (most recent call last):"
+            if self._colorize:
+                introduction = self._theme["introduction"].format(introduction)
+            if group_nesting == 1:  # Implies we're processing the root ExceptionGroup.
+                yield from self._indent(introduction + "\n", group_nesting, prefix="+ ")
+            else:
+                yield from self._indent(introduction + "\n", group_nesting)
+
+        frames_lines = traceback.format_list(frames) + exception_only
+        if self._colorize or self._backtrace or self._diagnose:
+            frames_lines = self._format_locations(frames_lines, has_introduction=has_introduction)
+
+        yield from self._indent("".join(frames_lines), group_nesting)
+
+        if is_grouped:
+            for n, exc in enumerate(value.exceptions, start=1):
+                ruler = "+" + (" %s " % ("..." if n > 15 else n)).center(35, "-")
+                yield from self._indent(ruler, group_nesting, prefix="+-" if n == 1 else "  ")
+                if n > 15:
+                    message = "and %d more exceptions\n" % (len(value.exceptions) - 15)
+                    yield from self._indent(message, group_nesting + 1)
+                    break
+                elif group_nesting == 10 and is_exception_group(exc):
+                    message = "... (max_group_depth is 10)\n"
+                    yield from self._indent(message, group_nesting + 1)
+                else:
+                    yield from self._format_exception(
+                        exc,
+                        exc.__traceback__,
+                        seen=seen,
+                        group_nesting=group_nesting + 1,
+                    )
+            if not is_exception_group(exc) or group_nesting == 10:
+                yield from self._indent("-" * 35, group_nesting + 1, prefix="+-")
+
+    def format_exception(self, type_, value, tb, *, from_decorator=False):
+        yield from self._format_exception(value, tb, is_first=True, from_decorator=from_decorator)
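
One detail worth noting in the restored `_indent` static helper: nested `ExceptionGroup` tracebacks are rendered by prefixing each line per nesting level, and level 0 passes text through untouched. A self-contained copy of that helper (logic reproduced from the diff above) shows the effect:

```python
# Standalone copy of the _indent static helper from ExceptionFormatter,
# used to render nested ExceptionGroup tracebacks.
def indent(text, count, *, prefix="| "):
    if count == 0:
        yield text
        return
    for line in text.splitlines(True):
        yield ("  " * count + prefix + line).rstrip() + "\n"


nested = "".join(indent("Traceback (most recent call last):\n  ...\n", 1))
```

The `rstrip()` keeps blank lines inside a nested traceback from carrying trailing `"| "` markers and spaces.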
diff --git a/loguru/_colorama.py b/loguru/_colorama.py
index 82cda0b..b69c700 100644
--- a/loguru/_colorama.py
+++ b/loguru/_colorama.py
@@ -1,2 +1,66 @@
 import os
 import sys
+
+
+def should_colorize(stream):
+    if stream is None:
+        return False
+
+    if stream is sys.stdout or stream is sys.stderr:
+        try:
+            import ipykernel
+            import IPython
+
+            ipython = IPython.get_ipython()
+            is_jupyter_stream = isinstance(stream, ipykernel.iostream.OutStream)
+            is_jupyter_shell = isinstance(ipython, ipykernel.zmqshell.ZMQInteractiveShell)
+        except Exception:
+            pass
+        else:
+            if is_jupyter_stream and is_jupyter_shell:
+                return True
+
+    if stream is sys.__stdout__ or stream is sys.__stderr__:
+        if "CI" in os.environ and any(
+            ci in os.environ
+            for ci in ["TRAVIS", "CIRCLECI", "APPVEYOR", "GITLAB_CI", "GITHUB_ACTIONS"]
+        ):
+            return True
+        if "PYCHARM_HOSTED" in os.environ:
+            return True
+        if os.name == "nt" and "TERM" in os.environ:
+            return True
+
+    try:
+        return stream.isatty()
+    except Exception:
+        return False
+
+
+def should_wrap(stream):
+    if os.name != "nt":
+        return False
+
+    if stream is not sys.__stdout__ and stream is not sys.__stderr__:
+        return False
+
+    from colorama.win32 import winapi_test
+
+    if not winapi_test():
+        return False
+
+    try:
+        from colorama.winterm import enable_vt_processing
+    except ImportError:
+        return True
+
+    try:
+        return not enable_vt_processing(stream.fileno())
+    except Exception:
+        return True
+
+
+def wrap(stream):
+    from colorama import AnsiToWin32
+
+    return AnsiToWin32(stream, convert=True, strip=True, autoreset=False).stream
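
The `should_colorize()` heuristic above ultimately falls back on `stream.isatty()`. A trimmed sketch (Jupyter, PyCharm, and Windows detection omitted for brevity) illustrates the fallback behavior for non-tty streams:

```python
# Trimmed sketch of should_colorize(): only the None guard, the CI
# environment check, and the isatty() fallback are reproduced here.
import io
import os
import sys


def should_colorize(stream):
    if stream is None:
        return False
    if stream is sys.__stdout__ or stream is sys.__stderr__:
        if "CI" in os.environ and any(
            ci in os.environ
            for ci in ["TRAVIS", "CIRCLECI", "APPVEYOR", "GITLAB_CI", "GITHUB_ACTIONS"]
        ):
            return True
    try:
        return stream.isatty()
    except Exception:
        return False
```

An in-memory buffer is not a tty, so it never gets ANSI codes, and an object without an `isatty()` method at all is treated the same way.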
diff --git a/loguru/_colorizer.py b/loguru/_colorizer.py
index b5aa9a1..ed87cdb 100644
--- a/loguru/_colorizer.py
+++ b/loguru/_colorizer.py
@@ -25,6 +25,7 @@ class Fore:
     CYAN = 36
     WHITE = 37
     RESET = 39
+
     LIGHTBLACK_EX = 90
     LIGHTRED_EX = 91
     LIGHTGREEN_EX = 92
@@ -45,6 +46,7 @@ class Back:
     CYAN = 46
     WHITE = 47
     RESET = 49
+
     LIGHTBLACK_EX = 100
     LIGHTRED_EX = 101
     LIGHTGREEN_EX = 102
@@ -55,6 +57,10 @@ class Back:
     LIGHTWHITE_EX = 107


+def ansi_escape(codes):
+    return {name: "\033[%dm" % code for name, code in codes.items()}
+
+
 class TokenType:
     TEXT = 1
     ANSI = 2
@@ -63,66 +69,403 @@ class TokenType:


 class AnsiParser:
-    _style = ansi_escape({'b': Style.BOLD, 'd': Style.DIM, 'n': Style.
-        NORMAL, 'h': Style.HIDE, 'i': Style.ITALIC, 'l': Style.BLINK, 's':
-        Style.STRIKE, 'u': Style.UNDERLINE, 'v': Style.REVERSE, 'bold':
-        Style.BOLD, 'dim': Style.DIM, 'normal': Style.NORMAL, 'hide': Style
-        .HIDE, 'italic': Style.ITALIC, 'blink': Style.BLINK, 'strike':
-        Style.STRIKE, 'underline': Style.UNDERLINE, 'reverse': Style.REVERSE})
-    _foreground = ansi_escape({'k': Fore.BLACK, 'r': Fore.RED, 'g': Fore.
-        GREEN, 'y': Fore.YELLOW, 'e': Fore.BLUE, 'm': Fore.MAGENTA, 'c':
-        Fore.CYAN, 'w': Fore.WHITE, 'lk': Fore.LIGHTBLACK_EX, 'lr': Fore.
-        LIGHTRED_EX, 'lg': Fore.LIGHTGREEN_EX, 'ly': Fore.LIGHTYELLOW_EX,
-        'le': Fore.LIGHTBLUE_EX, 'lm': Fore.LIGHTMAGENTA_EX, 'lc': Fore.
-        LIGHTCYAN_EX, 'lw': Fore.LIGHTWHITE_EX, 'black': Fore.BLACK, 'red':
-        Fore.RED, 'green': Fore.GREEN, 'yellow': Fore.YELLOW, 'blue': Fore.
-        BLUE, 'magenta': Fore.MAGENTA, 'cyan': Fore.CYAN, 'white': Fore.
-        WHITE, 'light-black': Fore.LIGHTBLACK_EX, 'light-red': Fore.
-        LIGHTRED_EX, 'light-green': Fore.LIGHTGREEN_EX, 'light-yellow':
-        Fore.LIGHTYELLOW_EX, 'light-blue': Fore.LIGHTBLUE_EX,
-        'light-magenta': Fore.LIGHTMAGENTA_EX, 'light-cyan': Fore.
-        LIGHTCYAN_EX, 'light-white': Fore.LIGHTWHITE_EX})
-    _background = ansi_escape({'K': Back.BLACK, 'R': Back.RED, 'G': Back.
-        GREEN, 'Y': Back.YELLOW, 'E': Back.BLUE, 'M': Back.MAGENTA, 'C':
-        Back.CYAN, 'W': Back.WHITE, 'LK': Back.LIGHTBLACK_EX, 'LR': Back.
-        LIGHTRED_EX, 'LG': Back.LIGHTGREEN_EX, 'LY': Back.LIGHTYELLOW_EX,
-        'LE': Back.LIGHTBLUE_EX, 'LM': Back.LIGHTMAGENTA_EX, 'LC': Back.
-        LIGHTCYAN_EX, 'LW': Back.LIGHTWHITE_EX, 'BLACK': Back.BLACK, 'RED':
-        Back.RED, 'GREEN': Back.GREEN, 'YELLOW': Back.YELLOW, 'BLUE': Back.
-        BLUE, 'MAGENTA': Back.MAGENTA, 'CYAN': Back.CYAN, 'WHITE': Back.
-        WHITE, 'LIGHT-BLACK': Back.LIGHTBLACK_EX, 'LIGHT-RED': Back.
-        LIGHTRED_EX, 'LIGHT-GREEN': Back.LIGHTGREEN_EX, 'LIGHT-YELLOW':
-        Back.LIGHTYELLOW_EX, 'LIGHT-BLUE': Back.LIGHTBLUE_EX,
-        'LIGHT-MAGENTA': Back.LIGHTMAGENTA_EX, 'LIGHT-CYAN': Back.
-        LIGHTCYAN_EX, 'LIGHT-WHITE': Back.LIGHTWHITE_EX})
-    _regex_tag = re.compile('\\\\?</?((?:[fb]g\\s)?[^<>\\s]*)>')
+    _style = ansi_escape(
+        {
+            "b": Style.BOLD,
+            "d": Style.DIM,
+            "n": Style.NORMAL,
+            "h": Style.HIDE,
+            "i": Style.ITALIC,
+            "l": Style.BLINK,
+            "s": Style.STRIKE,
+            "u": Style.UNDERLINE,
+            "v": Style.REVERSE,
+            "bold": Style.BOLD,
+            "dim": Style.DIM,
+            "normal": Style.NORMAL,
+            "hide": Style.HIDE,
+            "italic": Style.ITALIC,
+            "blink": Style.BLINK,
+            "strike": Style.STRIKE,
+            "underline": Style.UNDERLINE,
+            "reverse": Style.REVERSE,
+        }
+    )
+
+    _foreground = ansi_escape(
+        {
+            "k": Fore.BLACK,
+            "r": Fore.RED,
+            "g": Fore.GREEN,
+            "y": Fore.YELLOW,
+            "e": Fore.BLUE,
+            "m": Fore.MAGENTA,
+            "c": Fore.CYAN,
+            "w": Fore.WHITE,
+            "lk": Fore.LIGHTBLACK_EX,
+            "lr": Fore.LIGHTRED_EX,
+            "lg": Fore.LIGHTGREEN_EX,
+            "ly": Fore.LIGHTYELLOW_EX,
+            "le": Fore.LIGHTBLUE_EX,
+            "lm": Fore.LIGHTMAGENTA_EX,
+            "lc": Fore.LIGHTCYAN_EX,
+            "lw": Fore.LIGHTWHITE_EX,
+            "black": Fore.BLACK,
+            "red": Fore.RED,
+            "green": Fore.GREEN,
+            "yellow": Fore.YELLOW,
+            "blue": Fore.BLUE,
+            "magenta": Fore.MAGENTA,
+            "cyan": Fore.CYAN,
+            "white": Fore.WHITE,
+            "light-black": Fore.LIGHTBLACK_EX,
+            "light-red": Fore.LIGHTRED_EX,
+            "light-green": Fore.LIGHTGREEN_EX,
+            "light-yellow": Fore.LIGHTYELLOW_EX,
+            "light-blue": Fore.LIGHTBLUE_EX,
+            "light-magenta": Fore.LIGHTMAGENTA_EX,
+            "light-cyan": Fore.LIGHTCYAN_EX,
+            "light-white": Fore.LIGHTWHITE_EX,
+        }
+    )
+
+    _background = ansi_escape(
+        {
+            "K": Back.BLACK,
+            "R": Back.RED,
+            "G": Back.GREEN,
+            "Y": Back.YELLOW,
+            "E": Back.BLUE,
+            "M": Back.MAGENTA,
+            "C": Back.CYAN,
+            "W": Back.WHITE,
+            "LK": Back.LIGHTBLACK_EX,
+            "LR": Back.LIGHTRED_EX,
+            "LG": Back.LIGHTGREEN_EX,
+            "LY": Back.LIGHTYELLOW_EX,
+            "LE": Back.LIGHTBLUE_EX,
+            "LM": Back.LIGHTMAGENTA_EX,
+            "LC": Back.LIGHTCYAN_EX,
+            "LW": Back.LIGHTWHITE_EX,
+            "BLACK": Back.BLACK,
+            "RED": Back.RED,
+            "GREEN": Back.GREEN,
+            "YELLOW": Back.YELLOW,
+            "BLUE": Back.BLUE,
+            "MAGENTA": Back.MAGENTA,
+            "CYAN": Back.CYAN,
+            "WHITE": Back.WHITE,
+            "LIGHT-BLACK": Back.LIGHTBLACK_EX,
+            "LIGHT-RED": Back.LIGHTRED_EX,
+            "LIGHT-GREEN": Back.LIGHTGREEN_EX,
+            "LIGHT-YELLOW": Back.LIGHTYELLOW_EX,
+            "LIGHT-BLUE": Back.LIGHTBLUE_EX,
+            "LIGHT-MAGENTA": Back.LIGHTMAGENTA_EX,
+            "LIGHT-CYAN": Back.LIGHTCYAN_EX,
+            "LIGHT-WHITE": Back.LIGHTWHITE_EX,
+        }
+    )
+
+    _regex_tag = re.compile(r"\\?</?((?:[fb]g\s)?[^<>\s]*)>")

     def __init__(self):
         self._tokens = []
         self._tags = []
         self._color_tokens = []

+    @staticmethod
+    def strip(tokens):
+        output = ""
+        for type_, value in tokens:
+            if type_ == TokenType.TEXT:
+                output += value
+        return output
+
+    @staticmethod
+    def colorize(tokens, ansi_level):
+        output = ""
+
+        for type_, value in tokens:
+            if type_ == TokenType.LEVEL:
+                if ansi_level is None:
+                    raise ValueError(
+                        "The '<level>' color tag is not allowed in this context, "
+                        "it has not yet been associated to any color value."
+                    )
+                value = ansi_level
+            output += value
+
+        return output
+
+    @staticmethod
+    def wrap(tokens, *, ansi_level, color_tokens):
+        output = ""
+
+        for type_, value in tokens:
+            if type_ == TokenType.LEVEL:
+                value = ansi_level
+            output += value
+            if type_ == TokenType.CLOSING:
+                for subtype, subvalue in color_tokens:
+                    if subtype == TokenType.LEVEL:
+                        subvalue = ansi_level
+                    output += subvalue
+
+        return output
+
+    def feed(self, text, *, raw=False):
+        if raw:
+            self._tokens.append((TokenType.TEXT, text))
+            return
+
+        position = 0
+
+        for match in self._regex_tag.finditer(text):
+            markup, tag = match.group(0), match.group(1)
+
+            self._tokens.append((TokenType.TEXT, text[position : match.start()]))
+
+            position = match.end()
+
+            if markup[0] == "\\":
+                self._tokens.append((TokenType.TEXT, markup[1:]))
+                continue
+
+            if markup[1] == "/":
+                if self._tags and (tag == "" or tag == self._tags[-1]):
+                    self._tags.pop()
+                    self._color_tokens.pop()
+                    self._tokens.append((TokenType.CLOSING, "\033[0m"))
+                    self._tokens.extend(self._color_tokens)
+                    continue
+                elif tag in self._tags:
+                    raise ValueError('Closing tag "%s" violates nesting rules' % markup)
+                else:
+                    raise ValueError('Closing tag "%s" has no corresponding opening tag' % markup)
+
+            if tag in {"lvl", "level"}:
+                token = (TokenType.LEVEL, None)
+            else:
+                ansi = self._get_ansicode(tag)
+
+                if ansi is None:
+                    raise ValueError(
+                        'Tag "%s" does not correspond to any known color directive, '
+                        "make sure you did not misspelled it (or prepend '\\' to escape it)"
+                        % markup
+                    )
+
+                token = (TokenType.ANSI, ansi)
+
+            self._tags.append(tag)
+            self._color_tokens.append(token)
+            self._tokens.append(token)
+
+        self._tokens.append((TokenType.TEXT, text[position:]))
+
+    def done(self, *, strict=True):
+        if strict and self._tags:
+            faulty_tag = self._tags.pop(0)
+            raise ValueError('Opening tag "<%s>" has no corresponding closing tag' % faulty_tag)
+        return self._tokens
+
+    def current_color_tokens(self):
+        return list(self._color_tokens)
+
+    def _get_ansicode(self, tag):
+        style = self._style
+        foreground = self._foreground
+        background = self._background
+
+        # Substitute on a direct match.
+        if tag in style:
+            return style[tag]
+        elif tag in foreground:
+            return foreground[tag]
+        elif tag in background:
+            return background[tag]
+
+        # An alternative syntax for setting the color (e.g. <fg red>, <bg red>).
+        elif tag.startswith("fg ") or tag.startswith("bg "):
+            st, color = tag[:2], tag[3:]
+            code = "38" if st == "fg" else "48"
+
+            if st == "fg" and color.lower() in foreground:
+                return foreground[color.lower()]
+            elif st == "bg" and color.upper() in background:
+                return background[color.upper()]
+            elif color.isdigit() and int(color) <= 255:
+                return "\033[%s;5;%sm" % (code, color)
+            elif re.match(r"#(?:[a-fA-F0-9]{3}){1,2}$", color):
+                hex_color = color[1:]
+                if len(hex_color) == 3:
+                    hex_color *= 2
+                rgb = tuple(int(hex_color[i : i + 2], 16) for i in (0, 2, 4))
+                return "\033[%s;2;%s;%s;%sm" % ((code,) + rgb)
+            elif color.count(",") == 2:
+                colors = tuple(color.split(","))
+                if all(x.isdigit() and int(x) <= 255 for x in colors):
+                    return "\033[%s;2;%s;%s;%sm" % ((code,) + colors)
+
+        return None
+

 class ColoringMessage(str):
-    __fields__ = '_messages',
+    __fields__ = ("_messages",)

     def __format__(self, spec):
         return next(self._messages).__format__(spec)


 class ColoredMessage:
-
     def __init__(self, tokens):
         self.tokens = tokens
         self.stripped = AnsiParser.strip(tokens)

+    def colorize(self, ansi_level):
+        return AnsiParser.colorize(self.tokens, ansi_level)

-class ColoredFormat:

+class ColoredFormat:
     def __init__(self, tokens, messages_color_tokens):
         self._tokens = tokens
         self._messages_color_tokens = messages_color_tokens

+    def strip(self):
+        return AnsiParser.strip(self._tokens)
+
+    def colorize(self, ansi_level):
+        return AnsiParser.colorize(self._tokens, ansi_level)
+
+    def make_coloring_message(self, message, *, ansi_level, colored_message):
+        messages = [
+            message
+            if color_tokens is None
+            else AnsiParser.wrap(
+                colored_message.tokens, ansi_level=ansi_level, color_tokens=color_tokens
+            )
+            for color_tokens in self._messages_color_tokens
+        ]
+        coloring = ColoringMessage(message)
+        coloring._messages = iter(messages)
+        return coloring
+

 class Colorizer:
-    pass
+    @staticmethod
+    def prepare_format(string):
+        tokens, messages_color_tokens = Colorizer._parse_without_formatting(string)
+        return ColoredFormat(tokens, messages_color_tokens)
+
+    @staticmethod
+    def prepare_message(string, args=(), kwargs={}):  # noqa: B006
+        tokens = Colorizer._parse_with_formatting(string, args, kwargs)
+        return ColoredMessage(tokens)
+
+    @staticmethod
+    def prepare_simple_message(string):
+        parser = AnsiParser()
+        parser.feed(string)
+        tokens = parser.done()
+        return ColoredMessage(tokens)
+
+    @staticmethod
+    def ansify(text):
+        parser = AnsiParser()
+        parser.feed(text.strip())
+        tokens = parser.done(strict=False)
+        return AnsiParser.colorize(tokens, None)
+
+    @staticmethod
+    def _parse_with_formatting(
+        string, args, kwargs, *, recursion_depth=2, auto_arg_index=0, recursive=False
+    ):
+        # This function re-implements Formatter._vformat()
+
+        if recursion_depth < 0:
+            raise ValueError("Max string recursion exceeded")
+
+        formatter = Formatter()
+        parser = AnsiParser()
+
+        for literal_text, field_name, format_spec, conversion in formatter.parse(string):
+            parser.feed(literal_text, raw=recursive)
+
+            if field_name is not None:
+                if field_name == "":
+                    if auto_arg_index is False:
+                        raise ValueError(
+                            "cannot switch from manual field "
+                            "specification to automatic field "
+                            "numbering"
+                        )
+                    field_name = str(auto_arg_index)
+                    auto_arg_index += 1
+                elif field_name.isdigit():
+                    if auto_arg_index:
+                        raise ValueError(
+                            "cannot switch from manual field "
+                            "specification to automatic field "
+                            "numbering"
+                        )
+                    auto_arg_index = False
+
+                obj, _ = formatter.get_field(field_name, args, kwargs)
+                obj = formatter.convert_field(obj, conversion)
+
+                format_spec, auto_arg_index = Colorizer._parse_with_formatting(
+                    format_spec,
+                    args,
+                    kwargs,
+                    recursion_depth=recursion_depth - 1,
+                    auto_arg_index=auto_arg_index,
+                    recursive=True,
+                )
+
+                formatted = formatter.format_field(obj, format_spec)
+                parser.feed(formatted, raw=True)
+
+        tokens = parser.done()
+
+        if recursive:
+            return AnsiParser.strip(tokens), auto_arg_index
+
+        return tokens
+
+    @staticmethod
+    def _parse_without_formatting(string, *, recursion_depth=2, recursive=False):
+        if recursion_depth < 0:
+            raise ValueError("Max string recursion exceeded")
+
+        formatter = Formatter()
+        parser = AnsiParser()
+
+        messages_color_tokens = []
+
+        for literal_text, field_name, format_spec, conversion in formatter.parse(string):
+            if literal_text and literal_text[-1] in "{}":
+                literal_text += literal_text[-1]
+
+            parser.feed(literal_text, raw=recursive)
+
+            if field_name is not None:
+                if field_name == "message":
+                    if recursive:
+                        messages_color_tokens.append(None)
+                    else:
+                        color_tokens = parser.current_color_tokens()
+                        messages_color_tokens.append(color_tokens)
+                field = "{%s" % field_name
+                if conversion:
+                    field += "!%s" % conversion
+                if format_spec:
+                    field += ":%s" % format_spec
+                field += "}"
+                parser.feed(field, raw=True)
+
+                _, color_tokens = Colorizer._parse_without_formatting(
+                    format_spec, recursion_depth=recursion_depth - 1, recursive=True
+                )
+                messages_color_tokens.extend(color_tokens)
+
+        return parser.done(), messages_color_tokens
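The `fg `/`bg ` alternative syntax handled at the end of `_get_ansicode` above maps three numeric spellings (a 256-color palette index, a hex triplet, an `r,g,b` tuple) onto SGR escape sequences. A standalone sketch of just that branch (the `ansi_from_tag` helper name is illustrative, not part of loguru; named colors are omitted):

```python
import re


def ansi_from_tag(tag):
    # Sketch of the "<fg ...>"/"<bg ...>" branch of AnsiParser._get_ansicode().
    st, _, color = tag.partition(" ")
    if st not in ("fg", "bg") or not color:
        return None
    code = "38" if st == "fg" else "48"  # SGR 38 = foreground, 48 = background

    if color.isdigit() and int(color) <= 255:
        return "\033[%s;5;%sm" % (code, color)  # 256-color palette index
    if re.match(r"#(?:[a-fA-F0-9]{3}){1,2}$", color):
        hex_color = color[1:]
        if len(hex_color) == 3:
            hex_color *= 2  # same shorthand expansion as the parser above
        rgb = tuple(int(hex_color[i : i + 2], 16) for i in (0, 2, 4))
        return "\033[%s;2;%s;%s;%sm" % ((code,) + rgb)
    if color.count(",") == 2:
        parts = tuple(color.split(","))
        if all(p.isdigit() and int(p) <= 255 for p in parts):
            return "\033[%s;2;%s;%s;%sm" % ((code,) + parts)
    return None
```

Anything that matches none of the three forms falls through to `None`, which is what lets `_get_ansicode` report an unknown tag.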
diff --git a/loguru/_contextvars.py b/loguru/_contextvars.py
index 69d6932..2e8bbb2 100644
--- a/loguru/_contextvars.py
+++ b/loguru/_contextvars.py
@@ -1,2 +1,24 @@
import sys
+
+
+def load_contextvar_class():
+    if sys.version_info >= (3, 7):
+        from contextvars import ContextVar
+    elif sys.version_info >= (3, 5, 3):
+        from aiocontextvars import ContextVar
+    else:
+        # Python < 3.5.3: no stdlib module nor backport, use a minimal stand-in.
+        class ContextVar:
+            def __init__(self, name, default=None):
+                self._name = name
+                self._value = default
+
+            def get(self, default=None):
+                return self._value
+
+            def set(self, value):
+                self._value = value
+    return ContextVar
+
+
 ContextVar = load_contextvar_class()
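On Python 3.7+ the class returned by `load_contextvar_class()` is the stdlib `contextvars.ContextVar`; the rest of the library only relies on the small get/set/reset protocol shown below.

```python
from contextvars import ContextVar

# The handful of operations loguru needs from whichever ContextVar
# implementation load_contextvar_class() returns.
request_id = ContextVar("request_id", default=None)

assert request_id.get() is None       # default until set() is called
token = request_id.set("abc123")
assert request_id.get() == "abc123"
request_id.reset(token)               # restore the previous value
assert request_id.get() is None
```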
diff --git a/loguru/_ctime_functions.py b/loguru/_ctime_functions.py
index 0e6aed3..e232b42 100644
--- a/loguru/_ctime_functions.py
+++ b/loguru/_ctime_functions.py
@@ -1,2 +1,57 @@
 import os
+
+
+def load_ctime_functions():
+    if os.name == "nt":
+        import win32_setctime
+
+        def get_ctime_windows(filepath):
+            return os.stat(filepath).st_ctime
+
+        def set_ctime_windows(filepath, timestamp):
+            if not win32_setctime.SUPPORTED:
+                return
+
+            try:
+                win32_setctime.setctime(filepath, timestamp)
+            except (OSError, ValueError):
+                pass
+
+        return get_ctime_windows, set_ctime_windows
+
+    elif hasattr(os.stat_result, "st_birthtime"):
+
+        def get_ctime_macos(filepath):
+            return os.stat(filepath).st_birthtime
+
+        def set_ctime_macos(filepath, timestamp):
+            pass
+
+        return get_ctime_macos, set_ctime_macos
+
+    elif hasattr(os, "getxattr") and hasattr(os, "setxattr"):
+
+        def get_ctime_linux(filepath):
+            try:
+                return float(os.getxattr(filepath, b"user.loguru_crtime"))
+            except OSError:
+                return os.stat(filepath).st_mtime
+
+        def set_ctime_linux(filepath, timestamp):
+            try:
+                os.setxattr(filepath, b"user.loguru_crtime", str(timestamp).encode("ascii"))
+            except OSError:
+                pass
+
+        return get_ctime_linux, set_ctime_linux
+
+    def get_ctime_fallback(filepath):
+        return os.stat(filepath).st_mtime
+
+    def set_ctime_fallback(filepath, timestamp):
+        pass
+
+    return get_ctime_fallback, set_ctime_fallback
+
+
 get_ctime, set_ctime = load_ctime_functions()
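The last pair returned above is the portable fallback: without Windows ctime support, `st_birthtime`, or user xattrs, the modification time stands in for the creation time. A quick check of that behaviour (the temp-file handling is only for the demo):

```python
import os
import tempfile
import time


def get_ctime_fallback(filepath):
    # Portable fallback, as in load_ctime_functions(): st_mtime stands
    # in for the unavailable creation time.
    return os.stat(filepath).st_mtime


fd, path = tempfile.mkstemp()
os.close(fd)
try:
    now = time.time()
    created = get_ctime_fallback(path)
finally:
    os.remove(path)
```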
diff --git a/loguru/_datetime.py b/loguru/_datetime.py
index 754c834..76626db 100644
--- a/loguru/_datetime.py
+++ b/loguru/_datetime.py
@@ -3,58 +3,103 @@ from calendar import day_abbr, day_name, month_abbr, month_name
 from datetime import datetime as datetime_
 from datetime import timedelta, timezone
 from time import localtime, strftime
-tokens = (
-    'H{1,2}|h{1,2}|m{1,2}|s{1,2}|S+|YYYY|YY|M{1,4}|D{1,4}|Z{1,2}|zz|A|X|x|E|Q|dddd|ddd|d'
-    )
-pattern = re.compile('(?:{0})|\\[(?:{0}|!UTC|)\\]'.format(tokens))

+tokens = r"H{1,2}|h{1,2}|m{1,2}|s{1,2}|S+|YYYY|YY|M{1,4}|D{1,4}|Z{1,2}|zz|A|X|x|E|Q|dddd|ddd|d"

-class datetime(datetime_):
+pattern = re.compile(r"(?:{0})|\[(?:{0}|!UTC|)\]".format(tokens))

+
+class datetime(datetime_):  # noqa: N801
     def __format__(self, spec):
-        if spec.endswith('!UTC'):
+        if spec.endswith("!UTC"):
             dt = self.astimezone(timezone.utc)
             spec = spec[:-4]
         else:
             dt = self
+
         if not spec:
-            spec = '%Y-%m-%dT%H:%M:%S.%f%z'
-        if '%' in spec:
+            spec = "%Y-%m-%dT%H:%M:%S.%f%z"
+
+        if "%" in spec:
             return datetime_.__format__(dt, spec)
-        if 'SSSSSSS' in spec:
+
+        if "SSSSSSS" in spec:
             raise ValueError(
-                "Invalid time format: the provided format string contains more than six successive 'S' characters. This may be due to an attempt to use nanosecond precision, which is not supported."
-                )
-        year, month, day, hour, minute, second, weekday, yearday, _ = (dt.
-            timetuple())
+                "Invalid time format: the provided format string contains more than six successive "
+                "'S' characters. This may be due to an attempt to use nanosecond precision, which "
+                "is not supported."
+            )
+
+        year, month, day, hour, minute, second, weekday, yearday, _ = dt.timetuple()
         microsecond = dt.microsecond
         timestamp = dt.timestamp()
         tzinfo = dt.tzinfo or timezone(timedelta(seconds=0))
         offset = tzinfo.utcoffset(dt).total_seconds()
-        sign = ('-', '+')[offset >= 0]
+        sign = ("-", "+")[offset >= 0]
         (h, m), s = divmod(abs(offset // 60), 60), abs(offset) % 60
-        rep = {'YYYY': '%04d' % year, 'YY': '%02d' % (year % 100), 'Q': 
-            '%d' % ((month - 1) // 3 + 1), 'MMMM': month_name[month], 'MMM':
-            month_abbr[month], 'MM': '%02d' % month, 'M': '%d' % month,
-            'DDDD': '%03d' % yearday, 'DDD': '%d' % yearday, 'DD': '%02d' %
-            day, 'D': '%d' % day, 'dddd': day_name[weekday], 'ddd':
-            day_abbr[weekday], 'd': '%d' % weekday, 'E': '%d' % (weekday + 
-            1), 'HH': '%02d' % hour, 'H': '%d' % hour, 'hh': '%02d' % ((
-            hour - 1) % 12 + 1), 'h': '%d' % ((hour - 1) % 12 + 1), 'mm': 
-            '%02d' % minute, 'm': '%d' % minute, 'ss': '%02d' % second, 's':
-            '%d' % second, 'S': '%d' % (microsecond // 100000), 'SS': 
-            '%02d' % (microsecond // 10000), 'SSS': '%03d' % (microsecond //
-            1000), 'SSSS': '%04d' % (microsecond // 100), 'SSSSS': '%05d' %
-            (microsecond // 10), 'SSSSSS': '%06d' % microsecond, 'A': ('AM',
-            'PM')[hour // 12], 'Z': '%s%02d:%02d%s' % (sign, h, m, (
-            ':%09.06f' % s)[:11 if s % 1 else 3] * (s > 0)), 'ZZ': 
-            '%s%02d%02d%s' % (sign, h, m, ('%09.06f' % s)[:10 if s % 1 else
-            2] * (s > 0)), 'zz': tzinfo.tzname(dt) or '', 'X': '%d' %
-            timestamp, 'x': '%d' % (int(timestamp) * 1000000 + microsecond)}
+
+        rep = {
+            "YYYY": "%04d" % year,
+            "YY": "%02d" % (year % 100),
+            "Q": "%d" % ((month - 1) // 3 + 1),
+            "MMMM": month_name[month],
+            "MMM": month_abbr[month],
+            "MM": "%02d" % month,
+            "M": "%d" % month,
+            "DDDD": "%03d" % yearday,
+            "DDD": "%d" % yearday,
+            "DD": "%02d" % day,
+            "D": "%d" % day,
+            "dddd": day_name[weekday],
+            "ddd": day_abbr[weekday],
+            "d": "%d" % weekday,
+            "E": "%d" % (weekday + 1),
+            "HH": "%02d" % hour,
+            "H": "%d" % hour,
+            "hh": "%02d" % ((hour - 1) % 12 + 1),
+            "h": "%d" % ((hour - 1) % 12 + 1),
+            "mm": "%02d" % minute,
+            "m": "%d" % minute,
+            "ss": "%02d" % second,
+            "s": "%d" % second,
+            "S": "%d" % (microsecond // 100000),
+            "SS": "%02d" % (microsecond // 10000),
+            "SSS": "%03d" % (microsecond // 1000),
+            "SSSS": "%04d" % (microsecond // 100),
+            "SSSSS": "%05d" % (microsecond // 10),
+            "SSSSSS": "%06d" % microsecond,
+            "A": ("AM", "PM")[hour // 12],
+            "Z": "%s%02d:%02d%s" % (sign, h, m, (":%09.06f" % s)[: 11 if s % 1 else 3] * (s > 0)),
+            "ZZ": "%s%02d%02d%s" % (sign, h, m, ("%09.06f" % s)[: 10 if s % 1 else 2] * (s > 0)),
+            "zz": tzinfo.tzname(dt) or "",
+            "X": "%d" % timestamp,
+            "x": "%d" % (int(timestamp) * 1000000 + microsecond),
+        }

         def get(m):
             try:
                 return rep[m.group(0)]
             except KeyError:
                 return m.group(0)[1:-1]
+
         return pattern.sub(get, spec)
+
+
+def aware_now():
+    now = datetime_.now()
+    timestamp = now.timestamp()
+    local = localtime(timestamp)
+
+    try:
+        seconds = local.tm_gmtoff
+        zone = local.tm_zone
+    except AttributeError:
+        # Workaround for Python 3.5.
+        utc_naive = datetime_.fromtimestamp(timestamp, tz=timezone.utc).replace(tzinfo=None)
+        offset = datetime_.fromtimestamp(timestamp) - utc_naive
+        seconds = offset.total_seconds()
+        zone = strftime("%Z")
+
+    tzinfo = timezone(timedelta(seconds=seconds), zone)
+
+    return datetime.combine(now.date(), now.time().replace(tzinfo=tzinfo))
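The `__format__` machinery above is a single regex substitution over the spec string: known tokens are replaced from the `rep` table, and bracketed tokens such as `[HH]` are emitted literally. A reduced sketch with a handful of tokens (the `render` helper is illustrative):

```python
import re

# Reduced token set; the real table above covers many more directives.
TOKENS = r"HH|H|mm|m|ss|s"
PATTERN = re.compile(r"(?:{0})|\[(?:{0}|)\]".format(TOKENS))


def render(spec, hour, minute, second):
    rep = {
        "HH": "%02d" % hour, "H": "%d" % hour,
        "mm": "%02d" % minute, "m": "%d" % minute,
        "ss": "%02d" % second, "s": "%d" % second,
    }

    def get(m):
        try:
            return rep[m.group(0)]
        except KeyError:
            return m.group(0)[1:-1]  # "[HH]" -> literal "HH"

    return PATTERN.sub(get, spec)
```

Longer alternatives come first in the alternation (`HH` before `H`), which is why two-letter tokens win over their one-letter prefixes.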
diff --git a/loguru/_defaults.py b/loguru/_defaults.py
index 2e33c3d..d3d8de7 100644
--- a/loguru/_defaults.py
+++ b/loguru/_defaults.py
@@ -1,35 +1,74 @@
 from os import environ
-LOGURU_AUTOINIT = env('LOGURU_AUTOINIT', bool, True)
-LOGURU_FORMAT = env('LOGURU_FORMAT', str,
-    '<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>'
-    )
-LOGURU_FILTER = env('LOGURU_FILTER', str, None)
-LOGURU_LEVEL = env('LOGURU_LEVEL', str, 'DEBUG')
-LOGURU_COLORIZE = env('LOGURU_COLORIZE', bool, None)
-LOGURU_SERIALIZE = env('LOGURU_SERIALIZE', bool, False)
-LOGURU_BACKTRACE = env('LOGURU_BACKTRACE', bool, True)
-LOGURU_DIAGNOSE = env('LOGURU_DIAGNOSE', bool, True)
-LOGURU_ENQUEUE = env('LOGURU_ENQUEUE', bool, False)
-LOGURU_CONTEXT = env('LOGURU_CONTEXT', str, None)
-LOGURU_CATCH = env('LOGURU_CATCH', bool, True)
-LOGURU_TRACE_NO = env('LOGURU_TRACE_NO', int, 5)
-LOGURU_TRACE_COLOR = env('LOGURU_TRACE_COLOR', str, '<cyan><bold>')
-LOGURU_TRACE_ICON = env('LOGURU_TRACE_ICON', str, '✏️')
-LOGURU_DEBUG_NO = env('LOGURU_DEBUG_NO', int, 10)
-LOGURU_DEBUG_COLOR = env('LOGURU_DEBUG_COLOR', str, '<blue><bold>')
-LOGURU_DEBUG_ICON = env('LOGURU_DEBUG_ICON', str, '🐞')
-LOGURU_INFO_NO = env('LOGURU_INFO_NO', int, 20)
-LOGURU_INFO_COLOR = env('LOGURU_INFO_COLOR', str, '<bold>')
-LOGURU_INFO_ICON = env('LOGURU_INFO_ICON', str, 'ℹ️')
-LOGURU_SUCCESS_NO = env('LOGURU_SUCCESS_NO', int, 25)
-LOGURU_SUCCESS_COLOR = env('LOGURU_SUCCESS_COLOR', str, '<green><bold>')
-LOGURU_SUCCESS_ICON = env('LOGURU_SUCCESS_ICON', str, '✅')
-LOGURU_WARNING_NO = env('LOGURU_WARNING_NO', int, 30)
-LOGURU_WARNING_COLOR = env('LOGURU_WARNING_COLOR', str, '<yellow><bold>')
-LOGURU_WARNING_ICON = env('LOGURU_WARNING_ICON', str, '⚠️')
-LOGURU_ERROR_NO = env('LOGURU_ERROR_NO', int, 40)
-LOGURU_ERROR_COLOR = env('LOGURU_ERROR_COLOR', str, '<red><bold>')
-LOGURU_ERROR_ICON = env('LOGURU_ERROR_ICON', str, '❌')
-LOGURU_CRITICAL_NO = env('LOGURU_CRITICAL_NO', int, 50)
-LOGURU_CRITICAL_COLOR = env('LOGURU_CRITICAL_COLOR', str, '<RED><bold>')
-LOGURU_CRITICAL_ICON = env('LOGURU_CRITICAL_ICON', str, '☠️')
+
+
+def env(key, type_, default=None):
+    if key not in environ:
+        return default
+
+    val = environ[key]
+
+    if type_ == str:
+        return val
+    elif type_ == bool:
+        if val.lower() in ["1", "true", "yes", "y", "ok", "on"]:
+            return True
+        if val.lower() in ["0", "false", "no", "n", "nok", "off"]:
+            return False
+        raise ValueError(
+            "Invalid environment variable '%s' (expected a boolean): '%s'" % (key, val)
+        )
+    elif type_ == int:
+        try:
+            return int(val)
+        except ValueError:
+            raise ValueError(
+                "Invalid environment variable '%s' (expected an integer): '%s'" % (key, val)
+            ) from None
+
+
+LOGURU_AUTOINIT = env("LOGURU_AUTOINIT", bool, True)
+
+LOGURU_FORMAT = env(
+    "LOGURU_FORMAT",
+    str,
+    "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
+    "<level>{level: <8}</level> | "
+    "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
+)
+LOGURU_FILTER = env("LOGURU_FILTER", str, None)
+LOGURU_LEVEL = env("LOGURU_LEVEL", str, "DEBUG")
+LOGURU_COLORIZE = env("LOGURU_COLORIZE", bool, None)
+LOGURU_SERIALIZE = env("LOGURU_SERIALIZE", bool, False)
+LOGURU_BACKTRACE = env("LOGURU_BACKTRACE", bool, True)
+LOGURU_DIAGNOSE = env("LOGURU_DIAGNOSE", bool, True)
+LOGURU_ENQUEUE = env("LOGURU_ENQUEUE", bool, False)
+LOGURU_CONTEXT = env("LOGURU_CONTEXT", str, None)
+LOGURU_CATCH = env("LOGURU_CATCH", bool, True)
+
+LOGURU_TRACE_NO = env("LOGURU_TRACE_NO", int, 5)
+LOGURU_TRACE_COLOR = env("LOGURU_TRACE_COLOR", str, "<cyan><bold>")
+LOGURU_TRACE_ICON = env("LOGURU_TRACE_ICON", str, "\u270F\uFE0F")  # Pencil
+
+LOGURU_DEBUG_NO = env("LOGURU_DEBUG_NO", int, 10)
+LOGURU_DEBUG_COLOR = env("LOGURU_DEBUG_COLOR", str, "<blue><bold>")
+LOGURU_DEBUG_ICON = env("LOGURU_DEBUG_ICON", str, "\U0001F41E")  # Lady Beetle
+
+LOGURU_INFO_NO = env("LOGURU_INFO_NO", int, 20)
+LOGURU_INFO_COLOR = env("LOGURU_INFO_COLOR", str, "<bold>")
+LOGURU_INFO_ICON = env("LOGURU_INFO_ICON", str, "\u2139\uFE0F")  # Information
+
+LOGURU_SUCCESS_NO = env("LOGURU_SUCCESS_NO", int, 25)
+LOGURU_SUCCESS_COLOR = env("LOGURU_SUCCESS_COLOR", str, "<green><bold>")
+LOGURU_SUCCESS_ICON = env("LOGURU_SUCCESS_ICON", str, "\u2705")  # White Heavy Check Mark
+
+LOGURU_WARNING_NO = env("LOGURU_WARNING_NO", int, 30)
+LOGURU_WARNING_COLOR = env("LOGURU_WARNING_COLOR", str, "<yellow><bold>")
+LOGURU_WARNING_ICON = env("LOGURU_WARNING_ICON", str, "\u26A0\uFE0F")  # Warning
+
+LOGURU_ERROR_NO = env("LOGURU_ERROR_NO", int, 40)
+LOGURU_ERROR_COLOR = env("LOGURU_ERROR_COLOR", str, "<red><bold>")
+LOGURU_ERROR_ICON = env("LOGURU_ERROR_ICON", str, "\u274C")  # Cross Mark
+
+LOGURU_CRITICAL_NO = env("LOGURU_CRITICAL_NO", int, 50)
+LOGURU_CRITICAL_COLOR = env("LOGURU_CRITICAL_COLOR", str, "<RED><bold>")
+LOGURU_CRITICAL_ICON = env("LOGURU_CRITICAL_ICON", str, "\u2620\uFE0F")  # Skull and Crossbones
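All of the defaults above go through the `env()` helper, which accepts a small set of truthy/falsy spellings for boolean variables. A trimmed copy showing just the boolean parsing (the `env_bool` name and demo variable are illustrative):

```python
import os


def env_bool(key, default=None):
    # Trimmed version of the env() helper above, booleans only.
    if key not in os.environ:
        return default
    val = os.environ[key].lower()
    if val in ["1", "true", "yes", "y", "ok", "on"]:
        return True
    if val in ["0", "false", "no", "n", "nok", "off"]:
        return False
    raise ValueError(
        "Invalid environment variable '%s' (expected a boolean): '%s'" % (key, val)
    )


os.environ["DEMO_LOGURU_FLAG"] = "on"
flag = env_bool("DEMO_LOGURU_FLAG")
```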
diff --git a/loguru/_error_interceptor.py b/loguru/_error_interceptor.py
index e9758e4..9f63d3d 100644
--- a/loguru/_error_interceptor.py
+++ b/loguru/_error_interceptor.py
@@ -3,7 +3,32 @@ import traceback


 class ErrorInterceptor:
-
     def __init__(self, should_catch, handler_id):
         self._should_catch = should_catch
         self._handler_id = handler_id
+
+    def should_catch(self):
+        return self._should_catch
+
+    def print(self, record=None, *, exception=None):
+        if not sys.stderr:
+            return
+
+        if exception is None:
+            type_, value, traceback_ = sys.exc_info()
+        else:
+            type_, value, traceback_ = (type(exception), exception, exception.__traceback__)
+
+        try:
+            sys.stderr.write("--- Logging error in Loguru Handler #%d ---\n" % self._handler_id)
+            try:
+                record_repr = str(record)
+            except Exception:
+                record_repr = "/!\\ Unprintable record /!\\"
+            sys.stderr.write("Record was: %s\n" % record_repr)
+            traceback.print_exception(type_, value, traceback_, None, sys.stderr)
+            sys.stderr.write("--- End of logging error ---\n")
+        except OSError:
+            pass
+        finally:
+            del type_, value, traceback_
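The `print()` method above is deliberately defensive: it formats the active exception, guards against unprintable records, and swallows `OSError` from a broken stderr. The same report can be reproduced against an explicit stream (the `print_logging_error` helper is illustrative, not part of loguru):

```python
import io
import sys
import traceback


def print_logging_error(handler_id, record, stream):
    # Condensed ErrorInterceptor.print(), writing to an explicit stream.
    type_, value, tb = sys.exc_info()
    stream.write("--- Logging error in Loguru Handler #%d ---\n" % handler_id)
    try:
        record_repr = str(record)
    except Exception:
        record_repr = "/!\\ Unprintable record /!\\"
    stream.write("Record was: %s\n" % record_repr)
    traceback.print_exception(type_, value, tb, None, stream)
    stream.write("--- End of logging error ---\n")


buf = io.StringIO()
try:
    1 / 0
except ZeroDivisionError:
    print_logging_error(3, {"message": "hi"}, buf)
report = buf.getvalue()
```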
diff --git a/loguru/_file_sink.py b/loguru/_file_sink.py
index 8053660..bdc6ccd 100644
--- a/loguru/_file_sink.py
+++ b/loguru/_file_sink.py
@@ -7,66 +7,146 @@ import shutil
 import string
 from functools import partial
 from stat import ST_DEV, ST_INO
+
 from . import _string_parsers as string_parsers
 from ._ctime_functions import get_ctime, set_ctime
 from ._datetime import aware_now


-class FileDateFormatter:
+def generate_rename_path(root, ext, creation_time):
+    creation_datetime = datetime.datetime.fromtimestamp(creation_time)
+    date = FileDateFormatter(creation_datetime)
+
+    renamed_path = "{}.{}{}".format(root, date, ext)
+    counter = 1
+
+    while os.path.exists(renamed_path):
+        counter += 1
+        renamed_path = "{}.{}.{}{}".format(root, date, counter, ext)
+
+    return renamed_path
+

+class FileDateFormatter:
     def __init__(self, datetime=None):
         self.datetime = datetime or aware_now()

     def __format__(self, spec):
         if not spec:
-            spec = '%Y-%m-%d_%H-%M-%S_%f'
+            spec = "%Y-%m-%d_%H-%M-%S_%f"
         return self.datetime.__format__(spec)


 class Compression:
-    pass
+    @staticmethod
+    def add_compress(path_in, path_out, opener, **kwargs):
+        with opener(path_out, **kwargs) as f_comp:
+            f_comp.add(path_in, os.path.basename(path_in))
+
+    @staticmethod
+    def write_compress(path_in, path_out, opener, **kwargs):
+        with opener(path_out, **kwargs) as f_comp:
+            f_comp.write(path_in, os.path.basename(path_in))
+
+    @staticmethod
+    def copy_compress(path_in, path_out, opener, **kwargs):
+        with open(path_in, "rb") as f_in:
+            with opener(path_out, **kwargs) as f_out:
+                shutil.copyfileobj(f_in, f_out)
+
+    @staticmethod
+    def compression(path_in, ext, compress_function):
+        path_out = "{}{}".format(path_in, ext)
+
+        if os.path.exists(path_out):
+            creation_time = get_ctime(path_out)
+            root, ext_before = os.path.splitext(path_in)
+            renamed_path = generate_rename_path(root, ext_before + ext, creation_time)
+            os.rename(path_out, renamed_path)
+        compress_function(path_in, path_out)
+        os.remove(path_in)


 class Retention:
-    pass
+    @staticmethod
+    def retention_count(logs, number):
+        def key_log(log):
+            return (-os.stat(log).st_mtime, log)
+
+        for log in sorted(logs, key=key_log)[number:]:
+            os.remove(log)
+
+    @staticmethod
+    def retention_age(logs, seconds):
+        t = datetime.datetime.now().timestamp()
+        for log in logs:
+            if os.stat(log).st_mtime <= t - seconds:
+                os.remove(log)


 class Rotation:
+    @staticmethod
+    def forward_day(t):
+        return t + datetime.timedelta(days=1)

+    @staticmethod
+    def forward_weekday(t, weekday):
+        while True:
+            t += datetime.timedelta(days=1)
+            if t.weekday() == weekday:
+                return t

-    class RotationTime:
+    @staticmethod
+    def forward_interval(t, interval):
+        return t + interval

+    @staticmethod
+    def rotation_size(message, file, size_limit):
+        file.seek(0, 2)
+        return file.tell() + len(message) > size_limit
+
+    class RotationTime:
         def __init__(self, step_forward, time_init=None):
             self._step_forward = step_forward
             self._time_init = time_init
             self._limit = None

         def __call__(self, message, file):
-            record_time = message.record['time']
+            record_time = message.record["time"]
+
             if self._limit is None:
                 filepath = os.path.realpath(file.name)
                 creation_time = get_ctime(filepath)
                 set_ctime(filepath, creation_time)
-                start_time = datetime.datetime.fromtimestamp(creation_time,
-                    tz=datetime.timezone.utc)
+                start_time = datetime.datetime.fromtimestamp(
+                    creation_time, tz=datetime.timezone.utc
+                )
+
                 time_init = self._time_init
+
                 if time_init is None:
-                    limit = start_time.astimezone(record_time.tzinfo).replace(
-                        tzinfo=None)
+                    limit = start_time.astimezone(record_time.tzinfo).replace(tzinfo=None)
                     limit = self._step_forward(limit)
                 else:
-                    tzinfo = (record_time.tzinfo if time_init.tzinfo is
-                        None else time_init.tzinfo)
-                    limit = start_time.astimezone(tzinfo).replace(hour=
-                        time_init.hour, minute=time_init.minute, second=
-                        time_init.second, microsecond=time_init.microsecond)
+                    tzinfo = record_time.tzinfo if time_init.tzinfo is None else time_init.tzinfo
+                    limit = start_time.astimezone(tzinfo).replace(
+                        hour=time_init.hour,
+                        minute=time_init.minute,
+                        second=time_init.second,
+                        microsecond=time_init.microsecond,
+                    )
+
                     if limit <= start_time:
                         limit = self._step_forward(limit)
+
                     if time_init.tzinfo is None:
                         limit = limit.replace(tzinfo=None)
+
                 self._limit = limit
+
             if self._limit.tzinfo is None:
                 record_time = record_time.replace(tzinfo=None)
+
             if record_time >= self._limit:
                 while self._limit <= record_time:
                     self._limit = self._step_forward(self._limit)
@@ -75,25 +155,280 @@ class Rotation:


 class FileSink:
-
-    def __init__(self, path, *, rotation=None, retention=None, compression=
-        None, delay=False, watch=False, mode='a', buffering=1, encoding=
-        'utf8', **kwargs):
+    def __init__(
+        self,
+        path,
+        *,
+        rotation=None,
+        retention=None,
+        compression=None,
+        delay=False,
+        watch=False,
+        mode="a",
+        buffering=1,
+        encoding="utf8",
+        **kwargs
+    ):
         self.encoding = encoding
-        self._kwargs = {**kwargs, 'mode': mode, 'buffering': buffering,
-            'encoding': self.encoding}
+
+        self._kwargs = {**kwargs, "mode": mode, "buffering": buffering, "encoding": self.encoding}
         self._path = str(path)
+
         self._glob_patterns = self._make_glob_patterns(self._path)
         self._rotation_function = self._make_rotation_function(rotation)
         self._retention_function = self._make_retention_function(retention)
-        self._compression_function = self._make_compression_function(
-            compression)
+        self._compression_function = self._make_compression_function(compression)
+
         self._file = None
         self._file_path = None
+
         self._watch = watch
         self._file_dev = -1
         self._file_ino = -1
+
         if not delay:
             path = self._create_path()
             self._create_dirs(path)
             self._create_file(path)
+
+    def write(self, message):
+        if self._file is None:
+            path = self._create_path()
+            self._create_dirs(path)
+            self._create_file(path)
+
+        if self._watch:
+            self._reopen_if_needed()
+
+        if self._rotation_function is not None and self._rotation_function(message, self._file):
+            self._terminate_file(is_rotating=True)
+
+        self._file.write(message)
+
+    def stop(self):
+        if self._watch:
+            self._reopen_if_needed()
+
+        self._terminate_file(is_rotating=False)
+
+    def tasks_to_complete(self):
+        return []
+
+    def _create_path(self):
+        path = self._path.format_map({"time": FileDateFormatter()})
+        return os.path.abspath(path)
+
+    def _create_dirs(self, path):
+        dirname = os.path.dirname(path)
+        os.makedirs(dirname, exist_ok=True)
+
+    def _create_file(self, path):
+        self._file = open(path, **self._kwargs)
+        self._file_path = path
+
+        if self._watch:
+            fileno = self._file.fileno()
+            result = os.fstat(fileno)
+            self._file_dev = result[ST_DEV]
+            self._file_ino = result[ST_INO]
+
+    def _close_file(self):
+        self._file.flush()
+        self._file.close()
+
+        self._file = None
+        self._file_path = None
+        self._file_dev = -1
+        self._file_ino = -1
+
+    def _reopen_if_needed(self):
+        # Implemented based on standard library:
+        # https://github.com/python/cpython/blob/cb589d1b/Lib/logging/handlers.py#L486
+        if not self._file:
+            return
+
+        filepath = self._file_path
+
+        try:
+            result = os.stat(filepath)
+        except FileNotFoundError:
+            result = None
+
+        if not result or result[ST_DEV] != self._file_dev or result[ST_INO] != self._file_ino:
+            self._close_file()
+            self._create_dirs(filepath)
+            self._create_file(filepath)
+
+    def _terminate_file(self, *, is_rotating=False):
+        old_path = self._file_path
+
+        if self._file is not None:
+            self._close_file()
+
+        if is_rotating:
+            new_path = self._create_path()
+            self._create_dirs(new_path)
+
+            if new_path == old_path:
+                creation_time = get_ctime(old_path)
+                root, ext = os.path.splitext(old_path)
+                renamed_path = generate_rename_path(root, ext, creation_time)
+                os.rename(old_path, renamed_path)
+                old_path = renamed_path
+
+        if is_rotating or self._rotation_function is None:
+            if self._compression_function is not None and old_path is not None:
+                self._compression_function(old_path)
+
+            if self._retention_function is not None:
+                logs = {
+                    file
+                    for pattern in self._glob_patterns
+                    for file in glob.glob(pattern)
+                    if os.path.isfile(file)
+                }
+                self._retention_function(list(logs))
+
+        if is_rotating:
+            self._create_file(new_path)
+            set_ctime(new_path, datetime.datetime.now().timestamp())
+
+    @staticmethod
+    def _make_glob_patterns(path):
+        formatter = string.Formatter()
+        tokens = formatter.parse(path)
+        escaped = "".join(glob.escape(text) + "*" * (name is not None) for text, name, *_ in tokens)
+
+        root, ext = os.path.splitext(escaped)
+
+        if not ext:
+            return [escaped, escaped + ".*"]
+
+        return [escaped, escaped + ".*", root + ".*" + ext, root + ".*" + ext + ".*"]
+
+    @staticmethod
+    def _make_rotation_function(rotation):
+        if rotation is None:
+            return None
+        elif isinstance(rotation, str):
+            size = string_parsers.parse_size(rotation)
+            if size is not None:
+                return FileSink._make_rotation_function(size)
+            interval = string_parsers.parse_duration(rotation)
+            if interval is not None:
+                return FileSink._make_rotation_function(interval)
+            frequency = string_parsers.parse_frequency(rotation)
+            if frequency is not None:
+                return Rotation.RotationTime(frequency)
+            daytime = string_parsers.parse_daytime(rotation)
+            if daytime is not None:
+                day, time = daytime
+                if day is None:
+                    return FileSink._make_rotation_function(time)
+                if time is None:
+                    time = datetime.time(0, 0, 0)
+                step_forward = partial(Rotation.forward_weekday, weekday=day)
+                return Rotation.RotationTime(step_forward, time)
+            raise ValueError("Cannot parse rotation from: '%s'" % rotation)
+        elif isinstance(rotation, (numbers.Real, decimal.Decimal)):
+            return partial(Rotation.rotation_size, size_limit=rotation)
+        elif isinstance(rotation, datetime.time):
+            return Rotation.RotationTime(Rotation.forward_day, rotation)
+        elif isinstance(rotation, datetime.timedelta):
+            step_forward = partial(Rotation.forward_interval, interval=rotation)
+            return Rotation.RotationTime(step_forward)
+        elif callable(rotation):
+            return rotation
+        else:
+            raise TypeError(
+                "Cannot infer rotation for objects of type: '%s'" % type(rotation).__name__
+            )
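The numeric branch above delegates to `Rotation.rotation_size`, which is defined outside this hunk. As a minimal sketch (hypothetical standalone reimplementation, assuming the sink passes the formatted message and the open file object), a size-based rotation predicate answers "would writing this message push the file past the limit?":

```python
import io

def rotation_size(message, file, *, size_limit):
    # Seek to the end of the file, then check whether appending
    # this message would exceed the configured size limit.
    file.seek(0, 2)  # whence=2 means "relative to end of file"
    return file.tell() + len(message) > size_limit

f = io.StringIO("x" * 90)
print(rotation_size("y" * 5, f, size_limit=100))   # still under the limit
print(rotation_size("y" * 20, f, size_limit=100))  # would exceed it
```

The predicate is stateless, which is why the real code can build it with `functools.partial` and call it once per message.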
+
+    @staticmethod
+    def _make_retention_function(retention):
+        if retention is None:
+            return None
+        elif isinstance(retention, str):
+            interval = string_parsers.parse_duration(retention)
+            if interval is None:
+                raise ValueError("Cannot parse retention from: '%s'" % retention)
+            return FileSink._make_retention_function(interval)
+        elif isinstance(retention, int):
+            return partial(Retention.retention_count, number=retention)
+        elif isinstance(retention, datetime.timedelta):
+            return partial(Retention.retention_age, seconds=retention.total_seconds())
+        elif callable(retention):
+            return retention
+        else:
+            raise TypeError(
+                "Cannot infer retention for objects of type: '%s'" % type(retention).__name__
+            )
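`Retention.retention_count` is referenced here but defined elsewhere. As a hedged sketch (hypothetical standalone version, not the library's exact implementation), a count-based retention function keeps the N most recently modified files and deletes the surplus:

```python
import os
import tempfile

def retention_count(logs, *, number):
    # Sort newest-first by mtime (ties broken by path) and delete the surplus.
    def key(path):
        return (-os.path.getmtime(path), path)
    for path in sorted(logs, key=key)[number:]:
        os.remove(path)

# Demo: create three files with increasing mtimes, keep only the two newest.
tmp = tempfile.mkdtemp()
paths = []
for i, mtime in enumerate([100, 200, 300]):
    p = os.path.join(tmp, "app.%d.log" % i)
    open(p, "w").close()
    os.utime(p, (mtime, mtime))
    paths.append(p)
retention_count(paths, number=2)
print(sorted(os.path.basename(p) for p in paths if os.path.exists(p)))
```

The real function receives the file list collected from the glob patterns in `_terminate_file`.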
+
+    @staticmethod
+    def _make_compression_function(compression):
+        if compression is None:
+            return None
+        elif isinstance(compression, str):
+            ext = compression.strip().lstrip(".")
+
+            if ext == "gz":
+                import gzip
+
+                compress = partial(Compression.copy_compress, opener=gzip.open, mode="wb")
+            elif ext == "bz2":
+                import bz2
+
+                compress = partial(Compression.copy_compress, opener=bz2.open, mode="wb")
+            elif ext == "xz":
+                import lzma
+
+                compress = partial(
+                    Compression.copy_compress, opener=lzma.open, mode="wb", format=lzma.FORMAT_XZ
+                )
+            elif ext == "lzma":
+                import lzma
+
+                compress = partial(
+                    Compression.copy_compress, opener=lzma.open, mode="wb", format=lzma.FORMAT_ALONE
+                )
+            elif ext == "tar":
+                import tarfile
+
+                compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:")
+            elif ext == "tar.gz":
+                import gzip
+                import tarfile
+
+                compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:gz")
+            elif ext == "tar.bz2":
+                import bz2
+                import tarfile
+
+                compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:bz2")
+            elif ext == "tar.xz":
+                import lzma
+                import tarfile
+
+                compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:xz")
+            elif ext == "zip":
+                import zipfile
+
+                compress = partial(
+                    Compression.write_compress,
+                    opener=zipfile.ZipFile,
+                    mode="w",
+                    compression=zipfile.ZIP_DEFLATED,
+                )
+            else:
+                raise ValueError("Invalid compression format: '%s'" % ext)
+
+            return partial(Compression.compression, ext="." + ext, compress_function=compress)
+        elif callable(compression):
+            return compression
+        else:
+            raise TypeError(
+                "Cannot infer compression for objects of type: '%s'" % type(compression).__name__
+            )
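`_make_glob_patterns` above turns the user's path template into the glob patterns later consumed by retention. The transformation can be exercised in isolation (a standalone copy of the static method, same logic):

```python
import glob
import os
import string

def make_glob_patterns(path):
    # Each "{...}" placeholder becomes "*"; literal text is glob-escaped
    # so characters like "[" in the path are matched verbatim.
    tokens = string.Formatter().parse(path)
    escaped = "".join(glob.escape(text) + "*" * (name is not None) for text, name, *_ in tokens)
    root, ext = os.path.splitext(escaped)
    if not ext:
        return [escaped, escaped + ".*"]
    # Also match rotated/compressed variants inserted before the extension.
    return [escaped, escaped + ".*", root + ".*" + ext, root + ".*" + ext + ".*"]

print(make_glob_patterns("file_{time}.log"))
```

The extra `root + ".*" + ext` forms exist so that renamed rotations like `file_x.2023-01-01.log` and compressed ones like `file_x.log.gz` are both picked up.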
diff --git a/loguru/_filters.py b/loguru/_filters.py
index e69de29..a192b26 100644
--- a/loguru/_filters.py
+++ b/loguru/_filters.py
@@ -0,0 +1,24 @@
+def filter_none(record):
+    return record["name"] is not None
+
+
+def filter_by_name(record, parent, length):
+    name = record["name"]
+    if name is None:
+        return False
+    return (name + ".")[:length] == parent
+
+
+def filter_by_level(record, level_per_module):
+    name = record["name"]
+
+    while True:
+        level = level_per_module.get(name, None)
+        if level is False:
+            return False
+        if level is not None:
+            return record["level"].no >= level
+        if not name:
+            return True
+        index = name.rfind(".")
+        name = name[:index] if index != -1 else ""
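`filter_by_level` walks up the dotted module hierarchy until it finds a configured entry, so a setting on a parent package applies to all its submodules. Its behavior can be checked with a minimal record stand-in (the function body is copied verbatim; the `Level` namedtuple is a test stub):

```python
from collections import namedtuple

Level = namedtuple("Level", ["no"])

def filter_by_level(record, level_per_module):
    name = record["name"]
    while True:
        level = level_per_module.get(name, None)
        if level is False:
            return False
        if level is not None:
            return record["level"].no >= level
        if not name:
            return True
        index = name.rfind(".")
        name = name[:index] if index != -1 else ""

record = {"name": "pkg.mod.sub", "level": Level(no=20)}
print(filter_by_level(record, {"pkg.mod": 10}))  # parent threshold applies
print(filter_by_level(record, {"pkg": False}))   # whole package disabled
```

`False` acts as an explicit "disabled" marker, distinct from a numeric threshold of `0`.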
diff --git a/loguru/_get_frame.py b/loguru/_get_frame.py
index 2acdfbf..ee6eaab 100644
--- a/loguru/_get_frame.py
+++ b/loguru/_get_frame.py
@@ -1,3 +1,23 @@
 import sys
 from sys import exc_info
+
+
+def get_frame_fallback(n):
+    try:
+        raise Exception
+    except Exception:
+        frame = exc_info()[2].tb_frame.f_back
+        for _ in range(n):
+            frame = frame.f_back
+        return frame
+
+
+def load_get_frame_function():
+    if hasattr(sys, "_getframe"):
+        get_frame = sys._getframe
+    else:
+        get_frame = get_frame_fallback
+    return get_frame
+
+
 get_frame = load_get_frame_function()
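The fallback raises and immediately catches an exception purely to obtain a traceback, from which the current frame can be reached when `sys._getframe` is unavailable. A quick standalone check that depth 0 resolves to the caller's frame:

```python
from sys import exc_info

def get_frame_fallback(n):
    try:
        raise Exception
    except Exception:
        # tb_frame is the frame where the raise happened (this function),
        # so f_back is the frame of our caller; each extra step goes up one level.
        frame = exc_info()[2].tb_frame.f_back
        for _ in range(n):
            frame = frame.f_back
        return frame

def caller():
    # Depth 0 is the frame that called get_frame_fallback, i.e. this one.
    return get_frame_fallback(0).f_code.co_name

print(caller())
```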
diff --git a/loguru/_handler.py b/loguru/_handler.py
index cbb5940..81a3dca 100644
--- a/loguru/_handler.py
+++ b/loguru/_handler.py
@@ -5,20 +5,48 @@ import os
 import threading
 from contextlib import contextmanager
 from threading import Thread
+
 from ._colorizer import Colorizer
 from ._locks_machinery import create_handler_lock


+def prepare_colored_format(format_, ansi_level):
+    colored = Colorizer.prepare_format(format_)
+    return colored, colored.colorize(ansi_level)
+
+
+def prepare_stripped_format(format_):
+    colored = Colorizer.prepare_format(format_)
+    return colored.strip()
+
+
+def memoize(function):
+    return functools.lru_cache(maxsize=64)(function)
+
+
 class Message(str):
-    __slots__ = 'record',
+    __slots__ = ("record",)


 class Handler:
-
-    def __init__(self, *, sink, name, levelno, formatter,
-        is_formatter_dynamic, filter_, colorize, serialize, enqueue,
-        multiprocessing_context, error_interceptor, exception_formatter,
-        id_, levels_ansi_codes):
+    def __init__(
+        self,
+        *,
+        sink,
+        name,
+        levelno,
+        formatter,
+        is_formatter_dynamic,
+        filter_,
+        colorize,
+        serialize,
+        enqueue,
+        multiprocessing_context,
+        error_interceptor,
+        exception_formatter,
+        id_,
+        levels_ansi_codes
+    ):
         self._name = name
         self._sink = sink
         self._levelno = levelno
@@ -32,10 +60,12 @@ class Handler:
         self._error_interceptor = error_interceptor
         self._exception_formatter = exception_formatter
         self._id = id_
-        self._levels_ansi_codes = levels_ansi_codes
+        self._levels_ansi_codes = levels_ansi_codes  # Warning, reference shared among handlers
+
         self._decolorized_format = None
         self._precolorized_formats = {}
         self._memoize_dynamic_format = None
+
         self._stopped = False
         self._lock = create_handler_lock()
         self._lock_acquired = threading.local()
@@ -45,16 +75,19 @@ class Handler:
         self._confirmation_lock = None
         self._owner_process_pid = None
         self._thread = None
+
         if self._is_formatter_dynamic:
             if self._colorize:
                 self._memoize_dynamic_format = memoize(prepare_colored_format)
             else:
                 self._memoize_dynamic_format = memoize(prepare_stripped_format)
-        elif self._colorize:
-            for level_name in self._levels_ansi_codes:
-                self.update_format(level_name)
         else:
-            self._decolorized_format = self._formatter.strip()
+            if self._colorize:
+                for level_name in self._levels_ansi_codes:
+                    self.update_format(level_name)
+            else:
+                self._decolorized_format = self._formatter.strip()
+
         if self._enqueue:
             if self._multiprocessing_context is None:
                 self._queue = multiprocessing.SimpleQueue()
@@ -62,34 +95,237 @@ class Handler:
                 self._confirmation_lock = multiprocessing.Lock()
             else:
                 self._queue = self._multiprocessing_context.SimpleQueue()
-                self._confirmation_event = self._multiprocessing_context.Event(
-                    )
+                self._confirmation_event = self._multiprocessing_context.Event()
                 self._confirmation_lock = self._multiprocessing_context.Lock()
             self._queue_lock = create_handler_lock()
             self._owner_process_pid = os.getpid()
-            self._thread = Thread(target=self._queued_writer, daemon=True,
-                name='loguru-writer-%d' % self._id)
+            self._thread = Thread(
+                target=self._queued_writer, daemon=True, name="loguru-writer-%d" % self._id
+            )
             self._thread.start()

     def __repr__(self):
-        return '(id=%d, level=%d, sink=%s)' % (self._id, self._levelno,
-            self._name)
+        return "(id=%d, level=%d, sink=%s)" % (self._id, self._levelno, self._name)

     @contextmanager
     def _protected_lock(self):
         """Acquire the lock, but fail fast if its already acquired by the current thread."""
-        pass
+        if getattr(self._lock_acquired, "acquired", False):
+            raise RuntimeError(
+                "Could not acquire internal lock because it was already in use (deadlock avoided). "
+                "This likely happened because the logger was re-used inside a sink, a signal "
+                "handler or a '__del__' method. This is not permitted because the logger and its "
+                "handlers are not re-entrant."
+            )
+        self._lock_acquired.acquired = True
+        try:
+            with self._lock:
+                yield
+        finally:
+            self._lock_acquired.acquired = False
+
+    def emit(self, record, level_id, from_decorator, is_raw, colored_message):
+        try:
+            if self._levelno > record["level"].no:
+                return
+
+            if self._filter is not None:
+                if not self._filter(record):
+                    return
+
+            if self._is_formatter_dynamic:
+                dynamic_format = self._formatter(record)
+
+            formatter_record = record.copy()
+
+            if not record["exception"]:
+                formatter_record["exception"] = ""
+            else:
+                type_, value, tb = record["exception"]
+                formatter = self._exception_formatter
+                lines = formatter.format_exception(type_, value, tb, from_decorator=from_decorator)
+                formatter_record["exception"] = "".join(lines)
+
+            if colored_message is not None and colored_message.stripped != record["message"]:
+                colored_message = None
+
+            if is_raw:
+                if colored_message is None or not self._colorize:
+                    formatted = record["message"]
+                else:
+                    ansi_level = self._levels_ansi_codes[level_id]
+                    formatted = colored_message.colorize(ansi_level)
+            elif self._is_formatter_dynamic:
+                if not self._colorize:
+                    precomputed_format = self._memoize_dynamic_format(dynamic_format)
+                    formatted = precomputed_format.format_map(formatter_record)
+                elif colored_message is None:
+                    ansi_level = self._levels_ansi_codes[level_id]
+                    _, precomputed_format = self._memoize_dynamic_format(dynamic_format, ansi_level)
+                    formatted = precomputed_format.format_map(formatter_record)
+                else:
+                    ansi_level = self._levels_ansi_codes[level_id]
+                    formatter, precomputed_format = self._memoize_dynamic_format(
+                        dynamic_format, ansi_level
+                    )
+                    coloring_message = formatter.make_coloring_message(
+                        record["message"], ansi_level=ansi_level, colored_message=colored_message
+                    )
+                    formatter_record["message"] = coloring_message
+                    formatted = precomputed_format.format_map(formatter_record)
+
+            else:
+                if not self._colorize:
+                    precomputed_format = self._decolorized_format
+                    formatted = precomputed_format.format_map(formatter_record)
+                elif colored_message is None:
+                    ansi_level = self._levels_ansi_codes[level_id]
+                    precomputed_format = self._precolorized_formats[level_id]
+                    formatted = precomputed_format.format_map(formatter_record)
+                else:
+                    ansi_level = self._levels_ansi_codes[level_id]
+                    precomputed_format = self._precolorized_formats[level_id]
+                    coloring_message = self._formatter.make_coloring_message(
+                        record["message"], ansi_level=ansi_level, colored_message=colored_message
+                    )
+                    formatter_record["message"] = coloring_message
+                    formatted = precomputed_format.format_map(formatter_record)
+
+            if self._serialize:
+                formatted = self._serialize_record(formatted, record)
+
+            str_record = Message(formatted)
+            str_record.record = record
+
+            with self._protected_lock():
+                if self._stopped:
+                    return
+                if self._enqueue:
+                    self._queue.put(str_record)
+                else:
+                    self._sink.write(str_record)
+        except Exception:
+            if not self._error_interceptor.should_catch():
+                raise
+            self._error_interceptor.print(record)
+
+    def stop(self):
+        with self._protected_lock():
+            self._stopped = True
+            if self._enqueue:
+                if self._owner_process_pid != os.getpid():
+                    return
+                self._queue.put(None)
+                self._thread.join()
+                if hasattr(self._queue, "close"):
+                    self._queue.close()
+
+            self._sink.stop()
+
+    def complete_queue(self):
+        if not self._enqueue:
+            return
+
+        with self._confirmation_lock:
+            self._queue.put(True)
+            self._confirmation_event.wait()
+            self._confirmation_event.clear()
+
+    def tasks_to_complete(self):
+        if self._enqueue and self._owner_process_pid != os.getpid():
+            return []
+        lock = self._queue_lock if self._enqueue else self._protected_lock()
+        with lock:
+            return self._sink.tasks_to_complete()
+
+    def update_format(self, level_id):
+        if not self._colorize or self._is_formatter_dynamic:
+            return
+        ansi_code = self._levels_ansi_codes[level_id]
+        self._precolorized_formats[level_id] = self._formatter.colorize(ansi_code)
+
+    @property
+    def levelno(self):
+        return self._levelno
+
+    @staticmethod
+    def _serialize_record(text, record):
+        exception = record["exception"]
+
+        if exception is not None:
+            exception = {
+                "type": None if exception.type is None else exception.type.__name__,
+                "value": exception.value,
+                "traceback": bool(exception.traceback),
+            }
+
+        serializable = {
+            "text": text,
+            "record": {
+                "elapsed": {
+                    "repr": record["elapsed"],
+                    "seconds": record["elapsed"].total_seconds(),
+                },
+                "exception": exception,
+                "extra": record["extra"],
+                "file": {"name": record["file"].name, "path": record["file"].path},
+                "function": record["function"],
+                "level": {
+                    "icon": record["level"].icon,
+                    "name": record["level"].name,
+                    "no": record["level"].no,
+                },
+                "line": record["line"],
+                "message": record["message"],
+                "module": record["module"],
+                "name": record["name"],
+                "process": {"id": record["process"].id, "name": record["process"].name},
+                "thread": {"id": record["thread"].id, "name": record["thread"].name},
+                "time": {"repr": record["time"], "timestamp": record["time"].timestamp()},
+            },
+        }
+
+        return json.dumps(serializable, default=str, ensure_ascii=False) + "\n"
+
+    def _queued_writer(self):
+        message = None
+        queue = self._queue
+
+        # We need to use a lock to protect the sink during a fork.
+        # In particular, writing to stderr may lead to a deadlock in the child process.
+        lock = self._queue_lock
+
+        while True:
+            try:
+                message = queue.get()
+            except Exception:
+                with lock:
+                    self._error_interceptor.print(None)
+                continue
+
+            if message is None:
+                break
+
+            if message is True:
+                self._confirmation_event.set()
+                continue
+
+            with lock:
+                try:
+                    self._sink.write(message)
+                except Exception:
+                    self._error_interceptor.print(message.record)

     def __getstate__(self):
         state = self.__dict__.copy()
-        state['_lock'] = None
-        state['_lock_acquired'] = None
-        state['_memoize_dynamic_format'] = None
+        state["_lock"] = None
+        state["_lock_acquired"] = None
+        state["_memoize_dynamic_format"] = None
         if self._enqueue:
-            state['_sink'] = None
-            state['_thread'] = None
-            state['_owner_process'] = None
-            state['_queue_lock'] = None
+            state["_sink"] = None
+            state["_thread"] = None
+            state["_owner_process"] = None
+            state["_queue_lock"] = None
         return state

     def __setstate__(self, state):
diff --git a/loguru/_locks_machinery.py b/loguru/_locks_machinery.py
index a65aee1..6f02110 100644
--- a/loguru/_locks_machinery.py
+++ b/loguru/_locks_machinery.py
@@ -1,9 +1,50 @@
 import os
 import threading
 import weakref
-if not hasattr(os, 'register_at_fork'):
+
+if not hasattr(os, "register_at_fork"):
+
+    def create_logger_lock():
+        return threading.Lock()
+
+    def create_handler_lock():
+        return threading.Lock()
+
 else:
+    # While forking, we need to sanitize all locks to make sure the child process doesn't run into
+    # a deadlock (if an already-acquired lock is inherited) and to protect sinks from a corrupted
+    # state. It's very important to acquire logger locks before handler ones, to prevent a possible
+    # deadlock while 'remove()' is called, for example.
+
     logger_locks = weakref.WeakSet()
     handler_locks = weakref.WeakSet()
-    os.register_at_fork(before=acquire_locks, after_in_parent=release_locks,
-        after_in_child=release_locks)
+
+    def acquire_locks():
+        for lock in logger_locks:
+            lock.acquire()
+
+        for lock in handler_locks:
+            lock.acquire()
+
+    def release_locks():
+        for lock in logger_locks:
+            lock.release()
+
+        for lock in handler_locks:
+            lock.release()
+
+    os.register_at_fork(
+        before=acquire_locks,
+        after_in_parent=release_locks,
+        after_in_child=release_locks,
+    )
+
+    def create_logger_lock():
+        lock = threading.Lock()
+        logger_locks.add(lock)
+        return lock
+
+    def create_handler_lock():
+        lock = threading.Lock()
+        handler_locks.add(lock)
+        return lock
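The factory pattern above registers every lock in a `weakref.WeakSet` so that locks belonging to garbage-collected handlers drop out of the fork-time acquire/release sweep automatically. A small standalone demonstration of the registry shrinking on its own:

```python
import gc
import threading
import weakref

handler_locks = weakref.WeakSet()

def create_handler_lock():
    # Every lock handed out is tracked weakly, so the registry never
    # keeps a lock alive on its own.
    lock = threading.Lock()
    handler_locks.add(lock)
    return lock

lock = create_handler_lock()
print(len(handler_locks))  # the lock is tracked while referenced
del lock
gc.collect()
print(len(handler_locks))  # and dropped from the set once collected
```

This is why `acquire_locks`/`release_locks` can iterate the sets directly: only locks of still-live loggers and handlers remain.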
diff --git a/loguru/_logger.py b/loguru/_logger.py
index 8812d5c..f750967 100644
--- a/loguru/_logger.py
+++ b/loguru/_logger.py
@@ -95,6 +95,7 @@ from multiprocessing import current_process, get_context
 from multiprocessing.context import BaseContext
 from os.path import basename, splitext
 from threading import current_thread
+
 from . import _asyncio_loop, _colorama, _defaults, _filters
 from ._better_exceptions import ExceptionFormatter
 from ._colorizer import Colorizer
@@ -107,49 +108,95 @@ from ._handler import Handler
 from ._locks_machinery import create_logger_lock
 from ._recattrs import RecordException, RecordFile, RecordLevel, RecordProcess, RecordThread
 from ._simple_sinks import AsyncSink, CallableSink, StandardSink, StreamSink
+
 if sys.version_info >= (3, 6):
     from os import PathLike
 else:
     from pathlib import PurePath as PathLike
-Level = namedtuple('Level', ['name', 'no', 'color', 'icon'])
+
+
+Level = namedtuple("Level", ["name", "no", "color", "icon"])
+
 start_time = aware_now()
-context = ContextVar('loguru_context', default={})

+context = ContextVar("loguru_context", default={})

-class Core:

+class Core:
     def __init__(self):
-        levels = [Level('TRACE', _defaults.LOGURU_TRACE_NO, _defaults.
-            LOGURU_TRACE_COLOR, _defaults.LOGURU_TRACE_ICON), Level('DEBUG',
-            _defaults.LOGURU_DEBUG_NO, _defaults.LOGURU_DEBUG_COLOR,
-            _defaults.LOGURU_DEBUG_ICON), Level('INFO', _defaults.
-            LOGURU_INFO_NO, _defaults.LOGURU_INFO_COLOR, _defaults.
-            LOGURU_INFO_ICON), Level('SUCCESS', _defaults.LOGURU_SUCCESS_NO,
-            _defaults.LOGURU_SUCCESS_COLOR, _defaults.LOGURU_SUCCESS_ICON),
-            Level('WARNING', _defaults.LOGURU_WARNING_NO, _defaults.
-            LOGURU_WARNING_COLOR, _defaults.LOGURU_WARNING_ICON), Level(
-            'ERROR', _defaults.LOGURU_ERROR_NO, _defaults.
-            LOGURU_ERROR_COLOR, _defaults.LOGURU_ERROR_ICON), Level(
-            'CRITICAL', _defaults.LOGURU_CRITICAL_NO, _defaults.
-            LOGURU_CRITICAL_COLOR, _defaults.LOGURU_CRITICAL_ICON)]
+        levels = [
+            Level(
+                "TRACE",
+                _defaults.LOGURU_TRACE_NO,
+                _defaults.LOGURU_TRACE_COLOR,
+                _defaults.LOGURU_TRACE_ICON,
+            ),
+            Level(
+                "DEBUG",
+                _defaults.LOGURU_DEBUG_NO,
+                _defaults.LOGURU_DEBUG_COLOR,
+                _defaults.LOGURU_DEBUG_ICON,
+            ),
+            Level(
+                "INFO",
+                _defaults.LOGURU_INFO_NO,
+                _defaults.LOGURU_INFO_COLOR,
+                _defaults.LOGURU_INFO_ICON,
+            ),
+            Level(
+                "SUCCESS",
+                _defaults.LOGURU_SUCCESS_NO,
+                _defaults.LOGURU_SUCCESS_COLOR,
+                _defaults.LOGURU_SUCCESS_ICON,
+            ),
+            Level(
+                "WARNING",
+                _defaults.LOGURU_WARNING_NO,
+                _defaults.LOGURU_WARNING_COLOR,
+                _defaults.LOGURU_WARNING_ICON,
+            ),
+            Level(
+                "ERROR",
+                _defaults.LOGURU_ERROR_NO,
+                _defaults.LOGURU_ERROR_COLOR,
+                _defaults.LOGURU_ERROR_ICON,
+            ),
+            Level(
+                "CRITICAL",
+                _defaults.LOGURU_CRITICAL_NO,
+                _defaults.LOGURU_CRITICAL_COLOR,
+                _defaults.LOGURU_CRITICAL_ICON,
+            ),
+        ]
         self.levels = {level.name: level for level in levels}
-        self.levels_ansi_codes = {**{name: Colorizer.ansify(level.color) for
-            name, level in self.levels.items()}, None: ''}
-        self.levels_lookup = {name: (name, name, level.no, level.icon) for 
-            name, level in self.levels.items()}
+        self.levels_ansi_codes = {
+            **{name: Colorizer.ansify(level.color) for name, level in self.levels.items()},
+            None: "",
+        }
+
+        # Cache used internally to quickly access level attributes based on their name or severity.
+        # It can also contain integers as keys; this avoids calling "isinstance()" repeatedly
+        # when "logger.log()" is used.
+        self.levels_lookup = {
+            name: (name, name, level.no, level.icon) for name, level in self.levels.items()
+        }
+
         self.handlers_count = 0
         self.handlers = {}
+
         self.extra = {}
         self.patcher = None
-        self.min_level = float('inf')
+
+        self.min_level = float("inf")
         self.enabled = {}
         self.activation_list = []
         self.activation_none = True
+
         self.lock = create_logger_lock()

     def __getstate__(self):
         state = self.__dict__.copy()
-        state['lock'] = None
+        state["lock"] = None
         return state

     def __setstate__(self, state):
@@ -180,23 +227,30 @@ class Logger:
     You should not instantiate a |Logger| by yourself, use ``from loguru import logger`` instead.
     """

-    def __init__(self, core, exception, depth, record, lazy, colors, raw,
-        capture, patchers, extra):
+    def __init__(self, core, exception, depth, record, lazy, colors, raw, capture, patchers, extra):
         self._core = core
-        self._options = (exception, depth, record, lazy, colors, raw,
-            capture, patchers, extra)
+        self._options = (exception, depth, record, lazy, colors, raw, capture, patchers, extra)

     def __repr__(self):
-        return '<loguru.logger handlers=%r>' % list(self._core.handlers.
-            values())
-
-    def add(self, sink, *, level=_defaults.LOGURU_LEVEL, format=_defaults.
-        LOGURU_FORMAT, filter=_defaults.LOGURU_FILTER, colorize=_defaults.
-        LOGURU_COLORIZE, serialize=_defaults.LOGURU_SERIALIZE, backtrace=
-        _defaults.LOGURU_BACKTRACE, diagnose=_defaults.LOGURU_DIAGNOSE,
-        enqueue=_defaults.LOGURU_ENQUEUE, context=_defaults.LOGURU_CONTEXT,
-        catch=_defaults.LOGURU_CATCH, **kwargs):
-        """Add a handler sending log messages to a sink adequately configured.
+        return "<loguru.logger handlers=%r>" % list(self._core.handlers.values())
+
+    def add(
+        self,
+        sink,
+        *,
+        level=_defaults.LOGURU_LEVEL,
+        format=_defaults.LOGURU_FORMAT,
+        filter=_defaults.LOGURU_FILTER,
+        colorize=_defaults.LOGURU_COLORIZE,
+        serialize=_defaults.LOGURU_SERIALIZE,
+        backtrace=_defaults.LOGURU_BACKTRACE,
+        diagnose=_defaults.LOGURU_DIAGNOSE,
+        enqueue=_defaults.LOGURU_ENQUEUE,
+        context=_defaults.LOGURU_CONTEXT,
+        catch=_defaults.LOGURU_CATCH,
+        **kwargs
+    ):
+        r"""Add a handler sending log messages to a sink adequately configured.

         Parameters
         ----------
@@ -334,7 +388,7 @@ class Logger:
         If fine-grained control is needed, the ``format`` can also be a function which takes the
         record as a parameter and returns the format template string. However, note that in such a
         case, you should take care of appending the line ending and exception field to the returned
-        format, while ``"\\n{exception}"`` is automatically appended for convenience if ``format`` is
+        format, while ``"\n{exception}"`` is automatically appended for convenience if ``format`` is
         a string.

         The ``filter`` attribute can be used to control which messages are effectively passed to the
@@ -604,9 +658,9 @@ class Logger:

         Tags which are not recognized will raise an exception during parsing, to inform you about
         possible misuse. If you wish to display a markup tag literally, you can escape it by
-        prepending a ``\\`` like for example ``\\<blue>``. If, for some reason, you need to escape a
+        prepending a ``\`` like for example ``\<blue>``. If, for some reason, you need to escape a
         string programmatically, note that the regex used internally to parse markup tags is
-        ``r"\\\\?</?((?:[fb]g\\s)?[^<>\\s]*)>"``.
+        ``r"\\?</?((?:[fb]g\s)?[^<>\s]*)>"``.

         Note that when logging a message with ``opt(colors=True)``, color tags present in the
         formatting arguments (``args`` and ``kwargs``) are completely ignored. This is important if
@@ -722,7 +776,239 @@ class Logger:
         >>> stream_object = RandomStream(seed=12345, threshold=0.25)
         >>> logger.add(stream_object, level="INFO")
         """
-        pass
+        with self._core.lock:
+            handler_id = self._core.handlers_count
+            self._core.handlers_count += 1
+
+        error_interceptor = ErrorInterceptor(catch, handler_id)
+
+        if colorize is None and serialize:
+            colorize = False
+
+        if isinstance(sink, (str, PathLike)):
+            path = sink
+            name = "'%s'" % path
+
+            if colorize is None:
+                colorize = False
+
+            wrapped_sink = FileSink(path, **kwargs)
+            kwargs = {}
+            encoding = wrapped_sink.encoding
+            terminator = "\n"
+            exception_prefix = ""
+        elif hasattr(sink, "write") and callable(sink.write):
+            name = getattr(sink, "name", None) or repr(sink)
+
+            if colorize is None:
+                colorize = _colorama.should_colorize(sink)
+
+            if colorize is True and _colorama.should_wrap(sink):
+                stream = _colorama.wrap(sink)
+            else:
+                stream = sink
+
+            wrapped_sink = StreamSink(stream)
+            encoding = getattr(sink, "encoding", None)
+            terminator = "\n"
+            exception_prefix = ""
+        elif isinstance(sink, logging.Handler):
+            name = repr(sink)
+
+            if colorize is None:
+                colorize = False
+
+            wrapped_sink = StandardSink(sink)
+            encoding = getattr(sink, "encoding", None)
+            terminator = ""
+            exception_prefix = "\n"
+        elif iscoroutinefunction(sink) or iscoroutinefunction(
+            getattr(sink, "__call__", None)  # noqa: B004
+        ):
+            name = getattr(sink, "__name__", None) or repr(sink)
+
+            if colorize is None:
+                colorize = False
+
+            loop = kwargs.pop("loop", None)
+
+            # The worker thread needs an event loop, it can't create a new one internally because it
+            # has to be accessible by the user while calling "complete()", instead we use the global
+            # one when the sink is added. If "enqueue=False" the event loop is dynamically retrieved
+            # at each logging call, which is much more convenient. However, coroutines can't access
+            # the running loop in Python 3.5.2 and earlier versions, see python/asyncio#452.
+            if enqueue and loop is None:
+                try:
+                    loop = _asyncio_loop.get_running_loop()
+                except RuntimeError as e:
+                    raise ValueError(
+                        "An event loop is required to add a coroutine sink with `enqueue=True`, "
+                        "but none has been passed as argument and none is currently running."
+                    ) from e
+
+            coro = sink if iscoroutinefunction(sink) else sink.__call__
+            wrapped_sink = AsyncSink(coro, loop, error_interceptor)
+            encoding = "utf8"
+            terminator = "\n"
+            exception_prefix = ""
+        elif callable(sink):
+            name = getattr(sink, "__name__", None) or repr(sink)
+
+            if colorize is None:
+                colorize = False
+
+            wrapped_sink = CallableSink(sink)
+            encoding = "utf8"
+            terminator = "\n"
+            exception_prefix = ""
+        else:
+            raise TypeError("Cannot log to objects of type '%s'" % type(sink).__name__)
+
+        if kwargs:
+            raise TypeError("add() got an unexpected keyword argument '%s'" % next(iter(kwargs)))
+
+        if filter is None:
+            filter_func = None
+        elif filter == "":
+            filter_func = _filters.filter_none
+        elif isinstance(filter, str):
+            parent = filter + "."
+            length = len(parent)
+            filter_func = functools.partial(_filters.filter_by_name, parent=parent, length=length)
+        elif isinstance(filter, dict):
+            level_per_module = {}
+            for module, level_ in filter.items():
+                if module is not None and not isinstance(module, str):
+                    raise TypeError(
+                        "The filter dict contains an invalid module, "
+                        "it should be a string (or None), not: '%s'" % type(module).__name__
+                    )
+                if level_ is False:
+                    levelno_ = False
+                elif level_ is True:
+                    levelno_ = 0
+                elif isinstance(level_, str):
+                    try:
+                        levelno_ = self.level(level_).no
+                    except ValueError:
+                        raise ValueError(
+                            "The filter dict contains a module '%s' associated to a level name "
+                            "which does not exist: '%s'" % (module, level_)
+                        ) from None
+                elif isinstance(level_, int):
+                    levelno_ = level_
+                else:
+                    raise TypeError(
+                        "The filter dict contains a module '%s' associated to an invalid level, "
+                        "it should be an integer, a string or a boolean, not: '%s'"
+                        % (module, type(level_).__name__)
+                    )
+                if levelno_ < 0:
+                    raise ValueError(
+                        "The filter dict contains a module '%s' associated to an invalid level, "
+                        "it should be a positive integer, not: '%d'" % (module, levelno_)
+                    )
+                level_per_module[module] = levelno_
+            filter_func = functools.partial(
+                _filters.filter_by_level, level_per_module=level_per_module
+            )
+        elif callable(filter):
+            if filter == builtins.filter:
+                raise ValueError(
+                    "The built-in 'filter()' function cannot be used as a 'filter' parameter, "
+                    "this is most likely a mistake (please double-check the arguments passed "
+                    "to 'logger.add()')."
+                )
+            filter_func = filter
+        else:
+            raise TypeError(
+                "Invalid filter, it should be a function, a string or a dict, not: '%s'"
+                % type(filter).__name__
+            )
+
+        if isinstance(level, str):
+            levelno = self.level(level).no
+        elif isinstance(level, int):
+            levelno = level
+        else:
+            raise TypeError(
+                "Invalid level, it should be an integer or a string, not: '%s'"
+                % type(level).__name__
+            )
+
+        if levelno < 0:
+            raise ValueError(
+                "Invalid level value, it should be a positive integer, not: %d" % levelno
+            )
+
+        if isinstance(format, str):
+            try:
+                formatter = Colorizer.prepare_format(format + terminator + "{exception}")
+            except ValueError as e:
+                raise ValueError(
+                    "Invalid format, color markups could not be parsed correctly"
+                ) from e
+            is_formatter_dynamic = False
+        elif callable(format):
+            if format == builtins.format:
+                raise ValueError(
+                    "The built-in 'format()' function cannot be used as a 'format' parameter, "
+                    "this is most likely a mistake (please double-check the arguments passed "
+                    "to 'logger.add()')."
+                )
+            formatter = format
+            is_formatter_dynamic = True
+        else:
+            raise TypeError(
+                "Invalid format, it should be a string or a function, not: '%s'"
+                % type(format).__name__
+            )
+
+        if not isinstance(encoding, str):
+            encoding = "ascii"
+
+        if isinstance(context, str):
+            context = get_context(context)
+        elif context is not None and not isinstance(context, BaseContext):
+            raise TypeError(
+                "Invalid context, it should be a string or a multiprocessing context, "
+                "not: '%s'" % type(context).__name__
+            )
+
+        with self._core.lock:
+            exception_formatter = ExceptionFormatter(
+                colorize=colorize,
+                encoding=encoding,
+                diagnose=diagnose,
+                backtrace=backtrace,
+                hidden_frames_filename=self.catch.__code__.co_filename,
+                prefix=exception_prefix,
+            )
+
+            handler = Handler(
+                name=name,
+                sink=wrapped_sink,
+                levelno=levelno,
+                formatter=formatter,
+                is_formatter_dynamic=is_formatter_dynamic,
+                filter_=filter_func,
+                colorize=colorize,
+                serialize=serialize,
+                enqueue=enqueue,
+                multiprocessing_context=context,
+                id_=handler_id,
+                error_interceptor=error_interceptor,
+                exception_formatter=exception_formatter,
+                levels_ansi_codes=self._core.levels_ansi_codes,
+            )
+
+            handlers = self._core.handlers.copy()
+            handlers[handler_id] = handler
+
+            self._core.min_level = min(self._core.min_level, levelno)
+            self._core.handlers = handlers
+
+        return handler_id

     def remove(self, handler_id=None):
         """Remove a previously added handler and stop sending logs to its sink.
@@ -746,7 +1032,32 @@ class Logger:
         >>> logger.remove(i)
         >>> logger.info("No longer logging")
         """
-        pass
+        if not (handler_id is None or isinstance(handler_id, int)):
+            raise TypeError(
+                "Invalid handler id, it should be an integer as returned "
+                "by the 'add()' method (or None), not: '%s'" % type(handler_id).__name__
+            )
+
+        with self._core.lock:
+            handlers = self._core.handlers.copy()
+
+            if handler_id is not None and handler_id not in handlers:
+                raise ValueError("There is no existing handler with id %d" % handler_id) from None
+
+            if handler_id is None:
+                handler_ids = list(handlers.keys())
+            else:
+                handler_ids = [handler_id]
+
+            for handler_id in handler_ids:
+                handler = handlers.pop(handler_id)
+
+                # This needs to be done first in case "stop()" raises an exception
+                levelnos = (h.levelno for h in handlers.values())
+                self._core.min_level = min(levelnos, default=float("inf"))
+                self._core.handlers = handlers
+
+                handler.stop()
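Note the ordering in `remove()`: the minimum level and the handlers table are updated before `handler.stop()` runs, so a failing `stop()` cannot leave a dangling handler registered. The recomputation itself is a `min` with an infinite default; a tiny sketch (hypothetical helper taking a mapping of handler ids to level numbers):

```python
def recompute_min_level(levelnos_by_id):
    # With no handlers left, min_level becomes +inf so that every
    # subsequent logging call short-circuits early in _log().
    return min(levelnos_by_id.values(), default=float("inf"))
```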

     def complete(self):
         """Wait for the end of enqueued messages and asynchronous tasks scheduled by handlers.
@@ -798,12 +1109,34 @@ class Logger:
         >>> process.join()
         Message sent from the child
         """
-        pass
-
-    def catch(self, exception=Exception, *, level='ERROR', reraise=False,
-        onerror=None, exclude=None, default=None, message=
-        "An error has been caught in function '{record[function]}', process '{record[process].name}' ({record[process].id}), thread '{record[thread].name}' ({record[thread].id}):"
-        ):
+        tasks = []
+
+        with self._core.lock:
+            handlers = self._core.handlers.copy()
+            for handler in handlers.values():
+                handler.complete_queue()
+                tasks.extend(handler.tasks_to_complete())
+
+        class AwaitableCompleter:
+            def __await__(self):
+                for task in tasks:
+                    yield from task.__await__()
+
+        return AwaitableCompleter()
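`complete()` collects the pending tasks under the lock, then hands back a plain object that does its work only when awaited. The `__await__`-delegation pattern can be reproduced with stock `asyncio` (class and function names below are ours, not loguru's):

```python
import asyncio


class Completer:
    # An ordinary object made awaitable by delegating to each
    # collected task's __await__ generator, as in Logger.complete().
    def __init__(self, tasks):
        self._tasks = tasks

    def __await__(self):
        for task in self._tasks:
            yield from task.__await__()


async def demo():
    async def unit(x):
        await asyncio.sleep(0)
        return x

    tasks = [asyncio.ensure_future(unit(i)) for i in range(3)]
    await Completer(tasks)  # drains every task before returning
    return [t.result() for t in tasks]
```

`yield from task.__await__()` also handles already-finished tasks gracefully: their `__await__` generator simply returns without yielding.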
+
+    def catch(
+        self,
+        exception=Exception,
+        *,
+        level="ERROR",
+        reraise=False,
+        onerror=None,
+        exclude=None,
+        default=None,
+        message="An error has been caught in function '{record[function]}', "
+        "process '{record[process].name}' ({record[process].id}), "
+        "thread '{record[thread].name}' ({record[thread].id}):"
+    ):
         """Return a decorator to automatically log possibly caught error in wrapped function.

         This is useful to ensure unexpected exceptions are logged, the entire program can be
@@ -876,11 +1209,92 @@ class Logger:
         ... def main():
         ...     1 / 0
         """
-        pass
+        if callable(exception) and (
+            not isclass(exception) or not issubclass(exception, BaseException)
+        ):
+            return self.catch()(exception)
+
+        logger = self
+
+        class Catcher:
+            def __init__(self, from_decorator):
+                self._from_decorator = from_decorator
+
+            def __enter__(self):
+                return None
+
+            def __exit__(self, type_, value, traceback_):
+                if type_ is None:
+                    return
+
+                if not issubclass(type_, exception):
+                    return False
+
+                if exclude is not None and issubclass(type_, exclude):
+                    return False
+
+                from_decorator = self._from_decorator
+                _, depth, _, *options = logger._options
+
+                if from_decorator:
+                    depth += 1
+
+                catch_options = [(type_, value, traceback_), depth, True] + options
+                logger._log(level, from_decorator, catch_options, message, (), {})
+
+                if onerror is not None:
+                    onerror(value)
+
+                return not reraise
+
+            def __call__(self, function):
+                if isclass(function):
+                    raise TypeError(
+                        "Invalid object decorated with 'catch()', it must be a function, "
+                        "not a class (tried to wrap '%s')" % function.__name__
+                    )

-    def opt(self, *, exception=None, record=False, lazy=False, colors=False,
-        raw=False, capture=True, depth=0, ansi=False):
-        """Parametrize a logging call to slightly change generated log message.
+                catcher = Catcher(True)
+
+                if iscoroutinefunction(function):
+
+                    async def catch_wrapper(*args, **kwargs):
+                        with catcher:
+                            return await function(*args, **kwargs)
+                        return default
+
+                elif isgeneratorfunction(function):
+
+                    def catch_wrapper(*args, **kwargs):
+                        with catcher:
+                            return (yield from function(*args, **kwargs))
+                        return default
+
+                else:
+
+                    def catch_wrapper(*args, **kwargs):
+                        with catcher:
+                            return function(*args, **kwargs)
+                        return default
+
+                functools.update_wrapper(catch_wrapper, function)
+                return catch_wrapper
+
+        return Catcher(False)
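In every `catch_wrapper` above, the trailing `return default` is not dead code: it runs precisely when `Catcher.__exit__` returned true and swallowed the exception, so the `return` inside the `with` block never completed. A minimal illustration with a hypothetical `Suppress` manager (not part of loguru):

```python
class Suppress:
    # Stand-in for Catcher: swallow ValueError only; a truthy
    # __exit__ return value suppresses the active exception.
    def __enter__(self):
        return None

    def __exit__(self, type_, value, traceback_):
        return type_ is not None and issubclass(type_, ValueError)


def parse_or_default(text, default=0):
    # Same shape as catch_wrapper: the last line runs only when the
    # context manager suppressed the exception raised inside it.
    with Suppress():
        return int(text)
    return default
```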
+
+    def opt(
+        self,
+        *,
+        exception=None,
+        record=False,
+        lazy=False,
+        colors=False,
+        raw=False,
+        capture=True,
+        depth=0,
+        ansi=False
+    ):
+        r"""Parametrize a logging call to slightly change generated log message.

         Note that it's not possible to chain |opt| calls; the last one takes precedence over the
         others as it will "reset" the options to their default values.
@@ -942,7 +1356,7 @@ class Logger:
         >>> logger.opt(colors=True).warning("We got a <red>BIG</red> problem")
         [18:11:30] WARNING in '<module>' - We got a BIG problem

-        >>> logger.opt(raw=True).debug("No formatting\\n")
+        >>> logger.opt(raw=True).debug("No formatting\n")
         No formatting

         >>> logger.opt(capture=False).info("Displayed but not captured: {value}", value=123)
@@ -957,9 +1371,18 @@ class Logger:
         >>> func()
         [18:11:54] DEBUG in 'func' - Get parent context
         """
-        pass
-
-    def bind(__self, **kwargs):
+        if ansi:
+            colors = True
+            warnings.warn(
+                "The 'ansi' parameter is deprecated, please use 'colors' instead",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+
+        args = self._options[-2:]
+        return Logger(self._core, exception, depth, record, lazy, colors, raw, capture, *args)
+
+    def bind(__self, **kwargs):  # noqa: N805
         """Bind attributes to the ``extra`` dict of each logged message record.

         This is used to add custom context to each logging call.
@@ -992,10 +1415,11 @@ class Logger:
         >>> instance_2.call("Second instance")
         127.0.0.1 - Second instance
         """
-        pass
+        *options, extra = __self._options
+        return Logger(__self._core, *options, {**extra, **kwargs})

     @contextlib.contextmanager
-    def contextualize(__self, **kwargs):
+    def contextualize(__self, **kwargs):  # noqa: N805
         """Bind attributes to the context-local ``extra`` dict while inside the ``with`` block.

         Contrary to |bind| there is no ``logger`` returned, the ``extra`` dict is modified in-place
@@ -1029,7 +1453,15 @@ class Logger:
         >>> logger.info("Done.")
         Done. | {}
         """
-        pass
+        with __self._core.lock:
+            new_context = {**context.get(), **kwargs}
+            token = context.set(new_context)
+
+        try:
+            yield
+        finally:
+            with __self._core.lock:
+                context.reset(token)
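`contextualize()` relies on a `contextvars.ContextVar` (the module-level `context`) and on the set/reset token protocol, which restores the previous value even across nested or concurrent uses. A self-contained sketch of the same dance (variable name `ctx` is ours):

```python
from contextlib import contextmanager
from contextvars import ContextVar

ctx = ContextVar("ctx", default={})


@contextmanager
def contextualize(**kwargs):
    # Merge the new keys on top of the current dict, keep the reset
    # token, and restore the previous dict on exit.
    token = ctx.set({**ctx.get(), **kwargs})
    try:
        yield
    finally:
        ctx.reset(token)
```

Inside `with contextualize(user="alice"):` the variable reads `{"user": "alice"}`; after the block it is back to its previous value.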

     def patch(self, patcher):
         """Attach a function to modify the record dict created by each logging call.
@@ -1077,7 +1509,8 @@ class Logger:
         ...     level, message = record["level"], record["message"]
         ...     logger.patch(lambda r: r.update(record)).log(level, message)
         """
-        pass
+        *options, patchers, extra = self._options
+        return Logger(self._core, *options, patchers + [patcher], extra)
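Each `patch()` call returns a new logger whose options carry `patchers + [patcher]`, so patchers accumulate across chained calls and `_log()` later applies them in registration order. That application step reduces to a loop over the record dict (sketch, helper name ours):

```python
def apply_patchers(record, patchers):
    # _log() runs the core patcher first, then each logger-level
    # patcher in the order patch() chained them, mutating in place.
    for patcher in patchers:
        patcher(record)
    return record
```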

     def level(self, name, no=None, color=None, icon=None):
         """Add, update or retrieve a logging level.
@@ -1132,7 +1565,55 @@ class Logger:
         >>> logger.warning("Updated!")
         30 /!\\ Updated!
         """
-        pass
+        if not isinstance(name, str):
+            raise TypeError(
+                "Invalid level name, it should be a string, not: '%s'" % type(name).__name__
+            )
+
+        if no is color is icon is None:
+            try:
+                return self._core.levels[name]
+            except KeyError:
+                raise ValueError("Level '%s' does not exist" % name) from None
+
+        if name not in self._core.levels:
+            if no is None:
+                raise ValueError(
+                    "Level '%s' does not exist, you have to create it by specifying a level no"
+                    % name
+                )
+            else:
+                old_color, old_icon = "", " "
+        elif no is not None:
+            raise TypeError("Level '%s' already exists, you can't update its severity no" % name)
+        else:
+            _, no, old_color, old_icon = self.level(name)
+
+        if color is None:
+            color = old_color
+
+        if icon is None:
+            icon = old_icon
+
+        if not isinstance(no, int):
+            raise TypeError(
+                "Invalid level no, it should be an integer, not: '%s'" % type(no).__name__
+            )
+
+        if no < 0:
+            raise ValueError("Invalid level no, it should be a positive integer, not: %d" % no)
+
+        ansi = Colorizer.ansify(color)
+        level = Level(name, no, color, icon)
+
+        with self._core.lock:
+            self._core.levels[name] = level
+            self._core.levels_ansi_codes[name] = ansi
+            self._core.levels_lookup[name] = (name, name, no, icon)
+            for handler in self._core.handlers.values():
+                handler.update_format(name)
+
+        return level

     def disable(self, name):
         """Disable logging of messages coming from ``name`` module and its children.
@@ -1156,7 +1637,7 @@ class Logger:
         >>> logger.disable("my_library")
         >>> logger.info("While publishing a library, don't forget to disable logging")
         """
-        pass
+        self._change_activation(name, False)

     def enable(self, name):
         """Enable logging of messages coming from ``name`` module and its children.
@@ -1180,10 +1661,9 @@ class Logger:
         >>> logger.info("Re-enabled, messages are logged.")
         [22:46:12] Re-enabled, messages are logged.
         """
-        pass
+        self._change_activation(name, True)

-    def configure(self, *, handlers=None, levels=None, extra=None, patcher=
-        None, activation=None):
+    def configure(self, *, handlers=None, levels=None, extra=None, patcher=None, activation=None):
         """Configure the core logger.

         It should be noted that ``extra`` values set using this function are available across all
@@ -1246,10 +1726,75 @@ class Logger:
         >>> logger.bind(context="bar").info("Suppress global context")
         >>> # => "bar - Suppress global context"
         """
-        pass
+        if handlers is not None:
+            self.remove()
+        else:
+            handlers = []
+
+        if levels is not None:
+            for params in levels:
+                self.level(**params)
+
+        if patcher is not None:
+            with self._core.lock:
+                self._core.patcher = patcher
+
+        if extra is not None:
+            with self._core.lock:
+                self._core.extra.clear()
+                self._core.extra.update(extra)
+
+        if activation is not None:
+            for name, state in activation:
+                if state:
+                    self.enable(name)
+                else:
+                    self.disable(name)
+
+        return [self.add(**params) for params in handlers]
+
+    def _change_activation(self, name, status):
+        if not (name is None or isinstance(name, str)):
+            raise TypeError(
+                "Invalid name, it should be a string (or None), not: '%s'" % type(name).__name__
+            )
+
+        with self._core.lock:
+            enabled = self._core.enabled.copy()
+
+            if name is None:
+                for n in enabled:
+                    if n is None:
+                        enabled[n] = status
+                self._core.activation_none = status
+                self._core.enabled = enabled
+                return
+
+            if name != "":
+                name += "."
+
+            activation_list = [
+                (n, s) for n, s in self._core.activation_list if n[: len(name)] != name
+            ]
+
+            parent_status = next((s for n, s in activation_list if name[: len(n)] == n), None)
+            if parent_status != status and not (name == "" and status is True):
+                activation_list.append((name, status))
+
+                def modules_depth(x):
+                    return x[0].count(".")
+
+                activation_list.sort(key=modules_depth, reverse=True)
+
+            for n in enabled:
+                if n is not None and (n + ".")[: len(name)] == name:
+                    enabled[n] = status
+
+            self._core.activation_list = activation_list
+            self._core.enabled = enabled
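The activation logic compares dotted names with a trailing dot appended, which makes prefix tests exact at module boundaries: disabling `"foo"` must affect `foo.bar` but not `foobar`. The core comparison is a single slice (hypothetical helper, not part of loguru):

```python
def is_same_or_child(module, parent):
    # parent is stored with a trailing dot ("foo."); appending a dot
    # to the candidate makes "foo" and "foo.bar" match while the
    # unrelated "foobar" does not.
    return (module + ".")[: len(parent)] == parent
```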

     @staticmethod
-    def parse(file, pattern, *, cast={}, chunk=2 ** 16):
+    def parse(file, pattern, *, cast={}, chunk=2**16):  # noqa: B006
         """Parse raw logs and extract each entry as a |dict|.

         The logging format has to be specified as the regex ``pattern``, it will then be
@@ -1296,43 +1841,232 @@ class Logger:
         ...     for log in logger.parse(file, reg, cast=cast):
         ...         print(log["date"], log["something_else"])
         """
-        pass
-
-    def trace(__self, __message, *args, **kwargs):
-        """Log ``message.format(*args, **kwargs)`` with severity ``'TRACE'``."""
-        pass
+        if isinstance(file, (str, PathLike)):
+            should_close = True
+            fileobj = open(str(file))
+        elif hasattr(file, "read") and callable(file.read):
+            should_close = False
+            fileobj = file
+        else:
+            raise TypeError(
+                "Invalid file, it should be a string path or a file object, not: '%s'"
+                % type(file).__name__
+            )
+
+        if isinstance(cast, dict):
+
+            def cast_function(groups):
+                for key, converter in cast.items():
+                    if key in groups:
+                        groups[key] = converter(groups[key])
+
+        elif callable(cast):
+            cast_function = cast
+        else:
+            raise TypeError(
+                "Invalid cast, it should be a function or a dict, not: '%s'" % type(cast).__name__
+            )
+
+        try:
+            regex = re.compile(pattern)
+        except TypeError:
+            raise TypeError(
+                "Invalid pattern, it should be a string or a compiled regex, not: '%s'"
+                % type(pattern).__name__
+            ) from None
+
+        matches = Logger._find_iter(fileobj, regex, chunk)
+
+        for match in matches:
+            groups = match.groupdict()
+            cast_function(groups)
+            yield groups
+
+        if should_close:
+            fileobj.close()
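When `cast` is a dict, `parse()` wraps it in a function that mutates each match's groupdict in place, converting only the keys actually present. That adapter in isolation (sketch, helper name ours):

```python
def make_cast_function(cast):
    # Dict of converters -> in-place groupdict mutation, the same
    # shape parse() builds before iterating over matches.
    def cast_function(groups):
        for key, converter in cast.items():
            if key in groups:
                groups[key] = converter(groups[key])
    return cast_function
```

Converters whose keys are absent from the groupdict (here the hypothetical `"missing"`) are silently skipped.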

-    def debug(__self, __message, *args, **kwargs):
-        """Log ``message.format(*args, **kwargs)`` with severity ``'DEBUG'``."""
-        pass
-
-    def info(__self, __message, *args, **kwargs):
-        """Log ``message.format(*args, **kwargs)`` with severity ``'INFO'``."""
-        pass
-
-    def success(__self, __message, *args, **kwargs):
-        """Log ``message.format(*args, **kwargs)`` with severity ``'SUCCESS'``."""
-        pass
-
-    def warning(__self, __message, *args, **kwargs):
-        """Log ``message.format(*args, **kwargs)`` with severity ``'WARNING'``."""
-        pass
-
-    def error(__self, __message, *args, **kwargs):
-        """Log ``message.format(*args, **kwargs)`` with severity ``'ERROR'``."""
-        pass
-
-    def critical(__self, __message, *args, **kwargs):
-        """Log ``message.format(*args, **kwargs)`` with severity ``'CRITICAL'``."""
-        pass
-
-    def exception(__self, __message, *args, **kwargs):
-        """Convenience method for logging an ``'ERROR'`` with exception information."""
-        pass
-
-    def log(__self, __level, __message, *args, **kwargs):
-        """Log ``message.format(*args, **kwargs)`` with severity ``level``."""
-        pass
+    @staticmethod
+    def _find_iter(fileobj, regex, chunk):
+        buffer = fileobj.read(0)
+
+        while 1:
+            text = fileobj.read(chunk)
+            buffer += text
+            matches = list(regex.finditer(buffer))
+
+            if not text:
+                yield from matches
+                break
+
+            if len(matches) > 1:
+                end = matches[-2].end()
+                buffer = buffer[end:]
+                yield from matches[:-1]
+
+    def _log(self, level, from_decorator, options, message, args, kwargs):
+        core = self._core
+
+        if not core.handlers:
+            return
+
+        try:
+            level_id, level_name, level_no, level_icon = core.levels_lookup[level]
+        except (KeyError, TypeError):
+            if isinstance(level, str):
+                raise ValueError("Level '%s' does not exist" % level) from None
+            if not isinstance(level, int):
+                raise TypeError(
+                    "Invalid level, it should be an integer or a string, not: '%s'"
+                    % type(level).__name__
+                ) from None
+            if level < 0:
+                raise ValueError(
+                    "Invalid level value, it should be a positive integer, not: %d" % level
+                ) from None
+            cache = (None, "Level %d" % level, level, " ")
+            level_id, level_name, level_no, level_icon = cache
+            core.levels_lookup[level] = cache
+
+        if level_no < core.min_level:
+            return
+
+        (exception, depth, record, lazy, colors, raw, capture, patchers, extra) = options
+
+        frame = get_frame(depth + 2)
+
+        try:
+            name = frame.f_globals["__name__"]
+        except KeyError:
+            name = None
+
+        try:
+            if not core.enabled[name]:
+                return
+        except KeyError:
+            enabled = core.enabled
+            if name is None:
+                status = core.activation_none
+                enabled[name] = status
+                if not status:
+                    return
+            else:
+                dotted_name = name + "."
+                for dotted_module_name, status in core.activation_list:
+                    if dotted_name[: len(dotted_module_name)] == dotted_module_name:
+                        if status:
+                            break
+                        enabled[name] = False
+                        return
+                enabled[name] = True
+
+        current_datetime = aware_now()
+
+        code = frame.f_code
+        file_path = code.co_filename
+        file_name = basename(file_path)
+        thread = current_thread()
+        process = current_process()
+        elapsed = current_datetime - start_time
+
+        if exception:
+            if isinstance(exception, BaseException):
+                type_, value, traceback = (type(exception), exception, exception.__traceback__)
+            elif isinstance(exception, tuple):
+                type_, value, traceback = exception
+            else:
+                type_, value, traceback = sys.exc_info()
+            exception = RecordException(type_, value, traceback)
+        else:
+            exception = None
+
+        log_record = {
+            "elapsed": elapsed,
+            "exception": exception,
+            "extra": {**core.extra, **context.get(), **extra},
+            "file": RecordFile(file_name, file_path),
+            "function": code.co_name,
+            "level": RecordLevel(level_name, level_no, level_icon),
+            "line": frame.f_lineno,
+            "message": str(message),
+            "module": splitext(file_name)[0],
+            "name": name,
+            "process": RecordProcess(process.ident, process.name),
+            "thread": RecordThread(thread.ident, thread.name),
+            "time": current_datetime,
+        }
+
+        if lazy:
+            args = [arg() for arg in args]
+            kwargs = {key: value() for key, value in kwargs.items()}
+
+        if capture and kwargs:
+            log_record["extra"].update(kwargs)
+
+        if record:
+            if "record" in kwargs:
+                raise TypeError(
+                    "The message can't be formatted: 'record' shall not be used as a keyword "
+                    "argument while logger has been configured with '.opt(record=True)'"
+                )
+            kwargs.update(record=log_record)
+
+        if colors:
+            if args or kwargs:
+                colored_message = Colorizer.prepare_message(message, args, kwargs)
+            else:
+                colored_message = Colorizer.prepare_simple_message(str(message))
+            log_record["message"] = colored_message.stripped
+        elif args or kwargs:
+            colored_message = None
+            log_record["message"] = message.format(*args, **kwargs)
+        else:
+            colored_message = None
+
+        if core.patcher:
+            core.patcher(log_record)
+
+        for patcher in patchers:
+            patcher(log_record)
+
+        for handler in core.handlers.values():
+            handler.emit(log_record, level_id, from_decorator, raw, colored_message)
+
+    def trace(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``'TRACE'``."""
+        __self._log("TRACE", False, __self._options, __message, args, kwargs)
+
+    def debug(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``'DEBUG'``."""
+        __self._log("DEBUG", False, __self._options, __message, args, kwargs)
+
+    def info(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``'INFO'``."""
+        __self._log("INFO", False, __self._options, __message, args, kwargs)
+
+    def success(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``'SUCCESS'``."""
+        __self._log("SUCCESS", False, __self._options, __message, args, kwargs)
+
+    def warning(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``'WARNING'``."""
+        __self._log("WARNING", False, __self._options, __message, args, kwargs)
+
+    def error(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``'ERROR'``."""
+        __self._log("ERROR", False, __self._options, __message, args, kwargs)
+
+    def critical(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``'CRITICAL'``."""
+        __self._log("CRITICAL", False, __self._options, __message, args, kwargs)
+
+    def exception(__self, __message, *args, **kwargs):  # noqa: N805
+        r"""Convenience method for logging an ``'ERROR'`` with exception information."""
+        options = (True,) + __self._options[1:]
+        __self._log("ERROR", False, options, __message, args, kwargs)
+
+    def log(__self, __level, __message, *args, **kwargs):  # noqa: N805
+        r"""Log ``message.format(*args, **kwargs)`` with severity ``level``."""
+        __self._log(__level, False, __self._options, __message, args, kwargs)

     def start(self, *args, **kwargs):
         """Deprecated function to |add| a new handler.
@@ -1343,7 +2077,12 @@ class Logger:
           ``start()`` will be removed in Loguru 1.0.0, it is replaced by ``add()`` which is a less
           confusing name.
         """
-        pass
+        warnings.warn(
+            "The 'start()' method is deprecated, please use 'add()' instead",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return self.add(*args, **kwargs)

     def stop(self, *args, **kwargs):
         """Deprecated function to |remove| an existing handler.
@@ -1354,4 +2093,9 @@ class Logger:
           ``stop()`` will be removed in Loguru 1.0.0, it is replaced by ``remove()`` which is a less
           confusing name.
         """
-        pass
+        warnings.warn(
+            "The 'stop()' method is deprecated, please use 'remove()' instead",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return self.remove(*args, **kwargs)
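The two deprecation shims above (`start()` and `stop()`) follow the same pattern: emit a `DeprecationWarning` with `stacklevel=2` so the warning points at the caller, then delegate to the replacement method. A minimal standalone sketch of that pattern, with a stub `add()` whose return value (`0`) is an assumption for illustration only:

```python
import warnings


class Logger:
    def add(self, *args, **kwargs):
        # Stand-in for the real handler registration; returns a handler id.
        return 0

    def start(self, *args, **kwargs):
        # Deprecated alias: warn the caller, then delegate to add().
        warnings.warn(
            "The 'start()' method is deprecated, please use 'add()' instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.add(*args, **kwargs)


logger = Logger()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    handler_id = logger.start()

print(handler_id)          # 0
print(caught[0].category)  # <class 'DeprecationWarning'>
```

Because the shim returns whatever the replacement returns, existing callers keep working unchanged until the method is removed in 1.0.0.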
diff --git a/loguru/_recattrs.py b/loguru/_recattrs.py
index d344c9f..b09426e 100644
--- a/loguru/_recattrs.py
+++ b/loguru/_recattrs.py
@@ -3,7 +3,7 @@ from collections import namedtuple


 class RecordLevel:
-    __slots__ = 'name', 'no', 'icon'
+    __slots__ = ("name", "no", "icon")

     def __init__(self, name, no, icon):
         self.name = name
@@ -11,66 +11,80 @@ class RecordLevel:
         self.icon = icon

     def __repr__(self):
-        return '(name=%r, no=%r, icon=%r)' % (self.name, self.no, self.icon)
+        return "(name=%r, no=%r, icon=%r)" % (self.name, self.no, self.icon)

     def __format__(self, spec):
         return self.name.__format__(spec)


 class RecordFile:
-    __slots__ = 'name', 'path'
+    __slots__ = ("name", "path")

     def __init__(self, name, path):
         self.name = name
         self.path = path

     def __repr__(self):
-        return '(name=%r, path=%r)' % (self.name, self.path)
+        return "(name=%r, path=%r)" % (self.name, self.path)

     def __format__(self, spec):
         return self.name.__format__(spec)


 class RecordThread:
-    __slots__ = 'id', 'name'
+    __slots__ = ("id", "name")

     def __init__(self, id_, name):
         self.id = id_
         self.name = name

     def __repr__(self):
-        return '(id=%r, name=%r)' % (self.id, self.name)
+        return "(id=%r, name=%r)" % (self.id, self.name)

     def __format__(self, spec):
         return self.id.__format__(spec)


 class RecordProcess:
-    __slots__ = 'id', 'name'
+    __slots__ = ("id", "name")

     def __init__(self, id_, name):
         self.id = id_
         self.name = name

     def __repr__(self):
-        return '(id=%r, name=%r)' % (self.id, self.name)
+        return "(id=%r, name=%r)" % (self.id, self.name)

     def __format__(self, spec):
         return self.id.__format__(spec)


-class RecordException(namedtuple('RecordException', ('type', 'value',
-    'traceback'))):
-
+class RecordException(namedtuple("RecordException", ("type", "value", "traceback"))):
     def __repr__(self):
-        return '(type=%r, value=%r, traceback=%r)' % (self.type, self.value,
-            self.traceback)
+        return "(type=%r, value=%r, traceback=%r)" % (self.type, self.value, self.traceback)

     def __reduce__(self):
+        # The traceback is not picklable, therefore it needs to be removed. Additionally, there's a
+        # possibility that the exception value is not picklable either. In such cases, we also need
+        # to remove it. This is done for user convenience, aiming to prevent error logging caused by
+        # custom exceptions from third-party libraries. If the serialization succeeds, we can reuse
+        # the pickled value later for optimization (so that it's not pickled twice). It's important
+        # to note that custom exceptions might not necessarily raise a PickleError, hence the
+        # generic Exception catch.
         try:
             pickled_value = pickle.dumps(self.value)
         except Exception:
-            return RecordException, (self.type, None, None)
+            return (RecordException, (self.type, None, None))
+        else:
+            return (RecordException._from_pickled_value, (self.type, pickled_value, None))
+
+    @classmethod
+    def _from_pickled_value(cls, type_, pickled_value, traceback_):
+        try:
+            # It's safe to use "pickle.loads()" in this case because the pickled value is generated
+            # by the same code and is not coming from an untrusted source.
+            value = pickle.loads(pickled_value)
+        except Exception:
+            return cls(type_, None, traceback_)
         else:
-            return RecordException._from_pickled_value, (self.type,
-                pickled_value, None)
+            return cls(type_, value, traceback_)
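The `__reduce__`/`_from_pickled_value` pair above can be exercised on its own to see the behavior the comment describes: the traceback is always dropped, while the exception value survives a pickle round-trip when it is itself picklable. The sketch below reuses the class body from the diff verbatim; the `ZeroDivisionError` trigger is just an illustrative way to obtain a real exception:

```python
import pickle
from collections import namedtuple


class RecordException(namedtuple("RecordException", ("type", "value", "traceback"))):
    def __reduce__(self):
        # Tracebacks are never picklable, so they are always dropped; an
        # unpicklable exception value is dropped too (generic Exception catch).
        try:
            pickled_value = pickle.dumps(self.value)
        except Exception:
            return (RecordException, (self.type, None, None))
        else:
            return (RecordException._from_pickled_value, (self.type, pickled_value, None))

    @classmethod
    def _from_pickled_value(cls, type_, pickled_value, traceback_):
        try:
            value = pickle.loads(pickled_value)
        except Exception:
            return cls(type_, None, traceback_)
        else:
            return cls(type_, value, traceback_)


try:
    1 / 0
except ZeroDivisionError as e:
    exc = RecordException(type(e), e, e.__traceback__)

restored = pickle.loads(pickle.dumps(exc))
print(restored.type)       # <class 'ZeroDivisionError'>
print(restored.traceback)  # None (dropped during pickling)
```

Note that the callable returned by `__reduce__` is the bound classmethod itself, which pickle can serialize by reference as long as the class is importable at unpickling time.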
diff --git a/loguru/_simple_sinks.py b/loguru/_simple_sinks.py
index a21a218..068f1e1 100644
--- a/loguru/_simple_sinks.py
+++ b/loguru/_simple_sinks.py
@@ -1,36 +1,112 @@
 import asyncio
 import logging
 import weakref
+
 from ._asyncio_loop import get_running_loop, get_task_loop


 class StreamSink:
-
     def __init__(self, stream):
         self._stream = stream
-        self._flushable = callable(getattr(stream, 'flush', None))
-        self._stoppable = callable(getattr(stream, 'stop', None))
-        self._completable = asyncio.iscoroutinefunction(getattr(stream,
-            'complete', None))
+        self._flushable = callable(getattr(stream, "flush", None))
+        self._stoppable = callable(getattr(stream, "stop", None))
+        self._completable = asyncio.iscoroutinefunction(getattr(stream, "complete", None))

+    def write(self, message):
+        self._stream.write(message)
+        if self._flushable:
+            self._stream.flush()

-class StandardSink:
+    def stop(self):
+        if self._stoppable:
+            self._stream.stop()

+    def tasks_to_complete(self):
+        if not self._completable:
+            return []
+        return [self._stream.complete()]
+
+
+class StandardSink:
     def __init__(self, handler):
         self._handler = handler

+    def write(self, message):
+        record = message.record
+        message = str(message)
+        exc = record["exception"]
+        record = logging.getLogger().makeRecord(
+            record["name"],
+            record["level"].no,
+            record["file"].path,
+            record["line"],
+            message,
+            (),
+            (exc.type, exc.value, exc.traceback) if exc else None,
+            record["function"],
+            {"extra": record["extra"]},
+        )
+        if exc:
+            record.exc_text = "\n"
+        self._handler.handle(record)
+
+    def stop(self):
+        self._handler.close()
+
+    def tasks_to_complete(self):
+        return []

-class AsyncSink:

+class AsyncSink:
     def __init__(self, function, loop, error_interceptor):
         self._function = function
         self._loop = loop
         self._error_interceptor = error_interceptor
         self._tasks = weakref.WeakSet()

+    def write(self, message):
+        try:
+            loop = self._loop or get_running_loop()
+        except RuntimeError:
+            return
+
+        coroutine = self._function(message)
+        task = loop.create_task(coroutine)
+
+        def check_exception(future):
+            if future.cancelled() or future.exception() is None:
+                return
+            if not self._error_interceptor.should_catch():
+                raise future.exception()
+            self._error_interceptor.print(message.record, exception=future.exception())
+
+        task.add_done_callback(check_exception)
+        self._tasks.add(task)
+
+    def stop(self):
+        for task in self._tasks:
+            task.cancel()
+
+    def tasks_to_complete(self):
+        # To avoid errors due to "self._tasks" being mutated while iterated, the
+        # "tasks_to_complete()" method must be protected by the same lock as "write()" (which
+        # happens to be the handler lock). However, the tasks must not be awaited while the lock is
+        # acquired as this could lead to a deadlock. Therefore, we first need to collect the tasks
+        # to complete, then return them so that they can be awaited outside of the lock.
+        return [self._complete_task(task) for task in self._tasks]
+
+    async def _complete_task(self, task):
+        loop = get_running_loop()
+        if get_task_loop(task) is not loop:
+            return
+        try:
+            await task
+        except Exception:
+            pass  # Handled in "check_exception()"
+
     def __getstate__(self):
         state = self.__dict__.copy()
-        state['_tasks'] = None
+        state["_tasks"] = None
         return state

     def __setstate__(self, state):
@@ -39,6 +115,14 @@ class AsyncSink:


 class CallableSink:
-
     def __init__(self, function):
         self._function = function
+
+    def write(self, message):
+        self._function(message)
+
+    def stop(self):
+        pass
+
+    def tasks_to_complete(self):
+        return []
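All four sinks in this file implement the same implicit protocol: `write(message)`, `stop()`, and `tasks_to_complete()`. `CallableSink` is the smallest instance of it, and can be tried standalone; the class body below is copied from the diff, while the sample message string is illustrative:

```python
class CallableSink:
    # Minimal sink: each formatted message is handed to a user callable.
    def __init__(self, function):
        self._function = function

    def write(self, message):
        self._function(message)

    def stop(self):
        pass  # Nothing to release for a plain callable.

    def tasks_to_complete(self):
        return []  # A synchronous sink never has pending asyncio tasks.


collected = []
sink = CallableSink(collected.append)
sink.write("INFO | hello\n")
sink.stop()
print(collected)                 # ['INFO | hello\n']
print(sink.tasks_to_complete())  # []
```

The richer sinks only differ in what `stop()` and `tasks_to_complete()` must clean up: `StreamSink` optionally flushes and stops the stream, `StandardSink` closes the stdlib handler, and `AsyncSink` cancels or awaits its scheduled tasks.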
diff --git a/loguru/_string_parsers.py b/loguru/_string_parsers.py
index 520211d..da00904 100644
--- a/loguru/_string_parsers.py
+++ b/loguru/_string_parsers.py
@@ -3,4 +3,185 @@ import re


 class Frequencies:
-    pass
+    @staticmethod
+    def hourly(t):
+        dt = t + datetime.timedelta(hours=1)
+        return dt.replace(minute=0, second=0, microsecond=0)
+
+    @staticmethod
+    def daily(t):
+        dt = t + datetime.timedelta(days=1)
+        return dt.replace(hour=0, minute=0, second=0, microsecond=0)
+
+    @staticmethod
+    def weekly(t):
+        dt = t + datetime.timedelta(days=7 - t.weekday())
+        return dt.replace(hour=0, minute=0, second=0, microsecond=0)
+
+    @staticmethod
+    def monthly(t):
+        if t.month == 12:
+            y, m = t.year + 1, 1
+        else:
+            y, m = t.year, t.month + 1
+        return t.replace(year=y, month=m, day=1, hour=0, minute=0, second=0, microsecond=0)
+
+    @staticmethod
+    def yearly(t):
+        y = t.year + 1
+        return t.replace(year=y, month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
+
+
+def parse_size(size):
+    size = size.strip()
+    reg = re.compile(r"([e\+\-\.\d]+)\s*([kmgtpezy])?(i)?(b)", flags=re.I)
+
+    match = reg.fullmatch(size)
+
+    if not match:
+        return None
+
+    s, u, i, b = match.groups()
+
+    try:
+        s = float(s)
+    except ValueError as e:
+        raise ValueError("Invalid float value while parsing size: '%s'" % s) from e
+
+    u = "kmgtpezy".index(u.lower()) + 1 if u else 0
+    i = 1024 if i else 1000
+    b = {"b": 8, "B": 1}[b] if b else 1
+    size = s * i**u / b
+
+    return size
+
+
+def parse_duration(duration):
+    duration = duration.strip()
+    reg = r"(?:([e\+\-\.\d]+)\s*([a-z]+)[\s\,]*)"
+
+    units = [
+        ("y|years?", 31536000),
+        ("months?", 2628000),
+        ("w|weeks?", 604800),
+        ("d|days?", 86400),
+        ("h|hours?", 3600),
+        ("min(?:ute)?s?", 60),
+        ("s|sec(?:ond)?s?", 1),
+        ("ms|milliseconds?", 0.001),
+        ("us|microseconds?", 0.000001),
+    ]
+
+    if not re.fullmatch(reg + "+", duration, flags=re.I):
+        return None
+
+    seconds = 0
+
+    for value, unit in re.findall(reg, duration, flags=re.I):
+        try:
+            value = float(value)
+        except ValueError as e:
+            raise ValueError("Invalid float value while parsing duration: '%s'" % value) from e
+
+        try:
+            unit = next(u for r, u in units if re.fullmatch(r, unit, flags=re.I))
+        except StopIteration:
+            raise ValueError("Invalid unit value while parsing duration: '%s'" % unit) from None
+
+        seconds += value * unit
+
+    return datetime.timedelta(seconds=seconds)
+
+
+def parse_frequency(frequency):
+    frequencies = {
+        "hourly": Frequencies.hourly,
+        "daily": Frequencies.daily,
+        "weekly": Frequencies.weekly,
+        "monthly": Frequencies.monthly,
+        "yearly": Frequencies.yearly,
+    }
+    frequency = frequency.strip().lower()
+    return frequencies.get(frequency, None)
+
+
+def parse_day(day):
+    days = {
+        "monday": 0,
+        "tuesday": 1,
+        "wednesday": 2,
+        "thursday": 3,
+        "friday": 4,
+        "saturday": 5,
+        "sunday": 6,
+    }
+    day = day.strip().lower()
+    if day in days:
+        return days[day]
+    elif day.startswith("w") and day[1:].isdigit():
+        day = int(day[1:])
+        if not 0 <= day < 7:
+            raise ValueError("Invalid weekday value while parsing day (expected [0-6]): '%d'" % day)
+    else:
+        day = None
+
+    return day
+
+
+def parse_time(time):
+    time = time.strip()
+    reg = re.compile(r"^[\d\.\:]+\s*(?:[ap]m)?$", flags=re.I)
+
+    if not reg.match(time):
+        return None
+
+    formats = [
+        "%H",
+        "%H:%M",
+        "%H:%M:%S",
+        "%H:%M:%S.%f",
+        "%I %p",
+        "%I:%M %S",
+        "%I:%M:%S %p",
+        "%I:%M:%S.%f %p",
+    ]
+
+    for format_ in formats:
+        try:
+            dt = datetime.datetime.strptime(time, format_)
+        except ValueError:
+            pass
+        else:
+            return dt.time()
+
+    raise ValueError("Unrecognized format while parsing time: '%s'" % time)
+
+
+def parse_daytime(daytime):
+    daytime = daytime.strip()
+    reg = re.compile(r"^(.*?)\s+at\s+(.*)$", flags=re.I)
+
+    match = reg.match(daytime)
+    if match:
+        day, time = match.groups()
+    else:
+        day = time = daytime
+
+    try:
+        day = parse_day(day)
+        if match and day is None:
+            raise ValueError
+    except ValueError as e:
+        raise ValueError("Invalid day while parsing daytime: '%s'" % day) from e
+
+    try:
+        time = parse_time(time)
+        if match and time is None:
+            raise ValueError
+    except ValueError as e:
+        raise ValueError("Invalid time while parsing daytime: '%s'" % time) from e
+
+    if day is None and time is None:
+        return None
+
+    return day, time
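The size grammar implemented by `parse_size()` above is easy to misread: the optional `i` selects a 1024 base instead of 1000, and a lowercase `b` means bits (divide by 8) while uppercase `B` means bytes. A standalone check, with the function body copied from the diff and only the sample inputs added:

```python
import re


def parse_size(size):
    # Number, optional k/m/g/t/p/e/z/y unit, optional "i" (binary, 1024-based),
    # and a mandatory trailing "b" (bits) or "B" (bytes).
    size = size.strip()
    reg = re.compile(r"([e\+\-\.\d]+)\s*([kmgtpezy])?(i)?(b)", flags=re.I)
    match = reg.fullmatch(size)
    if not match:
        return None
    s, u, i, b = match.groups()
    s = float(s)
    u = "kmgtpezy".index(u.lower()) + 1 if u else 0
    i = 1024 if i else 1000
    b = {"b": 8, "B": 1}[b] if b else 1
    return s * i**u / b


print(parse_size("100 MB"))  # 100000000.0
print(parse_size("1 KiB"))   # 1024.0
print(parse_size("8 kb"))    # 1000.0 (lowercase "b" means bits)
print(parse_size("oops"))    # None (not a size string)
```

Returning `None` for non-matching input (rather than raising) lets the caller fall through to the other parsers (`parse_duration`, `parse_frequency`, `parse_daytime`) when interpreting a rotation or retention argument.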