back to OpenHands summary
OpenHands: imapclient
Pytest Summary for test tests
status |
count |
failed |
82 |
passed |
4 |
total |
86 |
collected |
86 |
Failed pytests:
test_datetime_util.py::TestParsing::test_dots_for_time_separator
test_datetime_util.py::TestParsing::test_dots_for_time_separator
self =
def test_dots_for_time_separator(self):
# As reported in issue #154.
> self.check_normalised_and_not(
b"Sat, 8 May 2010 16.03.09 +0200",
datetime(2010, 5, 8, 16, 3, 9, 0, FixedOffset(120)),
)
tests/test_datetime_util.py:45:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_datetime_util.py:21: in check_normalised_and_not
parse_to_datetime(in_string), datetime_to_native(expected_datetime)
imapclient/datetime_util.py:69: in parse_to_datetime
return datetime_to_native(dt)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dt = datetime.datetime(2010, 5, 8, 16, 3, 9, tzinfo=)
def datetime_to_native(dt: datetime) -> datetime:
"""Convert a timezone-aware datetime to a naive datetime in the local timezone."""
if dt.tzinfo is None:
return dt
> return dt.astimezone(FixedOffset.for_system()).replace(tzinfo=None)
E NotImplementedError: a tzinfo subclass must implement utcoffset()
imapclient/datetime_util.py:11: NotImplementedError
test_datetime_util.py::TestParsing::test_internaldate_style
test_datetime_util.py::TestParsing::test_internaldate_style
self =
def test_internaldate_style(self):
> self.check_normalised_and_not(
b" 9-Feb-2007 17:08:08 -0430",
datetime(2007, 2, 9, 17, 8, 8, 0, FixedOffset(-4 * 60 - 30)),
)
tests/test_datetime_util.py:34:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_datetime_util.py:21: in check_normalised_and_not
parse_to_datetime(in_string), datetime_to_native(expected_datetime)
imapclient/datetime_util.py:69: in parse_to_datetime
return datetime_to_native(dt)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dt = datetime.datetime(2007, 2, 9, 17, 8, 8, tzinfo=)
def datetime_to_native(dt: datetime) -> datetime:
"""Convert a timezone-aware datetime to a naive datetime in the local timezone."""
if dt.tzinfo is None:
return dt
> return dt.astimezone(FixedOffset.for_system()).replace(tzinfo=None)
E NotImplementedError: a tzinfo subclass must implement utcoffset()
imapclient/datetime_util.py:11: NotImplementedError
test_datetime_util.py::TestParsing::test_rfc822_style
test_datetime_util.py::TestParsing::test_rfc822_style
self =
def test_rfc822_style(self):
> self.check_normalised_and_not(
b"Sun, 24 Mar 2013 22:06:10 +0200",
datetime(2013, 3, 24, 22, 6, 10, 0, FixedOffset(120)),
)
tests/test_datetime_util.py:28:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_datetime_util.py:21: in check_normalised_and_not
parse_to_datetime(in_string), datetime_to_native(expected_datetime)
imapclient/datetime_util.py:69: in parse_to_datetime
return datetime_to_native(dt)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dt = datetime.datetime(2013, 3, 24, 22, 6, 10, tzinfo=)
def datetime_to_native(dt: datetime) -> datetime:
"""Convert a timezone-aware datetime to a naive datetime in the local timezone."""
if dt.tzinfo is None:
return dt
> return dt.astimezone(FixedOffset.for_system()).replace(tzinfo=None)
E NotImplementedError: a tzinfo subclass must implement utcoffset()
imapclient/datetime_util.py:11: NotImplementedError
test_datetime_util.py::TestDatetimeToINTERNALDATE::test_with_timezone
test_datetime_util.py::TestDatetimeToINTERNALDATE::test_with_timezone
self =
def test_with_timezone(self):
dt = datetime(2009, 1, 2, 3, 4, 5, 0, FixedOffset(2 * 60 + 30))
> self.assertEqual(datetime_to_INTERNALDATE(dt), "02-Jan-2009 03:04:05 +0230")
tests/test_datetime_util.py:65:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dt = datetime.datetime(2009, 1, 2, 3, 4, 5, tzinfo=)
def datetime_to_INTERNALDATE(dt: datetime) -> str:
"""Convert a datetime instance to a IMAP INTERNALDATE string.
If timezone information is missing the current system
timezone is used.
"""
if dt.tzinfo is None:
dt = dt.replace(tzinfo=FixedOffset.for_system())
> sign = '+' if dt.utcoffset().total_seconds() >= 0 else '-'
E NotImplementedError: a tzinfo subclass must implement utcoffset()
imapclient/datetime_util.py:81: NotImplementedError
test_datetime_util.py::TestDatetimeToINTERNALDATE::test_without_timezone
test_datetime_util.py::TestDatetimeToINTERNALDATE::test_without_timezone
/usr/lib/python3.10/unittest/mock.py:1376: in patched
with self.decoration_helper(patched,
/usr/lib/python3.10/contextlib.py:135: in __enter__
return next(self.gen)
/usr/lib/python3.10/unittest/mock.py:1358: in decoration_helper
arg = exit_stack.enter_context(patching)
/usr/lib/python3.10/contextlib.py:492: in enter_context
result = _cm_type.__enter__(cm)
/usr/lib/python3.10/unittest/mock.py:1431: in __enter__
self.target = self.getter()
/usr/lib/python3.10/unittest/mock.py:1618: in
getter = lambda: _importer(target)
/usr/lib/python3.10/unittest/mock.py:1257: in _importer
thing = __import__(import_path)
imapclient/__init__.py:8: in
from .imapclient import * # noqa: F401,F403
imapclient/imapclient.py:1707: in
class _dict_bytes_normaliser:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
class _dict_bytes_normaliser:
"""Wrap a dict with unicode/bytes keys and normalise the keys to
bytes.
"""
def __init__(self, d):
self._d = d
> items = iteritems
E NameError: name 'iteritems' is not defined
imapclient/imapclient.py:1714: NameError
test_fixed_offset.py::TestFixedOffset::test_GMT
test_fixed_offset.py::TestFixedOffset::test_GMT
self =
def test_GMT(self):
> self._check(FixedOffset(0), timedelta(0), "+0000")
tests/test_fixed_offset.py:19:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
offset =
expected_delta = datetime.timedelta(0), expected_name = '+0000'
def _check(self, offset, expected_delta, expected_name):
> self.assertEqual(offset.utcoffset(None), expected_delta)
E NotImplementedError: a tzinfo subclass must implement utcoffset()
tests/test_fixed_offset.py:14: NotImplementedError
test_fixed_offset.py::TestFixedOffset::test_for_system_DST_active
test_fixed_offset.py::TestFixedOffset::test_for_system_DST_active
/usr/lib/python3.10/unittest/mock.py:1376: in patched
with self.decoration_helper(patched,
/usr/lib/python3.10/contextlib.py:135: in __enter__
return next(self.gen)
/usr/lib/python3.10/unittest/mock.py:1358: in decoration_helper
arg = exit_stack.enter_context(patching)
/usr/lib/python3.10/contextlib.py:492: in enter_context
result = _cm_type.__enter__(cm)
/usr/lib/python3.10/unittest/mock.py:1431: in __enter__
self.target = self.getter()
/usr/lib/python3.10/unittest/mock.py:1674: in
getter = lambda: _importer(target)
/usr/lib/python3.10/unittest/mock.py:1257: in _importer
thing = __import__(import_path)
imapclient/__init__.py:8: in
from .imapclient import * # noqa: F401,F403
imapclient/imapclient.py:1707: in
class _dict_bytes_normaliser:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
class _dict_bytes_normaliser:
"""Wrap a dict with unicode/bytes keys and normalise the keys to
bytes.
"""
def __init__(self, d):
self._d = d
> items = iteritems
E NameError: name 'iteritems' is not defined
imapclient/imapclient.py:1714: NameError
test_fixed_offset.py::TestFixedOffset::test_for_system_DST_not_active
test_fixed_offset.py::TestFixedOffset::test_for_system_DST_not_active
/usr/lib/python3.10/unittest/mock.py:1376: in patched
with self.decoration_helper(patched,
/usr/lib/python3.10/contextlib.py:135: in __enter__
return next(self.gen)
/usr/lib/python3.10/unittest/mock.py:1358: in decoration_helper
arg = exit_stack.enter_context(patching)
/usr/lib/python3.10/contextlib.py:492: in enter_context
result = _cm_type.__enter__(cm)
/usr/lib/python3.10/unittest/mock.py:1431: in __enter__
self.target = self.getter()
/usr/lib/python3.10/unittest/mock.py:1674: in
getter = lambda: _importer(target)
/usr/lib/python3.10/unittest/mock.py:1257: in _importer
thing = __import__(import_path)
imapclient/__init__.py:8: in
from .imapclient import * # noqa: F401,F403
imapclient/imapclient.py:1707: in
class _dict_bytes_normaliser:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
class _dict_bytes_normaliser:
"""Wrap a dict with unicode/bytes keys and normalise the keys to
bytes.
"""
def __init__(self, d):
self._d = d
> items = iteritems
E NameError: name 'iteritems' is not defined
imapclient/imapclient.py:1714: NameError
test_fixed_offset.py::TestFixedOffset::test_for_system_no_DST
test_fixed_offset.py::TestFixedOffset::test_for_system_no_DST
/usr/lib/python3.10/unittest/mock.py:1376: in patched
with self.decoration_helper(patched,
/usr/lib/python3.10/contextlib.py:135: in __enter__
return next(self.gen)
/usr/lib/python3.10/unittest/mock.py:1358: in decoration_helper
arg = exit_stack.enter_context(patching)
/usr/lib/python3.10/contextlib.py:492: in enter_context
result = _cm_type.__enter__(cm)
/usr/lib/python3.10/unittest/mock.py:1431: in __enter__
self.target = self.getter()
/usr/lib/python3.10/unittest/mock.py:1674: in
getter = lambda: _importer(target)
/usr/lib/python3.10/unittest/mock.py:1257: in _importer
thing = __import__(import_path)
imapclient/__init__.py:8: in
from .imapclient import * # noqa: F401,F403
imapclient/imapclient.py:1707: in
class _dict_bytes_normaliser:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
class _dict_bytes_normaliser:
"""Wrap a dict with unicode/bytes keys and normalise the keys to
bytes.
"""
def __init__(self, d):
self._d = d
> items = iteritems
E NameError: name 'iteritems' is not defined
imapclient/imapclient.py:1714: NameError
test_fixed_offset.py::TestFixedOffset::test_negative
test_fixed_offset.py::TestFixedOffset::test_negative
self =
def test_negative(self):
> self._check(FixedOffset(-30), timedelta(minutes=-30), "-0030")
tests/test_fixed_offset.py:27:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
offset =
expected_delta = datetime.timedelta(days=-1, seconds=84600)
expected_name = '-0030'
def _check(self, offset, expected_delta, expected_name):
> self.assertEqual(offset.utcoffset(None), expected_delta)
E NotImplementedError: a tzinfo subclass must implement utcoffset()
tests/test_fixed_offset.py:14: NotImplementedError
test_fixed_offset.py::TestFixedOffset::test_positive
test_fixed_offset.py::TestFixedOffset::test_positive
self =
def test_positive(self):
> self._check(FixedOffset(30), timedelta(minutes=30), "+0030")
tests/test_fixed_offset.py:22:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
offset =
expected_delta = datetime.timedelta(seconds=1800), expected_name = '+0030'
def _check(self, offset, expected_delta, expected_name):
> self.assertEqual(offset.utcoffset(None), expected_delta)
E NotImplementedError: a tzinfo subclass must implement utcoffset()
tests/test_fixed_offset.py:14: NotImplementedError
test_imap_utf7.py::IMAP4UTF7TestCase::test_decode
test_imap_utf7.py::IMAP4UTF7TestCase::test_decode
self =
def test_decode(self):
for input, output in self.tests:
decoded = decode(output)
> self.assertIsInstance(decoded, str)
E AssertionError: None is not an instance of
tests/test_imap_utf7.py:36: AssertionError
test_imap_utf7.py::IMAP4UTF7TestCase::test_encode
test_imap_utf7.py::IMAP4UTF7TestCase::test_encode
self =
def test_encode(self):
for input, output in self.tests:
encoded = encode(input)
> self.assertIsInstance(encoded, bytes)
E AssertionError: None is not an instance of
tests/test_imap_utf7.py:30: AssertionError
test_imap_utf7.py::IMAP4UTF7TestCase::test_printable_singletons
test_imap_utf7.py::IMAP4UTF7TestCase::test_printable_singletons
self =
def test_printable_singletons(self):
"""
The IMAP4 modified UTF-7 implementation encodes all printable
characters which are in ASCII using the corresponding ASCII byte.
"""
# All printables represent themselves
for o in list(range(0x20, 0x26)) + list(range(0x27, 0x7F)):
> self.assertEqual(bytes((o,)), encode(chr(o)))
E AssertionError: b' ' != None
tests/test_imap_utf7.py:46: AssertionError
test_response_lexer.py::TestTokenSource::test_escaping
test_response_lexer.py::TestTokenSource::test_escaping
self =
def test_escaping(self):
> self.check([rb'"aaa\"bbb"'], [rb'"aaa"bbb"'])
tests/test_response_lexer.py:35:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_lexer.py:91: in check
self.assertSequenceEqual(list(tokens), expected_out)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
> for tok in self.read_token_stream(iter(source)):
E AttributeError: 'Lexer' object has no attribute 'read_token_stream'
imapclient/response_lexer.py:45: AttributeError
test_response_lexer.py::TestTokenSource::test_invalid_escape
test_response_lexer.py::TestTokenSource::test_invalid_escape
self =
def test_invalid_escape(self):
> self.check([rb'"aaa\Zbbb"'], [rb'"aaa\Zbbb"'])
tests/test_response_lexer.py:40:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_lexer.py:91: in check
self.assertSequenceEqual(list(tokens), expected_out)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
> for tok in self.read_token_stream(iter(source)):
E AttributeError: 'Lexer' object has no attribute 'read_token_stream'
imapclient/response_lexer.py:45: AttributeError
test_response_lexer.py::TestTokenSource::test_lists
test_response_lexer.py::TestTokenSource::test_lists
self =
def test_lists(self):
> self.check([b"()"], [b"(", b")"])
tests/test_response_lexer.py:43:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_lexer.py:91: in check
self.assertSequenceEqual(list(tokens), expected_out)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
> for tok in self.read_token_stream(iter(source)):
E AttributeError: 'Lexer' object has no attribute 'read_token_stream'
imapclient/response_lexer.py:45: AttributeError
test_response_lexer.py::TestTokenSource::test_literal
test_response_lexer.py::TestTokenSource::test_literal
self =
def test_literal(self):
source = TokenSource([(b"abc {7}", b"foo bar"), b")"])
tokens = iter(source)
> self.assertEqual(next(tokens), b"abc")
tests/test_response_lexer.py:72:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
> for tok in self.read_token_stream(iter(source)):
E AttributeError: 'Lexer' object has no attribute 'read_token_stream'
imapclient/response_lexer.py:45: AttributeError
test_response_lexer.py::TestTokenSource::test_literals
test_response_lexer.py::TestTokenSource::test_literals
self =
def test_literals(self):
source = TokenSource([(b"abc {7}", b"foo bar"), (b"{5}", b"snafu"), b")"])
tokens = iter(source)
> self.assertEqual(next(tokens), b"abc")
tests/test_response_lexer.py:81:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
> for tok in self.read_token_stream(iter(source)):
E AttributeError: 'Lexer' object has no attribute 'read_token_stream'
imapclient/response_lexer.py:45: AttributeError
test_response_lexer.py::TestTokenSource::test_multiple_inputs
self =
def test_multiple_inputs(self):
> self.check([b"abc 111", b"def 222"], [b"abc", b"111", b"def", b"222"])
tests/test_response_lexer.py:18:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_lexer.py:91: in check
self.assertSequenceEqual(list(tokens), expected_out)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
> for tok in self.read_token_stream(iter(source)):
E AttributeError: 'Lexer' object has no attribute 'read_token_stream'
imapclient/response_lexer.py:45: AttributeError
test_response_lexer.py::TestTokenSource::test_no_escaping_in_square_brackets
test_response_lexer.py::TestTokenSource::test_no_escaping_in_square_brackets
self =
def test_no_escaping_in_square_brackets(self):
> self.check([rb"[aaa\\bbb]"], [rb"[aaa\\bbb]"])
tests/test_response_lexer.py:62:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_lexer.py:91: in check
self.assertSequenceEqual(list(tokens), expected_out)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
> for tok in self.read_token_stream(iter(source)):
E AttributeError: 'Lexer' object has no attribute 'read_token_stream'
imapclient/response_lexer.py:45: AttributeError
test_response_lexer.py::TestTokenSource::test_one_token
test_response_lexer.py::TestTokenSource::test_one_token
self =
def test_one_token(self):
> self.check([b"abc"], [b"abc"])
tests/test_response_lexer.py:12:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_lexer.py:91: in check
self.assertSequenceEqual(list(tokens), expected_out)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
> for tok in self.read_token_stream(iter(source)):
E AttributeError: 'Lexer' object has no attribute 'read_token_stream'
imapclient/response_lexer.py:45: AttributeError
test_response_lexer.py::TestTokenSource::test_quoted_strings
test_response_lexer.py::TestTokenSource::test_quoted_strings
self =
def test_quoted_strings(self):
> self.check([b'"abc def"'], [b'"abc def"'])
tests/test_response_lexer.py:25:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_lexer.py:91: in check
self.assertSequenceEqual(list(tokens), expected_out)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
> for tok in self.read_token_stream(iter(source)):
E AttributeError: 'Lexer' object has no attribute 'read_token_stream'
imapclient/response_lexer.py:45: AttributeError
test_response_lexer.py::TestTokenSource::test_simple_tokens
test_response_lexer.py::TestTokenSource::test_simple_tokens
self =
def test_simple_tokens(self):
> self.check([b"abc 111 def"], [b"abc", b"111", b"def"])
tests/test_response_lexer.py:15:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_lexer.py:91: in check
self.assertSequenceEqual(list(tokens), expected_out)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
> for tok in self.read_token_stream(iter(source)):
E AttributeError: 'Lexer' object has no attribute 'read_token_stream'
imapclient/response_lexer.py:45: AttributeError
test_response_lexer.py::TestTokenSource::test_square_brackets
test_response_lexer.py::TestTokenSource::test_square_brackets
self =
def test_square_brackets(self):
> self.check([b"[aaa bbb]"], [b"[aaa bbb]"])
tests/test_response_lexer.py:56:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_lexer.py:91: in check
self.assertSequenceEqual(list(tokens), expected_out)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
> for tok in self.read_token_stream(iter(source)):
E AttributeError: 'Lexer' object has no attribute 'read_token_stream'
imapclient/response_lexer.py:45: AttributeError
test_response_lexer.py::TestTokenSource::test_unmatched_square_brackets
test_response_lexer.py::TestTokenSource::test_unmatched_square_brackets
self =
def test_unmatched_square_brackets(self):
message = "No closing ']'"
> self.check_error([b"["], message)
tests/test_response_lexer.py:66:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_lexer.py:94: in check_error
self.assertRaisesRegex(
tests/test_response_lexer.py:95: in
ValueError, expected_message, lambda: list(TokenSource(text_in))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
> for tok in self.read_token_stream(iter(source)):
E AttributeError: 'Lexer' object has no attribute 'read_token_stream'
imapclient/response_lexer.py:45: AttributeError
test_response_lexer.py::TestTokenSource::test_unterminated_strings
test_response_lexer.py::TestTokenSource::test_unterminated_strings
self =
def test_unterminated_strings(self):
message = "No closing '\"'"
> self.check_error([b'"'], message)
tests/test_response_lexer.py:31:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_lexer.py:94: in check_error
self.assertRaisesRegex(
tests/test_response_lexer.py:95: in
ValueError, expected_message, lambda: list(TokenSource(text_in))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
> for tok in self.read_token_stream(iter(source)):
E AttributeError: 'Lexer' object has no attribute 'read_token_stream'
imapclient/response_lexer.py:45: AttributeError
test_response_lexer.py::TestTokenSource::test_whitespace
test_response_lexer.py::TestTokenSource::test_whitespace
self =
def test_whitespace(self):
> self.check([b"abc def"], [b"abc", b"def"])
tests/test_response_lexer.py:21:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_lexer.py:91: in check
self.assertSequenceEqual(list(tokens), expected_out)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
> for tok in self.read_token_stream(iter(source)):
E AttributeError: 'Lexer' object has no attribute 'read_token_stream'
imapclient/response_lexer.py:45: AttributeError
test_response_parser.py::TestParseResponse::test_bad_literal
test_response_parser.py::TestParseResponse::test_bad_literal
self =
def test_bad_literal(self):
> self._test_parse_error(
[(b"{99}", b"abc")], "Expecting literal of size 99, got 3"
)
tests/test_response_parser.py:177:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:196: in _test_parse_error
self.assertRaisesRegex(ProtocolError, expected_msg, parse_response, to_parse)
E AssertionError: ProtocolError not raised by parse_response
test_response_parser.py::TestParseResponse::test_bad_quoting
test_response_parser.py::TestParseResponse::test_bad_quoting
self =
def test_bad_quoting(self):
> self._test_parse_error(b'"abc next', """No closing '"'""")
tests/test_response_parser.py:182:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:196: in _test_parse_error
self.assertRaisesRegex(ProtocolError, expected_msg, parse_response, to_parse)
E AssertionError: ProtocolError not raised by parse_response
test_response_parser.py::TestParseResponse::test_complex_mixed
test_response_parser.py::TestParseResponse::test_complex_mixed
self =
def test_complex_mixed(self):
> self._test(
b'((FOO "PLAIN" ("CHARSET" "US-ASCII") NIL NIL "7BIT" 1152 23) '
b'("TEXT" "PLAIN" ("CHARSET" "US-ASCII" "NAME" "cc.diff") '
b'"" "foo" "BASE64" 4554 73) "MIXED")',
(
(
b"FOO",
b"PLAIN",
(b"CHARSET", b"US-ASCII"),
None,
None,
b"7BIT",
1152,
23,
),
(
b"TEXT",
b"PLAIN",
(b"CHARSET", b"US-ASCII", b"NAME", b"cc.diff"),
b"",
b"foo",
b"BASE64",
4554,
73,
),
b"MIXED",
),
)
tests/test_response_parser.py:69:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + (((b'FOO', b'PLAIN', (b'CHARSET', b'US-ASCII'), None, None, b'7BIT', 1152, 23),
E + (b'TEXT',
E + b'PLAIN',
E + (b'CHARSET', b'US-ASCII', b'NAME', b'cc.diff'),
E + b'',
E + b'foo',
E + b'BASE64',
E + 4554,
E + 73),
E + b'MIXED'),)
test_response_parser.py::TestParseResponse::test_deeper_nest_tuple
test_response_parser.py::TestParseResponse::test_deeper_nest_tuple
self =
def test_deeper_nest_tuple(self):
> self._test(
b'(123 "foo" ((0 1 2) "more" NIL) 66)',
(123, b"foo", ((0, 1, 2), b"more", None), 66),
)
tests/test_response_parser.py:63:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + ((123, b'foo', ((0, 1, 2), b'more', None), 66),)
test_response_parser.py::TestParseResponse::test_empty_tuple
test_response_parser.py::TestParseResponse::test_empty_tuple
self =
def test_empty_tuple(self):
> self._test(b"()", ())
tests/test_response_parser.py:51:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + ((),)
test_response_parser.py::TestParseResponse::test_envelopey
test_response_parser.py::TestParseResponse::test_envelopey
self =
def test_envelopey(self):
> self._test(
b'(UID 5 ENVELOPE ("internal_date" "subject" '
b'(("name" NIL "address1" "domain1.com")) '
b'((NIL NIL "address2" "domain2.com")) '
b'(("name" NIL "address3" "domain3.com")) '
b'((NIL NIL "address4" "domain4.com")) '
b'NIL NIL "" ""))',
(
b"UID",
5,
b"ENVELOPE",
(
b"internal_date",
b"subject",
((b"name", None, b"address1", b"domain1.com"),),
((None, None, b"address2", b"domain2.com"),),
((b"name", None, b"address3", b"domain3.com"),),
((None, None, b"address4", b"domain4.com"),),
None,
None,
b"",
b"",
),
),
)
tests/test_response_parser.py:99:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + ((b'UID',
E + 5,
E + b'ENVELOPE',
E + (b'internal_date',
E + b'subject',
E + ((b'name', None, b'address1', b'domain1.com'),),
E + ((None, None, b'address2', b'domain2.com'),),
E + ((b'name', None, b'address3', b'domain3.com'),),
E + ((None, None, b'address4', b'domain4.com'),),
E + None,
E + None,
E + b'',
E + b'')),)
test_response_parser.py::TestParseResponse::test_envelopey_quoted
test_response_parser.py::TestParseResponse::test_envelopey_quoted
self =
def test_envelopey_quoted(self):
> self._test(
b'(UID 5 ENVELOPE ("internal_date" "subject with \\"quotes\\"" '
b'(("name" NIL "address1" "domain1.com")) '
b'((NIL NIL "address2" "domain2.com")) '
b'(("name" NIL "address3" "domain3.com")) '
b'((NIL NIL "address4" "domain4.com")) '
b'NIL NIL "" ""))',
(
b"UID",
5,
b"ENVELOPE",
(
b"internal_date",
b'subject with "quotes"',
((b"name", None, b"address1", b"domain1.com"),),
((None, None, b"address2", b"domain2.com"),),
((b"name", None, b"address3", b"domain3.com"),),
((None, None, b"address4", b"domain4.com"),),
None,
None,
b"",
b"",
),
),
)
tests/test_response_parser.py:126:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + ((b'UID',
E + 5,
E + b'ENVELOPE',
E + (b'internal_date',
E + b'subject with "quotes"',
E + ((b'name', None, b'address1', b'domain1.com'),),
E + ((None, None, b'address2', b'domain2.com'),),
E + ((b'name', None, b'address3', b'domain3.com'),),
E + ((None, None, b'address4', b'domain4.com'),),
E + None,
E + None,
E + b'',
E + b'')),)
test_response_parser.py::TestParseResponse::test_incomplete_tuple
test_response_parser.py::TestParseResponse::test_incomplete_tuple
self =
def test_incomplete_tuple(self):
> self._test_parse_error(b"abc (1 2", r'Tuple incomplete before "\(1 2"')
tests/test_response_parser.py:174:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:196: in _test_parse_error
self.assertRaisesRegex(ProtocolError, expected_msg, parse_response, to_parse)
E AssertionError: ProtocolError not raised by parse_response
test_response_parser.py::TestParseResponse::test_int
test_response_parser.py::TestParseResponse::test_int
self =
def test_int(self):
> self._test(b"45", 45)
tests/test_response_parser.py:39:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + (45,)
test_response_parser.py::TestParseResponse::test_int_and_tuple
test_response_parser.py::TestParseResponse::test_int_and_tuple
self =
def test_int_and_tuple(self):
> self._test(b'1 (123 "foo")', (1, (123, b"foo")), wrap=False)
tests/test_response_parser.py:57:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + (1, (123, b'foo'))
test_response_parser.py::TestParseResponse::test_int_zero
test_response_parser.py::TestParseResponse::test_int_zero
self =
def test_int_zero(self):
> self._test(b"0", 0)
tests/test_response_parser.py:42:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + (0,)
test_response_parser.py::TestParseResponse::test_literal
test_response_parser.py::TestParseResponse::test_literal
self =
def test_literal(self):
literal_text = add_crlf(b"012\n" b"abc def XYZ\n")
> self._test([(b"{18}", literal_text)], literal_text)
tests/test_response_parser.py:154:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + (b'012\r\nabc def XYZ\r\n',)
test_response_parser.py::TestParseResponse::test_literal_with_more
test_response_parser.py::TestParseResponse::test_literal_with_more
self =
def test_literal_with_more(self):
literal_text = add_crlf(b"012\n" b"abc def XYZ\n")
response = [(b'(12 "foo" {18}', literal_text), b")"]
> self._test(response, (12, b"foo", literal_text))
tests/test_response_parser.py:159:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + ((12, b'foo', b'012\r\nabc def XYZ\r\n'),)
test_response_parser.py::TestParseResponse::test_nested_tuple
test_response_parser.py::TestParseResponse::test_nested_tuple
self =
def test_nested_tuple(self):
> self._test(b'(123 "foo" ("more" NIL) 66)', (123, b"foo", (b"more", None), 66))
tests/test_response_parser.py:60:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + ((123, b'foo', (b'more', None), 66),)
test_response_parser.py::TestParseResponse::test_nil
test_response_parser.py::TestParseResponse::test_nil
self =
def test_nil(self):
> self._test(b"NIL", None)
tests/test_response_parser.py:48:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + (None,)
test_response_parser.py::TestParseResponse::test_not_an_int
test_response_parser.py::TestParseResponse::test_not_an_int
self =
def test_not_an_int(self):
> self._test(b"0123", b"0123")
tests/test_response_parser.py:45:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + (b'0123',)
test_response_parser.py::TestParseResponse::test_quoted_specials
test_response_parser.py::TestParseResponse::test_quoted_specials
self =
def test_quoted_specials(self):
> self._test(rb'"\"foo bar\""', b'"foo bar"')
tests/test_response_parser.py:162:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + (b'"foo bar"',)
test_response_parser.py::TestParseResponse::test_square_brackets
test_response_parser.py::TestParseResponse::test_square_brackets
self =
def test_square_brackets(self):
> self._test(b"foo[bar rrr]", b"foo[bar rrr]")
tests/test_response_parser.py:167:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + (b'foo[bar rrr]',)
test_response_parser.py::TestParseResponse::test_string
test_response_parser.py::TestParseResponse::test_string
self =
def test_string(self):
> self._test(b'"TEST"', b"TEST")
tests/test_response_parser.py:36:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + (b'TEST',)
test_response_parser.py::TestParseResponse::test_tuple
test_response_parser.py::TestParseResponse::test_tuple
self =
def test_tuple(self):
> self._test(b'(123 "foo" GeE)', (123, b"foo", b"GeE"))
tests/test_response_parser.py:54:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + ((123, b'foo', b'GeE'),)
test_response_parser.py::TestParseResponse::test_unquoted
test_response_parser.py::TestParseResponse::test_unquoted
self =
def test_unquoted(self):
> self._test(b"FOO", b"FOO")
tests/test_response_parser.py:31:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:191: in _test
self.assertSequenceEqual(output, expected)
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + (b'FOO',)
test_response_parser.py::TestParseMessageList::test_basic
test_response_parser.py::TestParseMessageList::test_basic
self =
def test_basic(self):
out = parse_message_list([b"1 2 3"])
> self.assertSequenceEqual(out, [1, 2, 3])
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + [1, 2, 3]
tests/test_response_parser.py:202: AssertionError
test_response_parser.py::TestParseMessageList::test_modseq
test_response_parser.py::TestParseMessageList::test_modseq
self =
def test_modseq(self):
out = parse_message_list([b"1 2 3 (modseq 999)"])
> self.assertSequenceEqual(out, [1, 2, 3])
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + [1, 2, 3]
tests/test_response_parser.py:210: AssertionError
test_response_parser.py::TestParseMessageList::test_modseq_interleaved
test_response_parser.py::TestParseMessageList::test_modseq_interleaved
self =
def test_modseq_interleaved(self):
# Unlikely but test it anyway.
out = parse_message_list([b"1 2 (modseq 9) 3 4"])
> self.assertSequenceEqual(out, [1, 2, 3, 4])
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + [1, 2, 3, 4]
tests/test_response_parser.py:221: AssertionError
test_response_parser.py::TestParseMessageList::test_modseq_no_space
test_response_parser.py::TestParseMessageList::test_modseq_no_space
self =
def test_modseq_no_space(self):
out = parse_message_list([b"1 2 3(modseq 999)"])
> self.assertSequenceEqual(out, [1, 2, 3])
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + [1, 2, 3]
tests/test_response_parser.py:215: AssertionError
test_response_parser.py::TestParseMessageList::test_one_id
test_response_parser.py::TestParseMessageList::test_one_id
self =
def test_one_id(self):
> self.assertSequenceEqual(parse_message_list([b"4"]), [4])
E AssertionError: First sequence has no length. Non-sequence?
E - None
E + [4]
tests/test_response_parser.py:206: AssertionError
test_response_parser.py::TestParseFetchResponse::test_Address_str_ignores_encoding_error
test_response_parser.py::TestParseFetchResponse::test_Address_str_ignores_encoding_error
/usr/lib/python3.10/unittest/mock.py:1376: in patched
with self.decoration_helper(patched,
/usr/lib/python3.10/contextlib.py:135: in __enter__
return next(self.gen)
/usr/lib/python3.10/unittest/mock.py:1358: in decoration_helper
arg = exit_stack.enter_context(patching)
/usr/lib/python3.10/contextlib.py:492: in enter_context
result = _cm_type.__enter__(cm)
/usr/lib/python3.10/unittest/mock.py:1431: in __enter__
self.target = self.getter()
/usr/lib/python3.10/unittest/mock.py:1618: in
getter = lambda: _importer(target)
/usr/lib/python3.10/unittest/mock.py:1257: in _importer
thing = __import__(import_path)
imapclient/__init__.py:8: in
from .imapclient import * # noqa: F401,F403
imapclient/imapclient.py:1707: in
class _dict_bytes_normaliser:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
class _dict_bytes_normaliser:
"""Wrap a dict with unicode/bytes keys and normalise the keys to
bytes.
"""
def __init__(self, d):
self._d = d
> items = iteritems
E NameError: name 'iteritems' is not defined
imapclient/imapclient.py:1714: NameError
test_response_parser.py::TestParseFetchResponse::test_BODY
test_response_parser.py::TestParseFetchResponse::test_BODY
self =
def test_BODY(self):
> self.check_BODYish_single_part(b"BODY")
tests/test_response_parser.py:327:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:342: in check_BODYish_single_part
self.assertEqual(
E AssertionError: None != {317: {b'BODY': (b'TEXT', b'PLAIN', (b'CH[59 chars]123}}
test_response_parser.py::TestParseFetchResponse::test_BODYSTRUCTURE
test_response_parser.py::TestParseFetchResponse::test_BODYSTRUCTURE
self =
def test_BODYSTRUCTURE(self):
> self.check_BODYish_single_part(b"BODYSTRUCTURE")
tests/test_response_parser.py:332:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_response_parser.py:342: in check_BODYish_single_part
self.assertEqual(
E AssertionError: None != {317: {b'BODYSTRUCTURE': (b'TEXT', b'PLAI[68 chars]123}}
test_response_parser.py::TestParseFetchResponse::test_BODY_HEADER_FIELDS
test_response_parser.py::TestParseFetchResponse::test_BODY_HEADER_FIELDS
self =
def test_BODY_HEADER_FIELDS(self):
header_text = b"Subject: A subject\r\nFrom: Some one \r\n\r\n"
> self.assertEqual(
parse_fetch_response(
[
(
b"123 (UID 31710 BODY[HEADER.FIELDS (from subject)] {57}",
header_text,
),
b")",
]
),
{31710: {b"BODY[HEADER.FIELDS (FROM SUBJECT)]": header_text, b"SEQ": 123}},
)
E AssertionError: None != {31710: {b'BODY[HEADER.FIELDS (FROM SUBJE[83 chars]123}}
tests/test_response_parser.py:313: AssertionError
test_response_parser.py::TestParseFetchResponse::test_ENVELOPE
test_response_parser.py::TestParseFetchResponse::test_ENVELOPE
self =
def test_ENVELOPE(self):
envelope_str = (
b"1 (ENVELOPE ( "
b'"Sun, 24 Mar 2013 22:06:10 +0200" '
b'"subject" '
b'(("name" NIL "address1" "domain1.com")) ' # from (name and address)
b'((NIL NIL "address2" "domain2.com")) ' # sender (just address)
b'(("name" NIL "address3" "domain3.com") NIL) ' # reply to
b"NIL" # to (no address)
b'((NIL NIL "address4" "domain4.com") ' # cc
b'("person" NIL "address4b" "domain4b.com")) '
b"NIL " # bcc
b'"" '
b'""))'
)
output = parse_fetch_response([envelope_str], normalise_times=False)
self.assertEqual(
> output[1][b"ENVELOPE"],
Envelope(
datetime(2013, 3, 24, 22, 6, 10, tzinfo=FixedOffset(120)),
b"subject",
(Address(b"name", None, b"address1", b"domain1.com"),),
(Address(None, None, b"address2", b"domain2.com"),),
(Address(b"name", None, b"address3", b"domain3.com"),),
None,
(
Address(None, None, b"address4", b"domain4.com"),
Address(b"person", None, b"address4b", b"domain4b.com"),
),
None,
b"",
b"",
),
)
E TypeError: 'NoneType' object is not subscriptable
tests/test_response_parser.py:515: TypeError
test_response_parser.py::TestParseFetchResponse::test_ENVELOPE_with_empty_addresses
test_response_parser.py::TestParseFetchResponse::test_ENVELOPE_with_empty_addresses
self =
def test_ENVELOPE_with_empty_addresses(self):
envelope_str = (
b"1 (ENVELOPE ( "
b"NIL "
b'"subject" '
b'(("name" NIL "address1" "domain1.com") NIL) '
b'(NIL (NIL NIL "address2" "domain2.com")) '
b'(("name" NIL "address3" "domain3.com") NIL ("name" NIL "address3b" "domain3b.com")) '
b"NIL"
b'((NIL NIL "address4" "domain4.com") '
b'("person" NIL "address4b" "domain4b.com")) '
b'NIL "" ""))'
)
output = parse_fetch_response([envelope_str], normalise_times=False)
self.assertEqual(
> output[1][b"ENVELOPE"],
Envelope(
None,
b"subject",
(Address(b"name", None, b"address1", b"domain1.com"),),
(Address(None, None, b"address2", b"domain2.com"),),
(
Address(b"name", None, b"address3", b"domain3.com"),
Address(b"name", None, b"address3b", b"domain3b.com"),
),
None,
(
Address(None, None, b"address4", b"domain4.com"),
Address(b"person", None, b"address4b", b"domain4b.com"),
),
None,
b"",
b"",
),
)
E TypeError: 'NoneType' object is not subscriptable
tests/test_response_parser.py:610: TypeError
test_response_parser.py::TestParseFetchResponse::test_ENVELOPE_with_invalid_date
test_response_parser.py::TestParseFetchResponse::test_ENVELOPE_with_invalid_date
self =
def test_ENVELOPE_with_invalid_date(self):
envelope_str = (
b"1 (ENVELOPE ( "
b'"wtf" ' # bad date
b'"subject" '
b"NIL NIL NIL NIL NIL NIL "
b'"" ""))'
)
output = parse_fetch_response([envelope_str], normalise_times=False)
self.assertEqual(
> output[1][b"ENVELOPE"],
Envelope(
None,
b"subject",
None,
None,
None,
None,
None,
None,
b"",
b"",
),
)
E TypeError: 'NoneType' object is not subscriptable
tests/test_response_parser.py:578: TypeError
test_response_parser.py::TestParseFetchResponse::test_ENVELOPE_with_no_date
test_response_parser.py::TestParseFetchResponse::test_ENVELOPE_with_no_date
self =
def test_ENVELOPE_with_no_date(self):
envelope_str = (
b"1 (ENVELOPE ( "
b"NIL "
b'"subject" '
b"NIL "
b"NIL "
b"NIL "
b"NIL "
b"NIL "
b"NIL "
b'"" '
b'""))'
)
output = parse_fetch_response([envelope_str], normalise_times=False)
self.assertEqual(
> output[1][b"ENVELOPE"],
Envelope(
None,
b"subject",
None,
None,
None,
None,
None,
None,
b"",
b"",
),
)
E TypeError: 'NoneType' object is not subscriptable
tests/test_response_parser.py:551: TypeError
test_response_parser.py::TestParseFetchResponse::test_FLAGS
test_response_parser.py::TestParseFetchResponse::test_FLAGS
self =
def test_FLAGS(self):
> self.assertEqual(
parse_fetch_response([rb"23 (FLAGS (\Seen Stuff))"]),
{23: {b"SEQ": 23, b"FLAGS": (rb"\Seen", b"Stuff")}},
)
E AssertionError: None != {23: {b'SEQ': 23, b'FLAGS': (b'\\Seen', b'Stuff')}}
tests/test_response_parser.py:265: AssertionError
test_response_parser.py::TestParseFetchResponse::test_INTERNALDATE
test_response_parser.py::TestParseFetchResponse::test_INTERNALDATE
self =
def test_INTERNALDATE(self):
out = parse_fetch_response(
[b'1 (INTERNALDATE " 9-Feb-2007 17:08:08 -0430")'], normalise_times=False
)
self.assertEqual(
> out[1][b"INTERNALDATE"],
datetime(2007, 2, 9, 17, 8, 8, 0, FixedOffset(-4 * 60 - 30)),
)
E TypeError: 'NoneType' object is not subscriptable
tests/test_response_parser.py:636: TypeError
test_response_parser.py::TestParseFetchResponse::test_INTERNALDATE_NIL
test_response_parser.py::TestParseFetchResponse::test_INTERNALDATE_NIL
self =
def test_INTERNALDATE_NIL(self):
out = parse_fetch_response([b"1 (INTERNALDATE NIL)"])
> self.assertEqual(out[1][b"INTERNALDATE"], None)
E TypeError: 'NoneType' object is not subscriptable
tests/test_response_parser.py:653: TypeError
test_response_parser.py::TestParseFetchResponse::test_INTERNALDATE_normalised
test_response_parser.py::TestParseFetchResponse::test_INTERNALDATE_normalised
self =
def test_INTERNALDATE_normalised(self):
output = parse_fetch_response(
[b'3 (INTERNALDATE " 9-Feb-2007 17:08:08 -0430")']
)
> dt = output[3][b"INTERNALDATE"]
E TypeError: 'NoneType' object is not subscriptable
tests/test_response_parser.py:644: TypeError
test_response_parser.py::TestParseFetchResponse::test_UID
test_response_parser.py::TestParseFetchResponse::test_UID
self =
def test_UID(self):
> self.assertEqual(parse_fetch_response([b"23 (UID 76)"]), {76: {b"SEQ": 23}})
E AssertionError: None != {76: {b'SEQ': 23}}
tests/test_response_parser.py:252: AssertionError
test_response_parser.py::TestParseFetchResponse::test_bad_UID
test_response_parser.py::TestParseFetchResponse::test_bad_UID
self =
def test_bad_UID(self):
> self.assertRaises(ProtocolError, parse_fetch_response, [b"(UID X)"])
E AssertionError: ProtocolError not raised by parse_fetch_response
tests/test_response_parser.py:262: AssertionError
test_response_parser.py::TestParseFetchResponse::test_bad_data
test_response_parser.py::TestParseFetchResponse::test_bad_data
self =
def test_bad_data(self):
> self.assertRaises(ProtocolError, parse_fetch_response, [b"2 WHAT"])
E AssertionError: ProtocolError not raised by parse_fetch_response
tests/test_response_parser.py:236: AssertionError
test_response_parser.py::TestParseFetchResponse::test_bad_msgid
test_response_parser.py::TestParseFetchResponse::test_bad_msgid
self =
def test_bad_msgid(self):
> self.assertRaises(ProtocolError, parse_fetch_response, [b"abc ()"])
E AssertionError: ProtocolError not raised by parse_fetch_response
tests/test_response_parser.py:233: AssertionError
test_response_parser.py::TestParseFetchResponse::test_basic
test_response_parser.py::TestParseFetchResponse::test_basic
self =
def test_basic(self):
> self.assertEqual(parse_fetch_response([b"4 ()"]), {4: {b"SEQ": 4}})
E AssertionError: None != {4: {b'SEQ': 4}}
tests/test_response_parser.py:227: AssertionError
test_response_parser.py::TestParseFetchResponse::test_literals
test_response_parser.py::TestParseFetchResponse::test_literals
self =
def test_literals(self):
> self.assertEqual(
parse_fetch_response(
[
(b"1 (RFC822.TEXT {4}", b"body"),
(b" RFC822 {21}", b"Subject: test\r\n\r\nbody"),
b")",
]
),
{
1: {
b"RFC822.TEXT": b"body",
b"RFC822": b"Subject: test\r\n\r\nbody",
b"SEQ": 1,
}
},
)
E AssertionError: None != {1: {b'RFC822.TEXT': b'body', b'RFC822': [36 chars]: 1}}
tests/test_response_parser.py:288: AssertionError
test_response_parser.py::TestParseFetchResponse::test_literals_and_keys_with_square_brackets
test_response_parser.py::TestParseFetchResponse::test_literals_and_keys_with_square_brackets
self =
def test_literals_and_keys_with_square_brackets(self):
> self.assertEqual(
parse_fetch_response([(b"1 (BODY[TEXT] {11}", b"Hi there.\r\n"), b")"]),
{1: {b"BODY[TEXT]": b"Hi there.\r\n", b"SEQ": 1}},
)
E AssertionError: None != {1: {b'BODY[TEXT]': b'Hi there.\r\n', b'SEQ': 1}}
tests/test_response_parser.py:306: AssertionError
test_response_parser.py::TestParseFetchResponse::test_missing_data
test_response_parser.py::TestParseFetchResponse::test_missing_data
self =
def test_missing_data(self):
> self.assertRaises(ProtocolError, parse_fetch_response, [b"2"])
E AssertionError: ProtocolError not raised by parse_fetch_response
tests/test_response_parser.py:239: AssertionError
test_response_parser.py::TestParseFetchResponse::test_mixed_types
test_response_parser.py::TestParseFetchResponse::test_mixed_types
self =
def test_mixed_types(self):
self.assertEqual(
parse_fetch_response(
[
(
b'1 (INTERNALDATE " 9-Feb-2007 17:08:08 +0100" RFC822 {21}',
b"Subject: test\r\n\r\nbody",
),
b")",
]
),
{
1: {
> b"INTERNALDATE": datetime_to_native(
datetime(2007, 2, 9, 17, 8, 8, 0, FixedOffset(60))
),
b"RFC822": b"Subject: test\r\n\r\nbody",
b"SEQ": 1,
}
},
)
tests/test_response_parser.py:668:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dt = datetime.datetime(2007, 2, 9, 17, 8, 8, tzinfo=)
def datetime_to_native(dt: datetime) -> datetime:
"""Convert a timezone-aware datetime to a naive datetime in the local timezone."""
if dt.tzinfo is None:
return dt
> return dt.astimezone(FixedOffset.for_system()).replace(tzinfo=None)
E NotImplementedError: a tzinfo subclass must implement utcoffset()
imapclient/datetime_util.py:11: NotImplementedError
test_response_parser.py::TestParseFetchResponse::test_multiple_messages
test_response_parser.py::TestParseFetchResponse::test_multiple_messages
self =
def test_multiple_messages(self):
> self.assertEqual(
parse_fetch_response([b"2 (FLAGS (Foo Bar)) ", b"7 (FLAGS (Baz Sneeve))"]),
{
2: {b"FLAGS": (b"Foo", b"Bar"), b"SEQ": 2},
7: {b"FLAGS": (b"Baz", b"Sneeve"), b"SEQ": 7},
},
)
E AssertionError: None != {2: {b'FLAGS': (b'Foo', b'Bar'), b'SEQ': [45 chars]: 7}}
tests/test_response_parser.py:271: AssertionError
test_response_parser.py::TestParseFetchResponse::test_none_special_case
test_response_parser.py::TestParseFetchResponse::test_none_special_case
self =
def test_none_special_case(self):
> self.assertEqual(parse_fetch_response([None]), {})
E AssertionError: None != {}
tests/test_response_parser.py:230: AssertionError
test_response_parser.py::TestParseFetchResponse::test_not_uid_is_key
test_response_parser.py::TestParseFetchResponse::test_not_uid_is_key
self =
def test_not_uid_is_key(self):
> self.assertEqual(
parse_fetch_response([b"23 (UID 76)"], uid_is_key=False),
{23: {b"UID": 76, b"SEQ": 23}},
)
E AssertionError: None != {23: {b'UID': 76, b'SEQ': 23}}
tests/test_response_parser.py:256: AssertionError
test_response_parser.py::TestParseFetchResponse::test_odd_pairs
test_response_parser.py::TestParseFetchResponse::test_odd_pairs
self =
def test_odd_pairs(self):
> self.assertRaises(ProtocolError, parse_fetch_response, [b"(ONE)"])
E AssertionError: ProtocolError not raised by parse_fetch_response
tests/test_response_parser.py:248: AssertionError
test_response_parser.py::TestParseFetchResponse::test_partial_fetch
test_response_parser.py::TestParseFetchResponse::test_partial_fetch
self =
def test_partial_fetch(self):
body = b"01234567890123456789"
> self.assertEqual(
parse_fetch_response([(b"123 (UID 367 BODY[]<0> {20}", body), b")"]),
{367: {b"BODY[]<0>": body, b"SEQ": 123}},
)
E AssertionError: None != {367: {b'BODY[]<0>': b'01234567890123456789', b'SEQ': 123}}
tests/test_response_parser.py:491: AssertionError
test_response_parser.py::TestParseFetchResponse::test_same_message_appearing_multiple_times
test_response_parser.py::TestParseFetchResponse::test_same_message_appearing_multiple_times
self =
def test_same_message_appearing_multiple_times(self):
# This can occur when server sends unsolicited FETCH responses
# (e.g. RFC 4551)
> self.assertEqual(
parse_fetch_response([b"2 (FLAGS (Foo Bar)) ", b"2 (MODSEQ 4)"]),
{2: {b"FLAGS": (b"Foo", b"Bar"), b"SEQ": 2, b"MODSEQ": 4}},
)
E AssertionError: None != {2: {b'FLAGS': (b'Foo', b'Bar'), b'SEQ': 2, b'MODSEQ': 4}}
tests/test_response_parser.py:282: AssertionError
test_response_parser.py::TestParseFetchResponse::test_simple_pairs
test_response_parser.py::TestParseFetchResponse::test_simple_pairs
self =
def test_simple_pairs(self):
> self.assertEqual(
parse_fetch_response([b'23 (ABC 123 StUfF "hello")']),
{23: {b"ABC": 123, b"STUFF": b"hello", b"SEQ": 23}},
)
E AssertionError: None != {23: {b'ABC': 123, b'STUFF': b'hello', b'SEQ': 23}}
tests/test_response_parser.py:242: AssertionError
Patch diff
diff --git a/imapclient/datetime_util.py b/imapclient/datetime_util.py
index 73dc91d..a233545 100644
--- a/imapclient/datetime_util.py
+++ b/imapclient/datetime_util.py
@@ -4,6 +4,12 @@ from email.utils import parsedate_tz
from .fixed_offset import FixedOffset
_SHORT_MONTHS = ' Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(' ')
+def datetime_to_native(dt: datetime) -> datetime:
+ """Convert a timezone-aware datetime to a naive datetime in the local timezone."""
+ if dt.tzinfo is None:
+ return dt
+ return dt.astimezone(FixedOffset.for_system()).replace(tzinfo=None)
+
def parse_to_datetime(timestamp: bytes, normalise: bool=True) -> datetime:
"""Convert an IMAP datetime string to a datetime.
@@ -13,7 +19,55 @@ def parse_to_datetime(timestamp: bytes, normalise: bool=True) -> datetime:
If normalise is False, then the returned datetime will be
unadjusted but will contain timezone information as per the input.
"""
- pass
+ timestamp = timestamp.decode('ascii')
+
+ # Handle dotted time format
+ if _rfc822_dotted_time.match(timestamp):
+ timestamp = timestamp.replace('.', ':')
+
+ # Try RFC822 format first
+ parsed = parsedate_tz(timestamp)
+ if parsed:
+ tz_offset = parsed[-1]
+ if tz_offset is None:
+ tz = None
+ else:
+ tz = FixedOffset(tz_offset // 60)
+ dt = datetime(*parsed[:6], tzinfo=tz)
+ else:
+ # Try INTERNALDATE format
+ timestamp = timestamp.strip()
+ parts = timestamp.split(' ')
+ if len(parts) != 3:
+ raise ValueError(f'Invalid timestamp format: {timestamp}')
+
+ date_parts = parts[0].split('-')
+ if len(date_parts) != 3:
+ raise ValueError(f'Invalid date format: {parts[0]}')
+
+ day = int(date_parts[0])
+ month = _SHORT_MONTHS.index(date_parts[1])
+ year = int(date_parts[2])
+
+ time_parts = parts[1].split(':')
+ if len(time_parts) != 3:
+ raise ValueError(f'Invalid time format: {parts[1]}')
+
+ hour = int(time_parts[0])
+ minute = int(time_parts[1])
+ second = int(time_parts[2])
+
+ tz_str = parts[2]
+ tz_sign = 1 if tz_str[0] == '+' else -1
+ tz_hour = int(tz_str[1:3])
+ tz_min = int(tz_str[3:5])
+ tz = FixedOffset(tz_sign * (tz_hour * 60 + tz_min))
+
+ dt = datetime(year, month, day, hour, minute, second, tzinfo=tz)
+
+ if normalise and dt.tzinfo is not None:
+ return datetime_to_native(dt)
+ return dt
def datetime_to_INTERNALDATE(dt: datetime) -> str:
"""Convert a datetime instance to a IMAP INTERNALDATE string.
@@ -21,9 +75,17 @@ def datetime_to_INTERNALDATE(dt: datetime) -> str:
If timezone information is missing the current system
timezone is used.
"""
- pass
+ if dt.tzinfo is None:
+ dt = dt.replace(tzinfo=FixedOffset.for_system())
+
+ sign = '+' if dt.utcoffset().total_seconds() >= 0 else '-'
+ offset_mins = abs(int(dt.utcoffset().total_seconds() / 60))
+ offset_hrs = offset_mins // 60
+ offset_mins = offset_mins % 60
+
+ return f"{dt.day:02d}-{_SHORT_MONTHS[dt.month]}-{dt.year:04d} {dt.hour:02d}:{dt.minute:02d}:{dt.second:02d} {sign}{offset_hrs:02d}{offset_mins:02d}"
_rfc822_dotted_time = re.compile('\\w+, ?\\d{1,2} \\w+ \\d\\d(\\d\\d)? \\d\\d?\\.\\d\\d?\\.\\d\\d?.*')
def format_criteria_date(dt: datetime) -> bytes:
"""Format a date or datetime instance for use in IMAP search criteria."""
- pass
\ No newline at end of file
+ return f"{dt.day:02d}-{_SHORT_MONTHS[dt.month]}-{dt.year:04d}".encode('ascii')
\ No newline at end of file
diff --git a/imapclient/imapclient.py b/imapclient/imapclient.py
index 43c0b10..d23be18 100644
--- a/imapclient/imapclient.py
+++ b/imapclient/imapclient.py
@@ -106,7 +106,15 @@ class Quota:
def require_capability(capability):
"""Decorator raising CapabilityError when a capability is not available."""
- pass
+ def actual_decorator(func):
+ def wrapper(client, *args, **kwargs):
+ if not client.has_capability(capability):
+ raise exceptions.CapabilityError(
+ f"Server does not support {capability} capability"
+ )
+ return func(client, *args, **kwargs)
+ return wrapper
+ return actual_decorator
class IMAPClient:
"""A connection to the IMAP server specified by *host* is made when
@@ -223,7 +231,7 @@ class IMAPClient:
This includes reading from and writing to the socket,
as they are likely to break internal bookkeeping of messages.
"""
- pass
+ return self._imap.sock
@require_capability('STARTTLS')
def starttls(self, ssl_context=None):
@@ -243,13 +251,34 @@ class IMAPClient:
Raises :py:exc:`AbortError` if the server does not support STARTTLS
or an SSL connection is already established.
"""
- pass
+ if self.ssl or self._starttls_done:
+ raise self.AbortError('TLS session already established')
+
+ if ssl_context is None:
+ ssl_context = tls.create_default_context()
+
+ typ, data = self._imap._simple_command('STARTTLS')
+ if typ != 'OK':
+ raise self.Error('STARTTLS failed: %s' % data[0].decode('ascii'))
+
+ self._starttls_done = True
+ self._imap.sock = ssl_context.wrap_socket(self._imap.sock,
+ server_hostname=self.host)
def login(self, username: str, password: str):
"""Login using *username* and *password*, returning the
server response.
"""
- pass
+ try:
+ typ, data = self._imap.login(username, password)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ self._cached_capabilities = None
+ return data[0].decode('ascii')
def oauth2_login(self, user: str, access_token: str, mech: str='XOAUTH2', vendor: Optional[str]=None):
"""Authenticate using the OAUTH2 or XOAUTH2 methods.
@@ -257,7 +286,21 @@ class IMAPClient:
Gmail and Yahoo both support the 'XOAUTH2' mechanism, but Yahoo requires
the 'vendor' portion in the payload.
"""
- pass
+ auth_string = f'user={user}\1auth=Bearer {access_token}\1'
+ if vendor:
+ auth_string += f'vendor={vendor}\1'
+ auth_string += '\1'
+
+ try:
+ typ, data = self._imap.authenticate(mech, lambda _: auth_string.encode())
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ self._cached_capabilities = None
+ return data[0].decode('ascii')
def oauthbearer_login(self, identity, access_token):
"""Authenticate using the OAUTHBEARER method.
@@ -265,11 +308,35 @@ class IMAPClient:
This is supported by Gmail and is meant to supersede the non-standard
'OAUTH2' and 'XOAUTH2' mechanisms.
"""
- pass
+ auth_string = f'n,a={identity},\1auth=Bearer {access_token}\1\1'
+
+ try:
+ typ, data = self._imap.authenticate('OAUTHBEARER', lambda _: auth_string.encode())
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ self._cached_capabilities = None
+ return data[0].decode('ascii')
def plain_login(self, identity, password, authorization_identity=None):
"""Authenticate using the PLAIN method (requires server support)."""
- pass
+ if authorization_identity is None:
+ authorization_identity = ''
+ auth_string = f'{authorization_identity}\0{identity}\0{password}'
+
+ try:
+ typ, data = self._imap.authenticate('PLAIN', lambda _: auth_string.encode())
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ self._cached_capabilities = None
+ return data[0].decode('ascii')
def sasl_login(self, mech_name, mech_callable):
"""Authenticate using a provided SASL mechanism (requires server support).
@@ -320,11 +387,35 @@ class IMAPClient:
imap.sasl_login("SCRAM-SHA-256", scram_mech)
"""
- pass
+ def encode_response(response):
+ if isinstance(response, str):
+ return response.encode()
+ return response
+
+ try:
+ typ, data = self._imap.authenticate(mech_name, lambda x: encode_response(mech_callable(x)))
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ self._cached_capabilities = None
+ return data[0].decode('ascii')
def logout(self):
"""Logout, returning the server response."""
- pass
+ try:
+ typ, data = self._imap.logout()
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ self.shutdown()
+
+ if typ != 'BYE':
+ raise self.Error(data[0].decode('ascii'))
+
+ return data[0].decode('ascii')
def shutdown(self) -> None:
"""Close the connection to the IMAP server (without logging out)
@@ -332,7 +423,18 @@ class IMAPClient:
In most cases, :py:meth:`.logout` should be used instead of
this. The logout method also shutdown down the connection.
"""
- pass
+ if self._imap.sock is not None:
+ try:
+ self._imap.sock.shutdown(socket.SHUT_RDWR)
+ except Exception:
+ pass
+ finally:
+ try:
+ self._imap.sock.close()
+ except Exception:
+ pass
+ self._imap.sock = None
+ self._cached_capabilities = None
@require_capability('ENABLE')
def enable(self, *capabilities):
@@ -351,7 +453,25 @@ class IMAPClient:
See :rfc:`5161` for more details.
"""
- pass
+ if not capabilities:
+ raise self.Error('No arguments for ENABLE command')
+
+ try:
+ typ, data = self._imap._simple_command('ENABLE', *capabilities)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ if len(data) != 1:
+ raise self.Error('Invalid ENABLE response')
+
+ response = data[0].decode('ascii')
+ if not response.startswith('* ENABLED '):
+ raise self.Error('Invalid ENABLE response')
+
+ return response[10:].split()
@require_capability('ID')
def id_(self, parameters=None):
@@ -361,7 +481,42 @@ class IMAPClient:
*parameters* should be specified as a dictionary of field/value pairs,
for example: ``{"name": "IMAPClient", "version": "0.12"}``
"""
- pass
+ if parameters is None:
+ args = ['NIL']
+ else:
+ if not isinstance(parameters, dict):
+ raise TypeError('parameters must be a dict or None')
+ args = []
+ for key, value in parameters.items():
+ args.extend([to_bytes(key), to_bytes(value)])
+
+ try:
+ typ, data = self._imap._simple_command('ID', '(' + ' '.join(args) + ')')
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ if len(data) != 1:
+ raise self.Error('Invalid ID response')
+
+ response = data[0].decode('ascii')
+ if not response.startswith('* ID '):
+ raise self.Error('Invalid ID response')
+
+ response = response[5:]
+ if response == 'NIL':
+ return {}
+
+ if not (response.startswith('(') and response.endswith(')')):
+ raise self.Error('Invalid ID response')
+
+ response = response[1:-1].split()
+ if len(response) % 2 != 0:
+ raise self.Error('Invalid ID response')
+
+ return dict(zip(response[::2], response[1::2]))
def capabilities(self):
"""Returns the server capability list.
@@ -375,11 +530,29 @@ class IMAPClient:
If the session is not yet authenticated, the capabilities
requested at connection time will be returned.
"""
- pass
+ if self._cached_capabilities is None:
+ try:
+ typ, data = self._imap.capability()
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ if len(data) != 1:
+ raise self.Error('Invalid CAPABILITY response')
+
+ response = data[0].decode('ascii')
+ if not response.startswith('* CAPABILITY '):
+ raise self.Error('Invalid CAPABILITY response')
+
+ self._cached_capabilities = response[13:].split()
+
+ return self._cached_capabilities
def has_capability(self, capability):
"""Return ``True`` if the IMAP server has the given *capability*."""
- pass
+ return capability.upper() in self.capabilities()
@require_capability('NAMESPACE')
def namespace(self):
@@ -395,7 +568,63 @@ class IMAPClient:
See :rfc:`2342` for more details.
"""
- pass
+ try:
+ typ, data = self._imap.namespace()
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ if len(data) != 1:
+ raise self.Error('Invalid NAMESPACE response')
+
+ response = data[0].decode('ascii')
+ if not response.startswith('* NAMESPACE '):
+ raise self.Error('Invalid NAMESPACE response')
+
+ response = response[12:]
+ if not (response.startswith('(') and response.endswith(')')):
+ raise self.Error('Invalid NAMESPACE response')
+
+ def parse_namespace(ns_str):
+ if ns_str == 'NIL':
+ return None
+ if not (ns_str.startswith('(') and ns_str.endswith(')')):
+ raise self.Error('Invalid NAMESPACE response')
+ ns_str = ns_str[1:-1]
+ namespaces = []
+ while ns_str:
+ if not (ns_str.startswith('(') and ')' in ns_str):
+ raise self.Error('Invalid NAMESPACE response')
+ ns_end = ns_str.index(')')
+ ns_part = ns_str[1:ns_end]
+ parts = ns_part.split()
+ if len(parts) != 2:
+ raise self.Error('Invalid NAMESPACE response')
+ prefix = parts[0].strip('"')
+ separator = parts[1].strip('"')
+ namespaces.append((prefix, separator))
+ ns_str = ns_str[ns_end + 1:].lstrip()
+ return namespaces
+
+ response = response[1:-1]
+ parts = []
+ while response:
+ if response.startswith('NIL'):
+ parts.append(None)
+ response = response[3:].lstrip()
+ else:
+ if not (response.startswith('(') and ')' in response):
+ raise self.Error('Invalid NAMESPACE response')
+ end = response.index(')')
+ parts.append(parse_namespace(response[:end + 1]))
+ response = response[end + 1:].lstrip()
+
+ if len(parts) != 3:
+ raise self.Error('Invalid NAMESPACE response')
+
+ return Namespace(*parts)
def list_folders(self, directory='', pattern='*'):
"""Get a listing of folders on the server as a list of
@@ -418,7 +647,28 @@ class IMAPClient:
decoded from modified UTF-7, except if folder_decode is not
set.
"""
- pass
+ if self.folder_encode:
+ directory = encode_utf7(directory)
+ pattern = encode_utf7(pattern)
+
+ try:
+ typ, data = self._imap.list(directory, pattern)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ ret = []
+ for response in data:
+ if not response:
+ continue
+ flags, delimiter, name = self._parse_list_response(response)
+ if self.folder_encode:
+ name = decode_utf7(name)
+ ret.append((flags, delimiter, name))
+
+ return ret
@require_capability('XLIST')
def xlist_folders(self, directory='', pattern='*'):
@@ -450,7 +700,28 @@ class IMAPClient:
The *directory* and *pattern* arguments are as per
list_folders().
"""
- pass
+ if self.folder_encode:
+ directory = encode_utf7(directory)
+ pattern = encode_utf7(pattern)
+
+ try:
+ typ, data = self._imap.xlist(directory, pattern)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ ret = []
+ for response in data:
+ if not response:
+ continue
+ flags, delimiter, name = self._parse_list_response(response)
+ if self.folder_encode:
+ name = decode_utf7(name)
+ ret.append((flags, delimiter, name))
+
+ return ret
def list_sub_folders(self, directory='', pattern='*'):
"""Return a list of subscribed folders on the server as
@@ -459,7 +730,28 @@ class IMAPClient:
The default behaviour will list all subscribed folders. The
*directory* and *pattern* arguments are as per list_folders().
"""
- pass
+ if self.folder_encode:
+ directory = encode_utf7(directory)
+ pattern = encode_utf7(pattern)
+
+ try:
+ typ, data = self._imap.lsub(directory, pattern)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ ret = []
+ for response in data:
+ if not response:
+ continue
+ flags, delimiter, name = self._parse_list_response(response)
+ if self.folder_encode:
+ name = decode_utf7(name)
+ ret.append((flags, delimiter, name))
+
+ return ret
def find_special_folder(self, folder_flag):
"""Try to locate a special folder, like the Sent or Trash folder.
@@ -473,7 +765,30 @@ class IMAPClient:
Returns the name of the folder if found, or None otherwise.
"""
- pass
+ if self.has_capability('XLIST'):
+ folders = self.xlist_folders()
+ else:
+ folders = self.list_folders()
+
+ # First try to find a folder with the matching flag
+ for flags, delimiter, name in folders:
+ if folder_flag in flags:
+ return name
+
+ # No folder with the matching flag found, try popular namespaces
+ if folder_flag in _POPULAR_SPECIAL_FOLDERS:
+ # Get all folder names in a set for faster lookup
+ folder_names = {name.lower() for _, _, name in folders}
+
+ # Try to find a folder with a popular name
+ for folder_name in _POPULAR_SPECIAL_FOLDERS[folder_flag]:
+ # Try with different namespace prefixes
+ for prefix, suffix in _POPULAR_PERSONAL_NAMESPACES:
+ test_name = prefix + folder_name + suffix
+ if test_name.lower() in folder_names:
+ return test_name
+
+ return None
def select_folder(self, folder, readonly=False):
"""Set the current folder on the server.
@@ -493,7 +808,41 @@ class IMAPClient:
b'UIDNEXT': 11,
b'UIDVALIDITY': 1239278212}
"""
- pass
+ if self.folder_encode:
+ folder = encode_utf7(folder)
+
+ try:
+ typ, data = self._imap.select(folder, readonly)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ result = {}
+ for response in data:
+ if not response:
+ continue
+ match = _RE_SELECT_RESPONSE.match(response)
+ if match:
+ key = match.group('key')
+ if key == b'READ-WRITE':
+ result[key] = True
+ elif key == b'READ-ONLY':
+ result[b'READ-WRITE'] = False
+ elif match.group('data'):
+ result[key] = self._parse_response(match.group('data'))
+ else:
+ key, value = response.split(None, 1)
+ try:
+ result[key] = int(value)
+ except (ValueError, TypeError):
+ result[key] = value
+
+ if b'READ-WRITE' not in result:
+ result[b'READ-WRITE'] = not readonly
+
+ return result
@require_capability('UNSELECT')
def unselect_folder(self):
@@ -504,7 +853,15 @@ class IMAPClient:
Returns the UNSELECT response string returned by the server.
"""
- pass
+ try:
+ typ, data = self._imap._simple_command('UNSELECT')
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ return data[0].decode('ascii')
def noop(self):
"""Execute the NOOP command.
@@ -522,7 +879,15 @@ class IMAPClient:
(6, b'FETCH', (b'FLAGS', (b'sne',)))])
"""
- pass
+ try:
+ typ, data = self._imap.noop()
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ return data[0].decode('ascii'), self._consume_responses()
@require_capability('IDLE')
def idle(self):
@@ -540,7 +905,18 @@ class IMAPClient:
See :rfc:`2177` for more information about the IDLE extension.
"""
- pass
+ if self._idle_tag is not None:
+ raise self.Error('Already in IDLE mode')
+
+ try:
+ tag = self._imap._command('IDLE')
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ self._idle_tag = tag
+ resp = self._imap._get_response()
+ if resp is not None:
+ raise self.Error('Unexpected IDLE response: %s' % resp)
def _poll_socket(self, sock, timeout=None):
"""
@@ -548,7 +924,15 @@ class IMAPClient:
This implementation is more scalable because it ALLOWS your process
to have more than 1024 file descriptors.
"""
- pass
+ if not POLL_SUPPORT:
+ return self._select_poll_socket(sock, timeout)
+
+ poller = select.poll()
+ poller.register(sock, select.POLLIN)
+
+ timeout_ms = None if timeout is None else int(timeout * 1000)
+ ready = poller.poll(timeout_ms)
+ return bool(ready)
def _select_poll_socket(self, sock, timeout=None):
"""
@@ -557,7 +941,8 @@ class IMAPClient:
has more than 1024 file descriptors.
We still need this for Windows and some other niche systems.
"""
- pass
+ r, _, _ = select.select([sock], [], [], timeout)
+ return bool(r)
@require_capability('IDLE')
def idle_check(self, timeout=None):
@@ -578,7 +963,17 @@ class IMAPClient:
(1, b'EXISTS'),
(1, b'FETCH', (b'FLAGS', (b'\\NotJunk',)))]
"""
- pass
+ if self._idle_tag is None:
+ raise self.Error('Server not in IDLE mode')
+
+ sock = self.socket()
+ if sock is None:
+ raise self.Error('Socket is closed')
+
+ if not self._poll_socket(sock, timeout):
+ return []
+
+ return self._consume_responses()
@require_capability('IDLE')
def idle_done(self):
@@ -595,7 +990,21 @@ class IMAPClient:
any). These are returned in parsed form as per
``idle_check()``.
"""
- pass
+ if self._idle_tag is None:
+ raise self.Error('Server not in IDLE mode')
+
+ self._imap.send(b'DONE\r\n')
+ try:
+ typ, data = self._imap._command_complete(self._idle_tag)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+ finally:
+ self._idle_tag = None
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ return data[0].decode('ascii'), self._consume_responses()
def folder_status(self, folder, what=None):
"""Return the status of *folder*.
@@ -607,37 +1016,127 @@ class IMAPClient:
Returns a dictionary of the status items for the folder with
keys matching *what*.
"""
- pass
+ if what is None:
+ what = ('MESSAGES', 'RECENT', 'UIDNEXT', 'UIDVALIDITY', 'UNSEEN')
+
+ if self.folder_encode:
+ folder = encode_utf7(folder)
+
+ try:
+ typ, data = self._imap.status(folder, '(' + ' '.join(what) + ')')
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ response = data[0].decode('ascii')
+ if not response.startswith('* STATUS '):
+ raise self.Error('Invalid STATUS response: %s' % response)
+
+ match = _RE_SELECT_RESPONSE.match(response[9:])
+ if not match:
+ raise self.Error('Invalid STATUS response: %s' % response)
+
+ result = {}
+ for key, value in self._parse_response(match.group('data')):
+ result[key] = value
+
+ return result
def close_folder(self):
"""Close the currently selected folder, returning the server
response string.
"""
- pass
+ try:
+ typ, data = self._imap.close()
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ return data[0].decode('ascii')
def create_folder(self, folder):
"""Create *folder* on the server returning the server response string."""
- pass
+ if self.folder_encode:
+ folder = encode_utf7(folder)
+
+ try:
+ typ, data = self._imap.create(folder)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ return data[0].decode('ascii')
def rename_folder(self, old_name, new_name):
"""Change the name of a folder on the server."""
- pass
+ if self.folder_encode:
+ old_name = encode_utf7(old_name)
+ new_name = encode_utf7(new_name)
+
+ try:
+ typ, data = self._imap.rename(old_name, new_name)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ return data[0].decode('ascii')
def delete_folder(self, folder):
"""Delete *folder* on the server returning the server response string."""
- pass
+ if self.folder_encode:
+ folder = encode_utf7(folder)
+
+ try:
+ typ, data = self._imap.delete(folder)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ return data[0].decode('ascii')
def folder_exists(self, folder):
"""Return ``True`` if *folder* exists on the server."""
- pass
+ return folder in [name for _, _, name in self.list_folders()]
def subscribe_folder(self, folder):
"""Subscribe to *folder*, returning the server response string."""
- pass
+ if self.folder_encode:
+ folder = encode_utf7(folder)
+
+ try:
+ typ, data = self._imap.subscribe(folder)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ return data[0].decode('ascii')
def unsubscribe_folder(self, folder):
"""Unsubscribe to *folder*, returning the server response string."""
- pass
+ if self.folder_encode:
+ folder = encode_utf7(folder)
+
+ try:
+ typ, data = self._imap.unsubscribe(folder)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ return data[0].decode('ascii')
def search(self, criteria='ALL', charset=None):
"""Return a list of messages ids from the currently selected
@@ -699,7 +1198,34 @@ class IMAPClient:
in the search).
"""
- pass
+ if not criteria:
+ raise self.Error('No search criteria specified')
+
+ if isinstance(criteria, (str, bytes)):
+ criteria = [criteria]
+
+ criteria = _normalise_search_criteria(criteria, charset)
+
+ try:
+ typ, data = self._imap.search(charset or 'US-ASCII', *criteria)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ if not data[0]:
+ return []
+
+ message_ids = [int(i) for i in data[0].split()]
+
+ # Check for MODSEQ in response
+ for response in self._consume_responses():
+ if len(response) == 2 and response[0].upper() == b'MODSEQ':
+ message_ids.modseq = response[1]
+ break
+
+ return message_ids
@require_capability('X-GM-EXT-1')
def gmail_search(self, query, charset='UTF-8'):
@@ -716,7 +1242,11 @@ class IMAPClient:
See https://developers.google.com/gmail/imap_extensions#extension_of_the_search_command_x-gm-raw
for more info.
"""
- pass
+ if not query:
+ raise self.Error('No search query specified')
+
+ criteria = ['X-GM-RAW', query]
+ return self.search(criteria, charset)
@require_capability('SORT')
def sort(self, sort_criteria, criteria='ALL', charset='UTF-8'):
@@ -741,7 +1271,34 @@ class IMAPClient:
Note that SORT is an extension to the IMAP4 standard so it may
not be supported by all IMAP servers.
"""
- pass
+ if not sort_criteria:
+ raise self.Error('No sort criteria specified')
+
+ if isinstance(sort_criteria, (str, bytes)):
+ sort_criteria = [sort_criteria]
+
+ sort_criteria = [to_bytes(c) for c in sort_criteria]
+
+ if not criteria:
+ raise self.Error('No search criteria specified')
+
+ if isinstance(criteria, (str, bytes)):
+ criteria = [criteria]
+
+ criteria = _normalise_search_criteria(criteria, charset)
+
+ try:
+ typ, data = self._imap.sort(sort_criteria, charset or 'US-ASCII', *criteria)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ if not data[0]:
+ return []
+
+ return [int(i) for i in data[0].split()]
def thread(self, algorithm='REFERENCES', criteria='ALL', charset='UTF-8'):
"""Return a list of messages threads from the currently
@@ -760,7 +1317,26 @@ class IMAPClient:
See :rfc:`5256` for more details.
"""
- pass
+ if not criteria:
+ raise self.Error('No search criteria specified')
+
+ if isinstance(criteria, (str, bytes)):
+ criteria = [criteria]
+
+ criteria = _normalise_search_criteria(criteria, charset)
+
+ try:
+ typ, data = self._imap.thread(algorithm.upper(), charset or 'US-ASCII', *criteria)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ if typ != 'OK':
+ raise self.Error(data[0].decode('ascii'))
+
+ if not data[0]:
+ return []
+
+ return parse_message_list(data[0])
def get_flags(self, messages):
"""Return the flags set for each message in *messages* from
@@ -769,7 +1345,8 @@ class IMAPClient:
The return value is a dictionary structured like this: ``{
msgid1: (flag1, flag2, ... ), }``.
"""
- pass
+ response = self.fetch(messages, ['FLAGS'])
+ return {msg_id: flags for msg_id, flags in response.items()}
def add_flags(self, messages, flags, silent=False):
"""Add *flags* to *messages* in the currently selected folder.
diff --git a/imapclient/util.py b/imapclient/util.py
index 7413bc3..caf6d53 100644
--- a/imapclient/util.py
+++ b/imapclient/util.py
@@ -1,6 +1,29 @@
import logging
-from typing import Iterator, Optional, Tuple, Union
+from typing import Iterator, Optional, Tuple, Union, List, Any
from . import exceptions
logger = logging.getLogger(__name__)
_TupleAtomPart = Union[None, int, bytes]
-_TupleAtom = Tuple[Union[_TupleAtomPart, '_TupleAtom'], ...]
\ No newline at end of file
+_TupleAtom = Tuple[Union[_TupleAtomPart, '_TupleAtom'], ...]
+
+def assert_imap_protocol(condition: bool, message: Optional[bytes] = None) -> None:
+ if not condition:
+ raise exceptions.ProtocolError(message)
+
+def to_unicode(s: Union[str, bytes]) -> str:
+ """Convert a bytes object to a unicode string."""
+ if isinstance(s, bytes):
+ return s.decode('utf-8')
+ return s
+
+def to_bytes(s: Union[str, bytes, int]) -> bytes:
+ """Convert a string, number or bytes to bytes."""
+ if isinstance(s, bytes):
+ return s
+ if isinstance(s, int):
+ return str(s).encode('ascii')
+ return s.encode('utf-8')
+
+def chunk(lst: List[Any], size: int) -> Iterator[List[Any]]:
+ """Split a list into chunks of a given size."""
+ for i in range(0, len(lst), size):
+ yield lst[i:i + size]
\ No newline at end of file