back to Reference (Gold) summary
Reference (Gold): imapclient
Pytest Summary for test tests
status | count |
---|---|
passed | 267 |
total | 267 |
collected | 267 |
Failed pytests:
Patch diff
diff --git a/imapclient/config.py b/imapclient/config.py
index f098591..76e8dc8 100644
--- a/imapclient/config.py
+++ b/imapclient/config.py
@@ -1,3 +1,7 @@
+# Copyright (c) 2015, Menno Smits
+# Released subject to the New BSD License
+# Please see http://en.wikipedia.org/wiki/BSD_licenses
+
import argparse
import configparser
import json
@@ -6,19 +10,210 @@ import ssl
import urllib.parse
import urllib.request
from typing import Any, Callable, Dict, Optional, Tuple, TYPE_CHECKING, TypeVar
+
import imapclient
-def parse_config_file(filename: str) ->argparse.Namespace:
+def getenv(name: str, default: Optional[str]) -> Optional[str]:
+ return os.environ.get("imapclient_" + name, default)
+
+
+def get_config_defaults() -> Dict[str, Any]:
+ return {
+ "username": getenv("username", None),
+ "password": getenv("password", None),
+ "ssl": True,
+ "ssl_check_hostname": True,
+ "ssl_verify_cert": True,
+ "ssl_ca_file": None,
+ "timeout": None,
+ "starttls": False,
+ "stream": False,
+ "oauth2": False,
+ "oauth2_client_id": getenv("oauth2_client_id", None),
+ "oauth2_client_secret": getenv("oauth2_client_secret", None),
+ "oauth2_refresh_token": getenv("oauth2_refresh_token", None),
+ "expect_failure": None,
+ }
+
+
+def parse_config_file(filename: str) -> argparse.Namespace:
"""Parse INI files containing IMAP connection details.
Used by livetest.py and interact.py
"""
- pass
+ parser = configparser.ConfigParser(get_string_config_defaults())
+ parser.read(filename)
+
+ conf = _read_config_section(parser, "DEFAULT")
+ if conf.expect_failure:
+ raise ValueError("expect_failure should not be set for the DEFAULT section")
+
+ conf.alternates = {}
+ for section in parser.sections():
+ # pylint: disable=no-member
+ conf.alternates[section] = _read_config_section(parser, section)
+
+ return conf
+
+
+def get_string_config_defaults() -> Dict[str, str]:
+ out = {}
+ for k, v in get_config_defaults().items():
+ if v is True:
+ v = "true"
+ elif v is False:
+ v = "false"
+ elif not v:
+ v = ""
+ out[k] = v
+ return out
+
+
+T = TypeVar("T")
+
+
+def _read_config_section(
+ parser: configparser.ConfigParser, section: str
+) -> argparse.Namespace:
+ def get(name: str) -> str:
+ return parser.get(section, name)
+
+ def getboolean(name: str) -> bool:
+ return parser.getboolean(section, name)
+
+ def get_allowing_none(name: str, typefunc: Callable[[str], T]) -> Optional[T]:
+ try:
+ v = parser.get(section, name)
+ except configparser.NoOptionError:
+ return None
+ if not v:
+ return None
+ return typefunc(v)
+
+ def getint(name: str) -> Optional[int]:
+ return get_allowing_none(name, int)
+
+ def getfloat(name: str) -> Optional[float]:
+ return get_allowing_none(name, float)
-T = TypeVar('T')
-OAUTH2_REFRESH_URLS = {'imap.gmail.com':
- 'https://accounts.google.com/o/oauth2/token', 'imap.mail.yahoo.com':
- 'https://api.login.yahoo.com/oauth2/get_token'}
+ ssl_ca_file = get("ssl_ca_file")
+ if ssl_ca_file:
+ ssl_ca_file = os.path.expanduser(ssl_ca_file)
+
+ return argparse.Namespace(
+ host=get("host"),
+ port=getint("port"),
+ ssl=getboolean("ssl"),
+ starttls=getboolean("starttls"),
+ ssl_check_hostname=getboolean("ssl_check_hostname"),
+ ssl_verify_cert=getboolean("ssl_verify_cert"),
+ ssl_ca_file=ssl_ca_file,
+ timeout=getfloat("timeout"),
+ stream=getboolean("stream"),
+ username=get("username"),
+ password=get("password"),
+ oauth2=getboolean("oauth2"),
+ oauth2_client_id=get("oauth2_client_id"),
+ oauth2_client_secret=get("oauth2_client_secret"),
+ oauth2_refresh_token=get("oauth2_refresh_token"),
+ expect_failure=get("expect_failure"),
+ )
+
+
+OAUTH2_REFRESH_URLS = {
+ "imap.gmail.com": "https://accounts.google.com/o/oauth2/token",
+ "imap.mail.yahoo.com": "https://api.login.yahoo.com/oauth2/get_token",
+}
+
+
+def refresh_oauth2_token(
+ hostname: str, client_id: str, client_secret: str, refresh_token: str
+) -> str:
+ url = OAUTH2_REFRESH_URLS.get(hostname)
+ if not url:
+ raise ValueError("don't know where to refresh OAUTH2 token for %r" % hostname)
+
+ post = {
+ "client_id": client_id.encode("ascii"),
+ "client_secret": client_secret.encode("ascii"),
+ "refresh_token": refresh_token.encode("ascii"),
+ "grant_type": b"refresh_token",
+ }
+ with urllib.request.urlopen(
+ url, urllib.parse.urlencode(post).encode("ascii")
+ ) as request:
+ response = request.read()
+ result = json.loads(response.decode("ascii"))["access_token"]
+ if TYPE_CHECKING:
+ assert isinstance(result, str)
+ return result
+
+
+# Tokens are expensive to refresh so use the same one for the duration of the process.
_oauth2_cache: Dict[Tuple[str, str, str, str], str] = {}
+
+
+def get_oauth2_token(
+ hostname: str, client_id: str, client_secret: str, refresh_token: str
+) -> str:
+ cache_key = (hostname, client_id, client_secret, refresh_token)
+ token = _oauth2_cache.get(cache_key)
+ if token:
+ return token
+
+ token = refresh_oauth2_token(hostname, client_id, client_secret, refresh_token)
+ _oauth2_cache[cache_key] = token
+ return token
+
+
+def create_client_from_config(
+ conf: argparse.Namespace, login: bool = True
+) -> imapclient.IMAPClient:
+ assert conf.host, "missing host"
+
+ ssl_context = None
+ if conf.ssl:
+ ssl_context = ssl.create_default_context()
+ ssl_context.check_hostname = conf.ssl_check_hostname
+ if not conf.ssl_verify_cert:
+ ssl_context.verify_mode = ssl.CERT_NONE
+ if conf.ssl_ca_file:
+ ssl_context.load_verify_locations(cafile=conf.ssl_ca_file)
+
+ client = imapclient.IMAPClient(
+ conf.host,
+ port=conf.port,
+ ssl=conf.ssl,
+ ssl_context=ssl_context,
+ stream=conf.stream,
+ timeout=conf.timeout,
+ )
+ if not login:
+ return client
+
+ try:
+ if conf.starttls:
+ client.starttls()
+
+ if conf.oauth2:
+ assert conf.oauth2_client_id, "missing oauth2 id"
+ assert conf.oauth2_client_secret, "missing oauth2 secret"
+ assert conf.oauth2_refresh_token, "missing oauth2 refresh token"
+ access_token = get_oauth2_token(
+ conf.host,
+ conf.oauth2_client_id,
+ conf.oauth2_client_secret,
+ conf.oauth2_refresh_token,
+ )
+ client.oauth2_login(conf.username, access_token)
+
+ elif not conf.stream:
+ assert conf.username, "missing username"
+ assert conf.password, "missing password"
+ client.login(conf.username, conf.password)
+ return client
+ except: # noqa: E722
+ client.shutdown()
+ raise
diff --git a/imapclient/datetime_util.py b/imapclient/datetime_util.py
index 57a44c4..060f889 100644
--- a/imapclient/datetime_util.py
+++ b/imapclient/datetime_util.py
@@ -1,11 +1,17 @@
+# Copyright (c) 2014, Menno Smits
+# Released subject to the New BSD License
+# Please see http://en.wikipedia.org/wiki/BSD_licenses
+
import re
from datetime import datetime
from email.utils import parsedate_tz
+
from .fixed_offset import FixedOffset
-_SHORT_MONTHS = ' Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(' ')
+
+_SHORT_MONTHS = " Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec".split(" ")
-def parse_to_datetime(timestamp: bytes, normalise: bool=True) ->datetime:
+def parse_to_datetime(timestamp: bytes, normalise: bool = True) -> datetime:
"""Convert an IMAP datetime string to a datetime.
If normalise is True (the default), then the returned datetime
@@ -14,22 +20,51 @@ def parse_to_datetime(timestamp: bytes, normalise: bool=True) ->datetime:
If normalise is False, then the returned datetime will be
unadjusted but will contain timezone information as per the input.
"""
- pass
+ time_tuple = parsedate_tz(_munge(timestamp))
+ if time_tuple is None:
+ raise ValueError("couldn't parse datetime %r" % timestamp)
+
+ tz_offset_seconds = time_tuple[-1]
+ tz = None
+ if tz_offset_seconds is not None:
+ tz = FixedOffset(tz_offset_seconds / 60)
+
+ dt = datetime(*time_tuple[:6], tzinfo=tz)
+ if normalise and tz:
+ dt = datetime_to_native(dt)
+ return dt
-def datetime_to_INTERNALDATE(dt: datetime) ->str:
+
+def datetime_to_native(dt: datetime) -> datetime:
+ return dt.astimezone(FixedOffset.for_system()).replace(tzinfo=None)
+
+
+def datetime_to_INTERNALDATE(dt: datetime) -> str:
"""Convert a datetime instance to a IMAP INTERNALDATE string.
If timezone information is missing the current system
timezone is used.
"""
- pass
+ if not dt.tzinfo:
+ dt = dt.replace(tzinfo=FixedOffset.for_system())
+ fmt = "%d-" + _SHORT_MONTHS[dt.month] + "-%Y %H:%M:%S %z"
+ return dt.strftime(fmt)
+
+
+# Matches timestamp strings where the time separator is a dot (see
+# issue #154). For example: 'Sat, 8 May 2010 16.03.09 +0200'
+_rfc822_dotted_time = re.compile(r"\w+, ?\d{1,2} \w+ \d\d(\d\d)? \d\d?\.\d\d?\.\d\d?.*")
-_rfc822_dotted_time = re.compile(
- '\\w+, ?\\d{1,2} \\w+ \\d\\d(\\d\\d)? \\d\\d?\\.\\d\\d?\\.\\d\\d?.*')
+def _munge(timestamp: bytes) -> str:
+ s = timestamp.decode("latin-1") # parsedate_tz only works with strings
+ if _rfc822_dotted_time.match(s):
+ return s.replace(".", ":")
+ return s
-def format_criteria_date(dt: datetime) ->bytes:
+def format_criteria_date(dt: datetime) -> bytes:
"""Format a date or datetime instance for use in IMAP search criteria."""
- pass
+ out = "%02d-%s-%d" % (dt.day, _SHORT_MONTHS[dt.month], dt.year)
+ return out.encode("ascii")
diff --git a/imapclient/exceptions.py b/imapclient/exceptions.py
index a29d919..725af2f 100644
--- a/imapclient/exceptions.py
+++ b/imapclient/exceptions.py
@@ -1,4 +1,10 @@
import imaplib
+
+# Base class allowing to catch any IMAPClient related exceptions
+# To ensure backward compatibility, we "rename" the imaplib general
+# exception class, so we can catch its exceptions without having to
+# deal with it in IMAPClient codebase
+
IMAPClientError = imaplib.IMAP4.error
IMAPClientAbortError = imaplib.IMAP4.abort
IMAPClientReadOnlyError = imaplib.IMAP4.readonly
diff --git a/imapclient/fixed_offset.py b/imapclient/fixed_offset.py
index b9e7df9..344df46 100644
--- a/imapclient/fixed_offset.py
+++ b/imapclient/fixed_offset.py
@@ -1,6 +1,11 @@
+# Copyright (c) 2014, Menno Smits
+# Released subject to the New BSD License
+# Please see http://en.wikipedia.org/wiki/BSD_licenses
+
import datetime
import time
from typing import Optional
+
ZERO = datetime.timedelta(0)
@@ -10,17 +15,31 @@ class FixedOffset(datetime.tzinfo):
east from UTC
"""
- def __init__(self, minutes: float) ->None:
+ def __init__(self, minutes: float) -> None:
self.__offset = datetime.timedelta(minutes=minutes)
- sign = '+'
+
+ sign = "+"
if minutes < 0:
- sign = '-'
+ sign = "-"
hours, remaining_mins = divmod(abs(minutes), 60)
- self.__name = '%s%02d%02d' % (sign, hours, remaining_mins)
+ self.__name = "%s%02d%02d" % (sign, hours, remaining_mins)
+
+ def utcoffset(self, _: Optional[datetime.datetime]) -> datetime.timedelta:
+ return self.__offset
+
+ def tzname(self, _: Optional[datetime.datetime]) -> str:
+ return self.__name
+
+ def dst(self, _: Optional[datetime.datetime]) -> datetime.timedelta:
+ return ZERO
@classmethod
- def for_system(cls) ->'FixedOffset':
+ def for_system(cls) -> "FixedOffset":
"""Return a FixedOffset instance for the current working timezone and
DST conditions.
"""
- pass
+ if time.localtime().tm_isdst and time.daylight:
+ offset = time.altzone
+ else:
+ offset = time.timezone
+ return cls(-offset // 60)
diff --git a/imapclient/imap4.py b/imapclient/imap4.py
index 2a45702..d07515e 100644
--- a/imapclient/imap4.py
+++ b/imapclient/imap4.py
@@ -1,11 +1,27 @@
+# Copyright (c) 2015, Menno Smits
+# Released subject to the New BSD License
+# Please see http://en.wikipedia.org/wiki/BSD_licenses
+
import imaplib
import socket
from typing import Optional
class IMAP4WithTimeout(imaplib.IMAP4):
-
- def __init__(self, address: str, port: int, timeout: Optional[float]
- ) ->None:
+ def __init__(self, address: str, port: int, timeout: Optional[float]) -> None:
self._timeout = timeout
imaplib.IMAP4.__init__(self, address, port)
+
+ def open(
+ self, host: str = "", port: int = 143, timeout: Optional[float] = None
+ ) -> None:
+ # This is overridden to make it consistent across Python versions.
+ self.host = host
+ self.port = port
+ self.sock = self._create_socket(timeout)
+ self.file = self.sock.makefile("rb")
+
+ def _create_socket(self, timeout: Optional[float] = None) -> socket.socket:
+ return socket.create_connection(
+ (self.host, self.port), timeout if timeout is not None else self._timeout
+ )
diff --git a/imapclient/imap_utf7.py b/imapclient/imap_utf7.py
index 7a795b2..021c564 100644
--- a/imapclient/imap_utf7.py
+++ b/imapclient/imap_utf7.py
@@ -1,25 +1,108 @@
+# This file contains two main methods used to encode and decode UTF-7
+# string, described in the RFC 3501. There are some variations specific
+# to IMAP4rev1, so the built-in Python UTF-7 codec can't be used instead.
+#
+# The main difference is the shift character (used to switch from ASCII to
+# base64 encoding context), which is & in this modified UTF-7 convention,
+# since + is considered as mainly used in mailbox names.
+# Other variations and examples can be found in the RFC 3501, section 5.1.3.
+
import binascii
from typing import List, Union
-def encode(s: Union[str, bytes]) ->bytes:
+def encode(s: Union[str, bytes]) -> bytes:
"""Encode a folder name using IMAP modified UTF-7 encoding.
Input is unicode; output is bytes (Python 3) or str (Python 2). If
non-unicode input is provided, the input is returned unchanged.
"""
- pass
+ if not isinstance(s, str):
+ return s
+
+ res = bytearray()
+
+ b64_buffer: List[str] = []
+
+ def consume_b64_buffer(buf: List[str]) -> None:
+ """
+ Consume the buffer by encoding it into a modified base 64 representation
+ and surround it with shift characters & and -
+ """
+ if buf:
+ res.extend(b"&" + base64_utf7_encode(buf) + b"-")
+ del buf[:]
+
+ for c in s:
+ # printable ascii case should not be modified
+ o = ord(c)
+ if 0x20 <= o <= 0x7E:
+ consume_b64_buffer(b64_buffer)
+ # Special case: & is used as shift character so we need to escape it in ASCII
+ if o == 0x26: # & = 0x26
+ res.extend(b"&-")
+ else:
+ res.append(o)
+
+ # Bufferize characters that will be encoded in base64 and append them later
+ # in the result, when iterating over ASCII character or the end of string
+ else:
+ b64_buffer.append(c)
+
+ # Consume the remaining buffer if the string finish with non-ASCII characters
+ consume_b64_buffer(b64_buffer)
+
+ return bytes(res)
-AMPERSAND_ORD = ord('&')
-DASH_ORD = ord('-')
+AMPERSAND_ORD = ord("&")
+DASH_ORD = ord("-")
-def decode(s: Union[bytes, str]) ->str:
+def decode(s: Union[bytes, str]) -> str:
"""Decode a folder name from IMAP modified UTF-7 encoding to unicode.
Input is bytes (Python 3) or str (Python 2); output is always
unicode. If non-bytes/str input is provided, the input is returned
unchanged.
"""
- pass
+ if not isinstance(s, bytes):
+ return s
+
+ res = []
+ # Store base64 substring that will be decoded once stepping on end shift character
+ b64_buffer = bytearray()
+ for c in s:
+ # Shift character without anything in buffer -> starts storing base64 substring
+ if c == AMPERSAND_ORD and not b64_buffer:
+ b64_buffer.append(c)
+ # End shift char. -> append the decoded buffer to the result and reset it
+ elif c == DASH_ORD and b64_buffer:
+ # Special case &-, representing "&" escaped
+ if len(b64_buffer) == 1:
+ res.append("&")
+ else:
+ res.append(base64_utf7_decode(b64_buffer[1:]))
+ b64_buffer = bytearray()
+ # Still buffering between the shift character and the shift back to ASCII
+ elif b64_buffer:
+ b64_buffer.append(c)
+ # No buffer initialized yet, should be an ASCII printable char
+ else:
+ res.append(chr(c))
+
+ # Decode the remaining buffer if any
+ if b64_buffer:
+ res.append(base64_utf7_decode(b64_buffer[1:]))
+
+ return "".join(res)
+
+
+def base64_utf7_encode(buffer: List[str]) -> bytes:
+ s = "".join(buffer).encode("utf-16be")
+ return binascii.b2a_base64(s).rstrip(b"\n=").replace(b"/", b",")
+
+
+def base64_utf7_decode(s: bytearray) -> str:
+ s_utf7 = b"+" + s.replace(b",", b"/") + b"-"
+ return s_utf7.decode("utf-7")
diff --git a/imapclient/imapclient.py b/imapclient/imapclient.py
index 1b399f1..eea281a 100644
--- a/imapclient/imapclient.py
+++ b/imapclient/imapclient.py
@@ -1,3 +1,7 @@
+# Copyright (c) 2015, Menno Smits
+# Released subject to the New BSD License
+# Please see http://en.wikipedia.org/wiki/BSD_licenses
+
import dataclasses
import functools
import imaplib
@@ -12,57 +16,103 @@ from datetime import date, datetime
from logging import getLogger, LoggerAdapter
from operator import itemgetter
from typing import List, Optional
+
from . import exceptions, imap4, response_lexer, tls
from .datetime_util import datetime_to_INTERNALDATE, format_criteria_date
from .imap_utf7 import decode as decode_utf7
from .imap_utf7 import encode as encode_utf7
from .response_parser import parse_fetch_response, parse_message_list, parse_response
from .util import assert_imap_protocol, chunk, to_bytes, to_unicode
-if hasattr(select, 'poll'):
+
+if hasattr(select, "poll"):
POLL_SUPPORT = True
else:
+ # Fallback to select() on systems that don't support poll()
POLL_SUPPORT = False
+
+
logger = getLogger(__name__)
-__all__ = ['IMAPClient', 'SocketTimeout', 'DELETED', 'SEEN', 'ANSWERED',
- 'FLAGGED', 'DRAFT', 'RECENT']
-if 'XLIST' not in imaplib.Commands:
- imaplib.Commands['XLIST'] = 'NONAUTH', 'AUTH', 'SELECTED'
-if 'IDLE' not in imaplib.Commands:
- imaplib.Commands['IDLE'] = 'NONAUTH', 'AUTH', 'SELECTED'
-if 'STARTTLS' not in imaplib.Commands:
- imaplib.Commands['STARTTLS'] = 'NONAUTH',
-if 'ID' not in imaplib.Commands:
- imaplib.Commands['ID'] = 'NONAUTH', 'AUTH', 'SELECTED'
-if 'UNSELECT' not in imaplib.Commands:
- imaplib.Commands['UNSELECT'] = 'AUTH', 'SELECTED'
-if 'ENABLE' not in imaplib.Commands:
- imaplib.Commands['ENABLE'] = 'AUTH',
-if 'MOVE' not in imaplib.Commands:
- imaplib.Commands['MOVE'] = 'AUTH', 'SELECTED'
-DELETED = b'\\Deleted'
-SEEN = b'\\Seen'
-ANSWERED = b'\\Answered'
-FLAGGED = b'\\Flagged'
-DRAFT = b'\\Draft'
-RECENT = b'\\Recent'
-ALL = b'\\All'
-ARCHIVE = b'\\Archive'
-DRAFTS = b'\\Drafts'
-JUNK = b'\\Junk'
-SENT = b'\\Sent'
-TRASH = b'\\Trash'
-_POPULAR_PERSONAL_NAMESPACES = ('', ''), ('INBOX.', '.')
-_POPULAR_SPECIAL_FOLDERS = {SENT: ('Sent', 'Sent Items', 'Sent items'),
- DRAFTS: ('Drafts',), ARCHIVE: ('Archive',), TRASH: ('Trash',
- 'Deleted Items', 'Deleted Messages', 'Deleted'), JUNK: ('Junk', 'Spam')}
-_RE_SELECT_RESPONSE = re.compile(
- b'\\[(?P<key>[A-Z-]+)( \\((?P<data>.*)\\))?\\]')
+__all__ = [
+ "IMAPClient",
+ "SocketTimeout",
+ "DELETED",
+ "SEEN",
+ "ANSWERED",
+ "FLAGGED",
+ "DRAFT",
+ "RECENT",
+]
+
+
+# We also offer the gmail-specific XLIST command...
+if "XLIST" not in imaplib.Commands:
+ imaplib.Commands["XLIST"] = ("NONAUTH", "AUTH", "SELECTED")
+
+# ...and IDLE
+if "IDLE" not in imaplib.Commands:
+ imaplib.Commands["IDLE"] = ("NONAUTH", "AUTH", "SELECTED")
+
+# ..and STARTTLS
+if "STARTTLS" not in imaplib.Commands:
+ imaplib.Commands["STARTTLS"] = ("NONAUTH",)
+
+# ...and ID. RFC2971 says that this command is valid in all states,
+# but not that some servers (*cough* FastMail *cough*) don't seem to
+# accept it in state NONAUTH.
+if "ID" not in imaplib.Commands:
+ imaplib.Commands["ID"] = ("NONAUTH", "AUTH", "SELECTED")
+
+# ... and UNSELECT. RFC3691 does not specify the state but there is no
+# reason to use the command without AUTH state and a mailbox selected.
+if "UNSELECT" not in imaplib.Commands:
+ imaplib.Commands["UNSELECT"] = ("AUTH", "SELECTED")
+
+# .. and ENABLE.
+if "ENABLE" not in imaplib.Commands:
+ imaplib.Commands["ENABLE"] = ("AUTH",)
+
+# .. and MOVE for RFC6851.
+if "MOVE" not in imaplib.Commands:
+ imaplib.Commands["MOVE"] = ("AUTH", "SELECTED")
+
+# System flags
+DELETED = rb"\Deleted"
+SEEN = rb"\Seen"
+ANSWERED = rb"\Answered"
+FLAGGED = rb"\Flagged"
+DRAFT = rb"\Draft"
+RECENT = rb"\Recent" # This flag is read-only
+
+# Special folders, see RFC6154
+# \Flagged is omitted because it is the same as the flag defined above
+ALL = rb"\All"
+ARCHIVE = rb"\Archive"
+DRAFTS = rb"\Drafts"
+JUNK = rb"\Junk"
+SENT = rb"\Sent"
+TRASH = rb"\Trash"
+
+# Personal namespaces that are common among providers
+# used as a fallback when the server does not support the NAMESPACE capability
+_POPULAR_PERSONAL_NAMESPACES = (("", ""), ("INBOX.", "."))
+
+# Names of special folders that are common among providers
+_POPULAR_SPECIAL_FOLDERS = {
+ SENT: ("Sent", "Sent Items", "Sent items"),
+ DRAFTS: ("Drafts",),
+ ARCHIVE: ("Archive",),
+ TRASH: ("Trash", "Deleted Items", "Deleted Messages", "Deleted"),
+ JUNK: ("Junk", "Spam"),
+}
+
+_RE_SELECT_RESPONSE = re.compile(rb"\[(?P<key>[A-Z-]+)( \((?P<data>.*)\))?\]")
-class Namespace(tuple):
+class Namespace(tuple):
def __new__(cls, personal, other, shared):
return tuple.__new__(cls, (personal, other, shared))
+
personal = property(itemgetter(0))
other = property(itemgetter(1))
shared = property(itemgetter(2))
@@ -79,6 +129,7 @@ class SocketTimeout:
timeout if the connection takes more than 15 seconds to establish but
read/write operations can take up to 60 seconds once the connection is done.
"""
+
connect: float
read: float
@@ -92,6 +143,7 @@ class MailboxQuotaRoots:
:ivar mailbox: the mailbox
:ivar quota_roots: list of quota roots associated with the mailbox
"""
+
mailbox: str
quota_roots: List[str]
@@ -107,6 +159,7 @@ class Quota:
:ivar usage: the current usage of the resource
:ivar limit: the maximum allowed usage of the resource
"""
+
quota_root: str
resource: str
usage: bytes
@@ -115,7 +168,19 @@ class Quota:
def require_capability(capability):
"""Decorator raising CapabilityError when a capability is not available."""
- pass
+
+ def actual_decorator(func):
+ @functools.wraps(func)
+ def wrapper(client, *args, **kwargs):
+ if not client.has_capability(capability):
+ raise exceptions.CapabilityError(
+ "Server does not support {} capability".format(capability)
+ )
+ return func(client, *args, **kwargs)
+
+ return wrapper
+
+ return actual_decorator
class IMAPClient:
@@ -168,13 +233,24 @@ class IMAPClient:
... client.login("bar@foo.org", "passwd")
"""
+
+ # Those exceptions are kept for backward-compatibility, since
+ # previous versions included these attributes as references to
+ # imaplib original exceptions
Error = exceptions.IMAPClientError
AbortError = exceptions.IMAPClientAbortError
ReadOnlyError = exceptions.IMAPClientReadOnlyError
- def __init__(self, host: str, port: int=None, use_uid: bool=True, ssl:
- bool=True, stream: bool=False, ssl_context: Optional[ssl_lib.
- SSLContext]=None, timeout: Optional[float]=None):
+ def __init__(
+ self,
+ host: str,
+ port: int = None,
+ use_uid: bool = True,
+ ssl: bool = True,
+ stream: bool = False,
+ ssl_context: Optional[ssl_lib.SSLContext] = None,
+ timeout: Optional[float] = None,
+ ):
if stream:
if port is not None:
raise ValueError("can't set 'port' when 'stream' True")
@@ -182,10 +258,14 @@ class IMAPClient:
raise ValueError("can't use 'ssl' when 'stream' is True")
elif port is None:
port = ssl and 993 or 143
+
if ssl and port == 143:
logger.warning(
- 'Attempting to establish an encrypted connection to a port (143) often used for unencrypted connections'
- )
+ "Attempting to establish an encrypted connection "
+ "to a port (143) often used for unencrypted "
+ "connections"
+ )
+
self.host = host
self.port = port
self.ssl = ssl
@@ -194,18 +274,27 @@ class IMAPClient:
self.use_uid = use_uid
self.folder_encode = True
self.normalise_times = True
+
+ # If the user gives a single timeout value, assume it is the same for
+ # connection and read/write operations
if not isinstance(timeout, SocketTimeout):
timeout = SocketTimeout(timeout, timeout)
+
self._timeout = timeout
self._starttls_done = False
self._cached_capabilities = None
self._idle_tag = None
+
self._imap = self._create_IMAP4()
- logger.debug('Connected to host %s over %s', self.host, 'SSL/TLS' if
- ssl else 'plain text')
+ logger.debug(
+ "Connected to host %s over %s",
+ self.host,
+ "SSL/TLS" if ssl else "plain text",
+ )
+
self._set_read_timeout()
- imaplib_logger = IMAPlibLoggerAdapter(getLogger(
- 'imapclient.imaplib'), {})
+ # Small hack to make imaplib log everything to its own logger
+ imaplib_logger = IMAPlibLoggerAdapter(getLogger("imapclient.imaplib"), {})
self._imap.debug = 5
self._imap._mesg = imaplib_logger.debug
@@ -224,7 +313,32 @@ class IMAPClient:
try:
self.shutdown()
except Exception as e:
- logger.info('Could not close the connection cleanly: %s', e)
+ logger.info("Could not close the connection cleanly: %s", e)
+
+ def _create_IMAP4(self):
+ if self.stream:
+ return imaplib.IMAP4_stream(self.host)
+
+ connect_timeout = getattr(self._timeout, "connect", None)
+
+ if self.ssl:
+ return tls.IMAP4_TLS(
+ self.host,
+ self.port,
+ self.ssl_context,
+ connect_timeout,
+ )
+
+ return imap4.IMAP4WithTimeout(self.host, self.port, connect_timeout)
+
+ def _set_read_timeout(self):
+ if self._timeout is not None:
+ self.socket().settimeout(self._timeout.read)
+
+ @property
+ def _sock(self):
+ warnings.warn("_sock is deprecated. Use socket().", DeprecationWarning)
+ return self.socket()
def socket(self):
"""Returns socket used to connect to server.
@@ -239,9 +353,11 @@ class IMAPClient:
This includes reading from and writing to the socket,
as they are likely to break internal bookkeeping of messages.
"""
- pass
+ # In py2, imaplib has sslobj (for SSL connections), and sock for non-SSL.
+ # In the py3 version it's just sock.
+ return getattr(self._imap, "sslobj", self._imap.sock)
- @require_capability('STARTTLS')
+ @require_capability("STARTTLS")
def starttls(self, ssl_context=None):
"""Switch to an SSL encrypted connection by sending a STARTTLS command.
@@ -259,22 +375,55 @@ class IMAPClient:
Raises :py:exc:`AbortError` if the server does not support STARTTLS
or an SSL connection is already established.
"""
- pass
+ if self.ssl or self._starttls_done:
+ raise exceptions.IMAPClientAbortError("TLS session already established")
+
+ typ, data = self._imap._simple_command("STARTTLS")
+ self._checkok("starttls", typ, data)
+
+ self._starttls_done = True
+
+ self._imap.sock = tls.wrap_socket(self._imap.sock, ssl_context, self.host)
+ self._imap.file = self._imap.sock.makefile("rb")
+ return data[0]
def login(self, username: str, password: str):
"""Login using *username* and *password*, returning the
server response.
"""
- pass
-
- def oauth2_login(self, user: str, access_token: str, mech: str=
- 'XOAUTH2', vendor: Optional[str]=None):
+ try:
+ rv = self._command_and_check(
+ "login",
+ to_unicode(username),
+ to_unicode(password),
+ unpack=True,
+ )
+ except exceptions.IMAPClientError as e:
+ raise exceptions.LoginError(str(e))
+
+ logger.debug("Logged in as %s", username)
+ return rv
+
+ def oauth2_login(
+ self,
+ user: str,
+ access_token: str,
+ mech: str = "XOAUTH2",
+ vendor: Optional[str] = None,
+ ):
"""Authenticate using the OAUTH2 or XOAUTH2 methods.
Gmail and Yahoo both support the 'XOAUTH2' mechanism, but Yahoo requires
the 'vendor' portion in the payload.
"""
- pass
+ auth_string = "user=%s\1auth=Bearer %s\1" % (user, access_token)
+ if vendor:
+ auth_string += "vendor=%s\1" % vendor
+ auth_string += "\1"
+ try:
+ return self._command_and_check("authenticate", mech, lambda x: auth_string)
+ except exceptions.IMAPClientError as e:
+ raise exceptions.LoginError(str(e))
def oauthbearer_login(self, identity, access_token):
"""Authenticate using the OAUTHBEARER method.
@@ -282,11 +431,35 @@ class IMAPClient:
This is supported by Gmail and is meant to supersede the non-standard
'OAUTH2' and 'XOAUTH2' mechanisms.
"""
- pass
+ # https://tools.ietf.org/html/rfc5801#section-4
+ # Technically this is the authorization_identity, but at least for Gmail it's
+ # mandatory and practically behaves like the regular username/identity.
+ if identity:
+ gs2_header = "n,a=%s," % identity.replace("=", "=3D").replace(",", "=2C")
+ else:
+ gs2_header = "n,,"
+ # https://tools.ietf.org/html/rfc6750#section-2.1
+ http_authz = "Bearer %s" % access_token
+ # https://tools.ietf.org/html/rfc7628#section-3.1
+ auth_string = "%s\1auth=%s\1\1" % (gs2_header, http_authz)
+ try:
+ return self._command_and_check(
+ "authenticate", "OAUTHBEARER", lambda x: auth_string
+ )
+ except exceptions.IMAPClientError as e:
+ raise exceptions.LoginError(str(e))
def plain_login(self, identity, password, authorization_identity=None):
"""Authenticate using the PLAIN method (requires server support)."""
- pass
+ if not authorization_identity:
+ authorization_identity = ""
+ auth_string = "%s\0%s\0%s" % (authorization_identity, identity, password)
+ try:
+ return self._command_and_check(
+ "authenticate", "PLAIN", lambda _: auth_string, unpack=True
+ )
+ except exceptions.IMAPClientError as e:
+ raise exceptions.LoginError(str(e))
def sasl_login(self, mech_name, mech_callable):
"""Authenticate using a provided SASL mechanism (requires server support).
@@ -337,21 +510,30 @@ class IMAPClient:
imap.sasl_login("SCRAM-SHA-256", scram_mech)
"""
- pass
+ try:
+ return self._command_and_check(
+ "authenticate", mech_name, mech_callable, unpack=True
+ )
+ except exceptions.IMAPClientError as e:
+ raise exceptions.LoginError(str(e))
def logout(self):
"""Logout, returning the server response."""
- pass
+ typ, data = self._imap.logout()
+ self._check_resp("BYE", "logout", typ, data)
+ logger.debug("Logged out, connection closed")
+ return data[0]
- def shutdown(self) ->None:
+ def shutdown(self) -> None:
"""Close the connection to the IMAP server (without logging out)
In most cases, :py:meth:`.logout` should be used instead of
this. The logout method also shutdown down the connection.
"""
- pass
+ self._imap.shutdown()
+ logger.info("Connection closed")
- @require_capability('ENABLE')
+ @require_capability("ENABLE")
def enable(self, *capabilities):
"""Activate one or more server side capability extensions.
@@ -368,9 +550,23 @@ class IMAPClient:
See :rfc:`5161` for more details.
"""
- pass
-
- @require_capability('ID')
+ if self._imap.state != "AUTH":
+ raise exceptions.IllegalStateError(
+ "ENABLE command illegal in state %s" % self._imap.state
+ )
+
+ resp = self._raw_command_untagged(
+ b"ENABLE",
+ [to_bytes(c) for c in capabilities],
+ uid=False,
+ response_name="ENABLED",
+ unpack=True,
+ )
+ if not resp:
+ return []
+ return resp.split()
+
+ @require_capability("ID")
def id_(self, parameters=None):
"""Issue the ID command, returning a dict of server implementation
fields.
@@ -378,7 +574,19 @@ class IMAPClient:
*parameters* should be specified as a dictionary of field/value pairs,
for example: ``{"name": "IMAPClient", "version": "0.12"}``
"""
- pass
+ if parameters is None:
+ args = "NIL"
+ else:
+ if not isinstance(parameters, dict):
+ raise TypeError("'parameters' should be a dictionary")
+ args = seq_to_parenstr(
+ _quote(v) for v in itertools.chain.from_iterable(parameters.items())
+ )
+
+ typ, data = self._imap._simple_command("ID", args)
+ self._checkok("id", typ, data)
+ typ, data = self._imap._untagged_response(typ, data, "ID")
+ return parse_response(data)
def capabilities(self):
"""Returns the server capability list.
@@ -392,13 +600,51 @@ class IMAPClient:
If the session is not yet authenticated, the capabilities
requested at connection time will be returned.
"""
- pass
+ # Ensure cached capabilities aren't used post-STARTTLS. As per
+ # https://tools.ietf.org/html/rfc2595#section-3.1
+ if self._starttls_done and self._imap.state == "NONAUTH":
+ self._cached_capabilities = None
+ return self._do_capabilites()
+
+ # If a capability response has been cached, use that.
+ if self._cached_capabilities:
+ return self._cached_capabilities
+
+ # If the server returned an untagged CAPABILITY response
+ # (during authentication), cache it and return that.
+ untagged = _dict_bytes_normaliser(self._imap.untagged_responses)
+ response = untagged.pop("CAPABILITY", None)
+ if response:
+ self._cached_capabilities = self._normalise_capabilites(response[0])
+ return self._cached_capabilities
+
+ # If authenticated, but don't have a capability response, ask for one
+ if self._imap.state in ("SELECTED", "AUTH"):
+ self._cached_capabilities = self._do_capabilites()
+ return self._cached_capabilities
+
+ # Return capabilities that imaplib requested at connection
+ # time (pre-auth)
+ return tuple(to_bytes(c) for c in self._imap.capabilities)
+
+ def _do_capabilites(self):
+ raw_response = self._command_and_check("capability", unpack=True)
+ return self._normalise_capabilites(raw_response)
+
+ def _normalise_capabilites(self, raw_response):
+ raw_response = to_bytes(raw_response)
+ return tuple(raw_response.upper().split())
def has_capability(self, capability):
"""Return ``True`` if the IMAP server has the given *capability*."""
- pass
-
- @require_capability('NAMESPACE')
+ # FIXME: this will not detect capabilities that are backwards
+ # compatible with the current level. For instance the SORT
+ # capabilities may in the future be named SORT2 which is
+ # still compatible with the current standard and will not
+ # be detected by this method.
+ return to_bytes(capability).upper() in self.capabilities()
+
+ @require_capability("NAMESPACE")
def namespace(self):
"""Return the namespace for the account as a (personal, other,
shared) tuple.
@@ -412,9 +658,21 @@ class IMAPClient:
See :rfc:`2342` for more details.
"""
- pass
-
- def list_folders(self, directory='', pattern='*'):
+ data = self._command_and_check("namespace")
+ parts = []
+ for item in parse_response(data):
+ if item is None:
+ parts.append(item)
+ else:
+ converted = []
+ for prefix, separator in item:
+ if self.folder_encode:
+ prefix = decode_utf7(prefix)
+ converted.append((prefix, to_unicode(separator)))
+ parts.append(tuple(converted))
+ return Namespace(*parts)
+
+ def list_folders(self, directory="", pattern="*"):
"""Get a listing of folders on the server as a list of
``(flags, delimiter, name)`` tuples.
@@ -435,10 +693,10 @@ class IMAPClient:
decoded from modified UTF-7, except if folder_decode is not
set.
"""
- pass
+ return self._do_list("LIST", directory, pattern)
- @require_capability('XLIST')
- def xlist_folders(self, directory='', pattern='*'):
+ @require_capability("XLIST")
+ def xlist_folders(self, directory="", pattern="*"):
"""Execute the XLIST command, returning ``(flags, delimiter,
name)`` tuples.
@@ -467,16 +725,44 @@ class IMAPClient:
The *directory* and *pattern* arguments are as per
list_folders().
"""
- pass
+ return self._do_list("XLIST", directory, pattern)
- def list_sub_folders(self, directory='', pattern='*'):
+ def list_sub_folders(self, directory="", pattern="*"):
"""Return a list of subscribed folders on the server as
``(flags, delimiter, name)`` tuples.
The default behaviour will list all subscribed folders. The
*directory* and *pattern* arguments are as per list_folders().
"""
- pass
+ return self._do_list("LSUB", directory, pattern)
+
+ def _do_list(self, cmd, directory, pattern):
+ directory = self._normalise_folder(directory)
+ pattern = self._normalise_folder(pattern)
+ typ, dat = self._imap._simple_command(cmd, directory, pattern)
+ self._checkok(cmd, typ, dat)
+ typ, dat = self._imap._untagged_response(typ, dat, cmd)
+ return self._proc_folder_list(dat)
+
+ def _proc_folder_list(self, folder_data):
+ # Filter out empty strings and None's.
+ # This also deals with the special case of - no 'untagged'
+ # responses (ie, no folders). This comes back as [None].
+ folder_data = [item for item in folder_data if item not in (b"", None)]
+
+ ret = []
+ parsed = parse_response(folder_data)
+ for flags, delim, name in chunk(parsed, size=3):
+ if isinstance(name, int):
+ # Some IMAP implementations return integer folder names
+ # with quotes. These get parsed to ints so convert them
+ # back to strings.
+ name = str(name)
+ elif self.folder_encode:
+ name = decode_utf7(name)
+
+ ret.append((flags, delim, name))
+ return ret
def find_special_folder(self, folder_flag):
"""Try to locate a special folder, like the Sent or Trash folder.
@@ -490,7 +776,27 @@ class IMAPClient:
Returns the name of the folder if found, or None otherwise.
"""
- pass
+ # Detect folder by looking for known attributes
+ # TODO: avoid listing all folders by using extended LIST (RFC6154)
+ for folder in self.list_folders():
+ if folder and len(folder[0]) > 0 and folder_flag in folder[0]:
+ return folder[2]
+
+ # Detect folder by looking for common names
+ # We only look for folders in the "personal" namespace of the user
+ if self.has_capability("NAMESPACE"):
+ personal_namespaces = self.namespace().personal
+ else:
+ personal_namespaces = _POPULAR_PERSONAL_NAMESPACES
+
+ for personal_namespace in personal_namespaces:
+ for pattern in _POPULAR_SPECIAL_FOLDERS.get(folder_flag, tuple()):
+ pattern = personal_namespace[0] + pattern
+ sent_folders = self.list_folders(pattern=pattern)
+ if sent_folders:
+ return sent_folders[0][2]
+
+ return None
def select_folder(self, folder, readonly=False):
"""Set the current folder on the server.
@@ -510,18 +816,54 @@ class IMAPClient:
b'UIDNEXT': 11,
b'UIDVALIDITY': 1239278212}
"""
- pass
+ self._command_and_check("select", self._normalise_folder(folder), readonly)
+ return self._process_select_response(self._imap.untagged_responses)
- @require_capability('UNSELECT')
+ @require_capability("UNSELECT")
def unselect_folder(self):
- """Unselect the current folder and release associated resources.
+ r"""Unselect the current folder and release associated resources.
Unlike ``close_folder``, the ``UNSELECT`` command does not expunge
- the mailbox, keeping messages with \\Deleted flag set for example.
+ the mailbox, keeping messages with \Deleted flag set for example.
Returns the UNSELECT response string returned by the server.
"""
- pass
+ logger.debug("< UNSELECT")
+ # IMAP4 class has no `unselect` method so we can't use `_command_and_check` there
+ _typ, data = self._imap._simple_command("UNSELECT")
+ return data[0]
+
+ def _process_select_response(self, resp):
+ untagged = _dict_bytes_normaliser(resp)
+ out = {}
+
+ # imaplib doesn't parse these correctly (broken regex) so replace
+ # with the raw values out of the OK section
+ for line in untagged.get("OK", []):
+ match = _RE_SELECT_RESPONSE.match(line)
+ if match:
+ key = match.group("key")
+ if key == b"PERMANENTFLAGS":
+ out[key] = tuple(match.group("data").split())
+
+ for key, value in untagged.items():
+ key = key.upper()
+ if key in (b"OK", b"PERMANENTFLAGS"):
+ continue # already handled above
+ if key in (
+ b"EXISTS",
+ b"RECENT",
+ b"UIDNEXT",
+ b"UIDVALIDITY",
+ b"HIGHESTMODSEQ",
+ ):
+ value = int(value[0])
+ elif key == b"READ-WRITE":
+ value = True
+ elif key == b"FLAGS":
+ value = tuple(value[0][1:-1].split())
+ out[key] = value
+ return out
def noop(self):
"""Execute the NOOP command.
@@ -539,9 +881,10 @@ class IMAPClient:
(6, b'FETCH', (b'FLAGS', (b'sne',)))])
"""
- pass
+ tag = self._imap._command("NOOP")
+ return self._consume_until_tagged_response(tag, "NOOP")
- @require_capability('IDLE')
+ @require_capability("IDLE")
def idle(self):
"""Put the server into IDLE mode.
@@ -557,7 +900,10 @@ class IMAPClient:
See :rfc:`2177` for more information about the IDLE extension.
"""
- pass
+ self._idle_tag = self._imap._command("IDLE")
+ resp = self._imap._get_response()
+ if resp is not None:
+ raise exceptions.IMAPClientError("Unexpected IDLE response: %s" % resp)
def _poll_socket(self, sock, timeout=None):
"""
@@ -565,7 +911,10 @@ class IMAPClient:
This implementation is more scalable because it ALLOWS your process
to have more than 1024 file descriptors.
"""
- pass
+ poller = select.poll()
+ poller.register(sock.fileno(), select.POLLIN)
+ timeout = timeout * 1000 if timeout is not None else None
+ return poller.poll(timeout)
def _select_poll_socket(self, sock, timeout=None):
"""
@@ -574,9 +923,9 @@ class IMAPClient:
has more than 1024 file descriptors.
We still need this for Windows and some other niche systems.
"""
- pass
+ return select.select([sock], [], [], timeout)[0]
- @require_capability('IDLE')
+ @require_capability("IDLE")
def idle_check(self, timeout=None):
"""Check for any IDLE responses sent by the server.
@@ -595,9 +944,42 @@ class IMAPClient:
(1, b'EXISTS'),
(1, b'FETCH', (b'FLAGS', (b'\\NotJunk',)))]
"""
- pass
+ sock = self.socket()
+
+ # make the socket non-blocking so the timeout can be
+ # implemented for this call
+ sock.settimeout(None)
+ sock.setblocking(0)
+
+ if POLL_SUPPORT:
+ poll_func = self._poll_socket
+ else:
+ poll_func = self._select_poll_socket
- @require_capability('IDLE')
+ try:
+ resps = []
+ events = poll_func(sock, timeout)
+ if events:
+ while True:
+ try:
+ line = self._imap._get_line()
+ except (socket.timeout, socket.error):
+ break
+ except IMAPClient.AbortError:
+ # An imaplib.IMAP4.abort with "EOF" is raised
+ # under Python 3
+ err = sys.exc_info()[1]
+ if "EOF" in err.args[0]:
+ break
+ raise
+ else:
+ resps.append(_parse_untagged_response(line))
+ return resps
+ finally:
+ sock.setblocking(1)
+ self._set_read_timeout()
+
+ @require_capability("IDLE")
def idle_done(self):
"""Take the server out of IDLE mode.
@@ -612,7 +994,9 @@ class IMAPClient:
any). These are returned in parsed form as per
``idle_check()``.
"""
- pass
+ logger.debug("< DONE")
+ self._imap.send(b"DONE\r\n")
+ return self._consume_until_tagged_response(self._idle_tag, "IDLE")
def folder_status(self, folder, what=None):
"""Return the status of *folder*.
@@ -624,39 +1008,58 @@ class IMAPClient:
Returns a dictionary of the status items for the folder with
keys matching *what*.
"""
- pass
+ if what is None:
+ what = ("MESSAGES", "RECENT", "UIDNEXT", "UIDVALIDITY", "UNSEEN")
+ else:
+ what = normalise_text_list(what)
+ what_ = "(%s)" % (" ".join(what))
+
+ fname = self._normalise_folder(folder)
+ data = self._command_and_check("status", fname, what_)
+ response = parse_response(data)
+ status_items = response[-1]
+ return dict(as_pairs(status_items))
def close_folder(self):
"""Close the currently selected folder, returning the server
response string.
"""
- pass
+ return self._command_and_check("close", unpack=True)
def create_folder(self, folder):
"""Create *folder* on the server returning the server response string."""
- pass
+ return self._command_and_check(
+ "create", self._normalise_folder(folder), unpack=True
+ )
def rename_folder(self, old_name, new_name):
"""Change the name of a folder on the server."""
- pass
+ return self._command_and_check(
+ "rename",
+ self._normalise_folder(old_name),
+ self._normalise_folder(new_name),
+ unpack=True,
+ )
def delete_folder(self, folder):
"""Delete *folder* on the server returning the server response string."""
- pass
+ return self._command_and_check(
+ "delete", self._normalise_folder(folder), unpack=True
+ )
def folder_exists(self, folder):
"""Return ``True`` if *folder* exists on the server."""
- pass
+ return len(self.list_folders("", folder)) > 0
def subscribe_folder(self, folder):
"""Subscribe to *folder*, returning the server response string."""
- pass
+ return self._command_and_check("subscribe", self._normalise_folder(folder))
def unsubscribe_folder(self, folder):
"""Unsubscribe to *folder*, returning the server response string."""
- pass
+ return self._command_and_check("unsubscribe", self._normalise_folder(folder))
- def search(self, criteria='ALL', charset=None):
+ def search(self, criteria="ALL", charset=None):
"""Return a list of messages ids from the currently selected
folder matching *criteria*.
@@ -716,10 +1119,10 @@ class IMAPClient:
in the search).
"""
- pass
+ return self._search(criteria, charset)
- @require_capability('X-GM-EXT-1')
- def gmail_search(self, query, charset='UTF-8'):
+ @require_capability("X-GM-EXT-1")
+ def gmail_search(self, query, charset="UTF-8"):
"""Search using Gmail's X-GM-RAW attribute.
*query* should be a valid Gmail search query string. For
@@ -733,10 +1136,40 @@ class IMAPClient:
See https://developers.google.com/gmail/imap_extensions#extension_of_the_search_command_x-gm-raw
for more info.
"""
- pass
+ return self._search([b"X-GM-RAW", query], charset)
+
+ def _search(self, criteria, charset):
+ args = []
+ if charset:
+ args.extend([b"CHARSET", to_bytes(charset)])
+ args.extend(_normalise_search_criteria(criteria, charset))
+
+ try:
+ data = self._raw_command_untagged(b"SEARCH", args)
+ except imaplib.IMAP4.error as e:
+ # Make BAD IMAP responses easier to understand to the user, with a link to the docs
+ m = re.match(r"SEARCH command error: BAD \[(.+)\]", str(e))
+ if m:
+ raise exceptions.InvalidCriteriaError(
+ "{original_msg}\n\n"
+ "This error may have been caused by a syntax error in the criteria: "
+ "{criteria}\nPlease refer to the documentation for more information "
+ "about search criteria syntax..\n"
+ "https://imapclient.readthedocs.io/en/master/#imapclient.IMAPClient.search".format(
+ original_msg=m.group(1),
+ criteria='"%s"' % criteria
+ if not isinstance(criteria, list)
+ else criteria,
+ )
+ )
+
+ # If the exception is not from a BAD IMAP response, re-raise as-is
+ raise
- @require_capability('SORT')
- def sort(self, sort_criteria, criteria='ALL', charset='UTF-8'):
+ return parse_message_list(data)
+
+ @require_capability("SORT")
+ def sort(self, sort_criteria, criteria="ALL", charset="UTF-8"):
"""Return a list of message ids from the currently selected
folder, sorted by *sort_criteria* and optionally filtered by
*criteria*.
@@ -758,9 +1191,15 @@ class IMAPClient:
Note that SORT is an extension to the IMAP4 standard so it may
not be supported by all IMAP servers.
"""
- pass
-
- def thread(self, algorithm='REFERENCES', criteria='ALL', charset='UTF-8'):
+ args = [
+ _normalise_sort_criteria(sort_criteria),
+ to_bytes(charset),
+ ]
+ args.extend(_normalise_search_criteria(criteria, charset))
+ ids = self._raw_command_untagged(b"SORT", args, unpack=True)
+ return [int(i) for i in ids.split()]
+
+ def thread(self, algorithm="REFERENCES", criteria="ALL", charset="UTF-8"):
"""Return a list of messages threads from the currently
selected folder which match *criteria*.
@@ -777,7 +1216,17 @@ class IMAPClient:
See :rfc:`5256` for more details.
"""
- pass
+ algorithm = to_bytes(algorithm)
+ if not self.has_capability(b"THREAD=" + algorithm):
+ raise exceptions.CapabilityError(
+ "The server does not support %s threading algorithm" % algorithm
+ )
+
+ args = [algorithm, to_bytes(charset)] + _normalise_search_criteria(
+ criteria, charset
+ )
+ data = self._raw_command_untagged(b"THREAD", args)
+ return parse_response(data)
def get_flags(self, messages):
"""Return the flags set for each message in *messages* from
@@ -786,7 +1235,8 @@ class IMAPClient:
The return value is a dictionary structured like this: ``{
msgid1: (flag1, flag2, ... ), }``.
"""
- pass
+ response = self.fetch(messages, ["FLAGS"])
+ return self._filter_fetch_dict(response, b"FLAGS")
def add_flags(self, messages, flags, silent=False):
"""Add *flags* to *messages* in the currently selected folder.
@@ -796,7 +1246,7 @@ class IMAPClient:
Returns the flags set for each modified message (see
*get_flags*), or None if *silent* is true.
"""
- pass
+ return self._store(b"+FLAGS", messages, flags, b"FLAGS", silent=silent)
def remove_flags(self, messages, flags, silent=False):
"""Remove one or more *flags* from *messages* in the currently
@@ -807,7 +1257,7 @@ class IMAPClient:
Returns the flags set for each modified message (see
*get_flags*), or None if *silent* is true.
"""
- pass
+ return self._store(b"-FLAGS", messages, flags, b"FLAGS", silent=silent)
def set_flags(self, messages, flags, silent=False):
"""Set the *flags* for *messages* in the currently selected
@@ -818,7 +1268,7 @@ class IMAPClient:
Returns the flags set for each modified message (see
*get_flags*), or None if *silent* is true.
"""
- pass
+ return self._store(b"FLAGS", messages, flags, b"FLAGS", silent=silent)
def get_gmail_labels(self, messages):
"""Return the label set for each message in *messages* in the
@@ -830,7 +1280,9 @@ class IMAPClient:
This only works with IMAP servers that support the X-GM-LABELS
attribute (eg. Gmail).
"""
- pass
+ response = self.fetch(messages, [b"X-GM-LABELS"])
+ response = self._filter_fetch_dict(response, b"X-GM-LABELS")
+ return {msg: utf7_decode_sequence(labels) for msg, labels in response.items()}
def add_gmail_labels(self, messages, labels, silent=False):
"""Add *labels* to *messages* in the currently selected folder.
@@ -843,7 +1295,7 @@ class IMAPClient:
This only works with IMAP servers that support the X-GM-LABELS
attribute (eg. Gmail).
"""
- pass
+ return self._gm_label_store(b"+X-GM-LABELS", messages, labels, silent=silent)
def remove_gmail_labels(self, messages, labels, silent=False):
"""Remove one or more *labels* from *messages* in the
@@ -857,7 +1309,7 @@ class IMAPClient:
This only works with IMAP servers that support the X-GM-LABELS
attribute (eg. Gmail).
"""
- pass
+ return self._gm_label_store(b"-X-GM-LABELS", messages, labels, silent=silent)
def set_gmail_labels(self, messages, labels, silent=False):
"""Set the *labels* for *messages* in the currently selected
@@ -871,7 +1323,7 @@ class IMAPClient:
This only works with IMAP servers that support the X-GM-LABELS
attribute (eg. Gmail).
"""
- pass
+ return self._gm_label_store(b"X-GM-LABELS", messages, labels, silent=silent)
def delete_messages(self, messages, silent=False):
"""Delete one or more *messages* from the currently selected
@@ -880,7 +1332,7 @@ class IMAPClient:
Returns the flags set for each modified message (see
*get_flags*).
"""
- pass
+ return self.add_flags(messages, DELETED, silent=silent)
def fetch(self, messages, data, modifiers=None):
"""Retrieve selected *data* associated with one or more
@@ -922,7 +1374,22 @@ class IMAPClient:
b'SEQ': 110}}
"""
- pass
+ if not messages:
+ return {}
+
+ args = [
+ "FETCH",
+ join_message_ids(messages),
+ seq_to_parenstr_upper(data),
+ seq_to_parenstr_upper(modifiers) if modifiers else None,
+ ]
+ if self.use_uid:
+ args.insert(0, "UID")
+ tag = self._imap._command(*args)
+ typ, data = self._imap._command_complete("FETCH", tag)
+ self._checkok("fetch", typ, data)
+ typ, data = self._imap._untagged_response(typ, data, "FETCH")
+ return parse_fetch_response(data, self.normalise_times, self.use_uid)
def append(self, folder, msg, flags=(), msg_time=None):
"""Append a message to *folder*.
@@ -941,9 +1408,21 @@ class IMAPClient:
Returns the APPEND response as returned by the server.
"""
- pass
-
- @require_capability('MULTIAPPEND')
+ if msg_time:
+ time_val = '"%s"' % datetime_to_INTERNALDATE(msg_time)
+ time_val = to_unicode(time_val)
+ else:
+ time_val = None
+ return self._command_and_check(
+ "append",
+ self._normalise_folder(folder),
+ seq_to_parenstr(flags),
+ time_val,
+ to_bytes(msg),
+ unpack=True,
+ )
+
+ @require_capability("MULTIAPPEND")
def multiappend(self, folder, msgs):
"""Append messages to *folder* using the MULTIAPPEND feature from :rfc:`3502`.
@@ -955,16 +1434,40 @@ class IMAPClient:
Returns the APPEND response from the server.
"""
- pass
+
+ def chunks():
+ for m in msgs:
+ if isinstance(m, dict):
+ if "flags" in m:
+ yield to_bytes(seq_to_parenstr(m["flags"]))
+ if "date" in m:
+ yield to_bytes('"%s"' % datetime_to_INTERNALDATE(m["date"]))
+ yield _literal(to_bytes(m["msg"]))
+ else:
+ yield _literal(to_bytes(m))
+
+ msgs = list(chunks())
+
+ return self._raw_command(
+ b"APPEND",
+ [self._normalise_folder(folder)] + msgs,
+ uid=False,
+ )
def copy(self, messages, folder):
"""Copy one or more messages from the current folder to
*folder*. Returns the COPY response string returned by the
server.
"""
- pass
-
- @require_capability('MOVE')
+ return self._command_and_check(
+ "copy",
+ join_message_ids(messages),
+ self._normalise_folder(folder),
+ uid=True,
+ unpack=True,
+ )
+
+ @require_capability("MOVE")
def move(self, messages, folder):
"""Atomically move messages to another folder.
@@ -973,7 +1476,13 @@ class IMAPClient:
:param messages: List of message UIDs to move.
:param folder: The destination folder name.
"""
- pass
+ return self._command_and_check(
+ "move",
+ join_message_ids(messages),
+ self._normalise_folder(folder),
+ uid=True,
+ unpack=True,
+ )
def expunge(self, messages=None):
"""Use of the *messages* argument is discouraged.
@@ -1008,9 +1517,16 @@ class IMAPClient:
See :rfc:`4315#section-2.1` section 2.1 for more details.
"""
- pass
-
- @require_capability('UIDPLUS')
+ if messages:
+ if not self.use_uid:
+ raise ValueError("cannot EXPUNGE by ID when not using uids")
+ return self._command_and_check(
+ "EXPUNGE", join_message_ids(messages), uid=True
+ )
+ tag = self._imap._command("EXPUNGE")
+ return self._consume_until_tagged_response(tag, "EXPUNGE")
+
+ @require_capability("UIDPLUS")
def uid_expunge(self, messages):
"""Expunge deleted messages with the specified message ids from the
folder.
@@ -1019,34 +1535,39 @@ class IMAPClient:
See :rfc:`4315#section-2.1` section 2.1 for more details.
"""
- pass
+ return self._command_and_check("EXPUNGE", join_message_ids(messages), uid=True)
- @require_capability('ACL')
+ @require_capability("ACL")
def getacl(self, folder):
"""Returns a list of ``(who, acl)`` tuples describing the
access controls for *folder*.
"""
- pass
+ data = self._command_and_check("getacl", self._normalise_folder(folder))
+ parts = list(response_lexer.TokenSource(data))
+ parts = parts[1:] # First item is folder name
+ return [(parts[i], parts[i + 1]) for i in range(0, len(parts), 2)]
- @require_capability('ACL')
+ @require_capability("ACL")
def setacl(self, folder, who, what):
"""Set an ACL (*what*) for user (*who*) for a folder.
Set *what* to an empty string to remove an ACL. Returns the
server response string.
"""
- pass
+ return self._command_and_check(
+ "setacl", self._normalise_folder(folder), who, what, unpack=True
+ )
- @require_capability('QUOTA')
- def get_quota(self, mailbox='INBOX'):
+ @require_capability("QUOTA")
+ def get_quota(self, mailbox="INBOX"):
"""Get the quotas associated with a mailbox.
Returns a list of Quota objects.
"""
- pass
+ return self.get_quota_root(mailbox)[1]
- @require_capability('QUOTA')
- def _get_quota(self, quota_root=''):
+ @require_capability("QUOTA")
+ def _get_quota(self, quota_root=""):
"""Get the quotas associated with a quota root.
This method is not private but put behind an underscore to show that
@@ -1055,9 +1576,9 @@ class IMAPClient:
Returns a list of Quota objects.
"""
- pass
+ return _parse_quota(self._command_and_check("getquota", _quote(quota_root)))
- @require_capability('QUOTA')
+ @require_capability("QUOTA")
def get_quota_root(self, mailbox):
"""Get the quota roots for a mailbox.
@@ -1068,22 +1589,78 @@ class IMAPClient:
Return a tuple of MailboxQuotaRoots and list of Quota associated
"""
- pass
-
- @require_capability('QUOTA')
+ quota_root_rep = self._raw_command_untagged(
+ b"GETQUOTAROOT", to_bytes(mailbox), uid=False, response_name="QUOTAROOT"
+ )
+ quota_rep = self._imap.untagged_responses.pop("QUOTA", [])
+ quota_root_rep = parse_response(quota_root_rep)
+ quota_root = MailboxQuotaRoots(
+ to_unicode(quota_root_rep[0]), [to_unicode(q) for q in quota_root_rep[1:]]
+ )
+ return quota_root, _parse_quota(quota_rep)
+
+ @require_capability("QUOTA")
def set_quota(self, quotas):
"""Set one or more quotas on resources.
:param quotas: list of Quota objects
"""
- pass
+ if not quotas:
+ return
+
+ quota_root = None
+ set_quota_args = []
+
+ for quota in quotas:
+ if quota_root is None:
+ quota_root = quota.quota_root
+ elif quota_root != quota.quota_root:
+ raise ValueError("set_quota only accepts a single quota root")
+
+ set_quota_args.append("{} {}".format(quota.resource, quota.limit))
+
+ set_quota_args = " ".join(set_quota_args)
+ args = [to_bytes(_quote(quota_root)), to_bytes("({})".format(set_quota_args))]
+
+ response = self._raw_command_untagged(
+ b"SETQUOTA", args, uid=False, response_name="QUOTA"
+ )
+ return _parse_quota(response)
def _check_resp(self, expected, command, typ, data):
"""Check command responses for errors.
Raises IMAPClient.Error if the command fails.
"""
- pass
+ if typ != expected:
+ raise exceptions.IMAPClientError(
+ "%s failed: %s" % (command, to_unicode(data[0]))
+ )
+
+ def _consume_until_tagged_response(self, tag, command):
+ tagged_commands = self._imap.tagged_commands
+ resps = []
+ while True:
+ line = self._imap._get_response()
+ if tagged_commands[tag]:
+ break
+ resps.append(_parse_untagged_response(line))
+ typ, data = tagged_commands.pop(tag)
+ self._checkok(command, typ, data)
+ return data[0], resps
+
+ def _raw_command_untagged(
+ self, command, args, response_name=None, unpack=False, uid=True
+ ):
+ # TODO: eventually this should replace _command_and_check (call it _command)
+ typ, data = self._raw_command(command, args, uid=uid)
+ if response_name is None:
+ response_name = command
+ typ, data = self._imap._untagged_response(typ, data, to_unicode(response_name))
+ self._checkok(to_unicode(command), typ, data)
+ if unpack:
+ return data[0]
+ return data
def _raw_command(self, command, args, uid=True):
"""Run the specific command with the arguments given. 8-bit arguments
@@ -1096,23 +1673,184 @@ class IMAPClient:
*command* should be specified as bytes.
*args* should be specified as a list of bytes.
"""
- pass
+ command = command.upper()
+
+ if isinstance(args, tuple):
+ args = list(args)
+ if not isinstance(args, list):
+ args = [args]
+
+ tag = self._imap._new_tag()
+ prefix = [to_bytes(tag)]
+ if uid and self.use_uid:
+ prefix.append(b"UID")
+ prefix.append(command)
+
+ line = []
+ for item, is_last in _iter_with_last(prefix + args):
+ if not isinstance(item, bytes):
+ raise ValueError("command args must be passed as bytes")
+
+ if _is8bit(item):
+ # If a line was already started send it
+ if line:
+ out = b" ".join(line)
+ logger.debug("> %s", out)
+ self._imap.send(out)
+ line = []
+
+ # Now send the (unquoted) literal
+ if isinstance(item, _quoted):
+ item = item.original
+ self._send_literal(tag, item)
+ if not is_last:
+ self._imap.send(b" ")
+ else:
+ line.append(item)
+
+ if line:
+ out = b" ".join(line)
+ logger.debug("> %s", out)
+ self._imap.send(out)
+
+ self._imap.send(b"\r\n")
+
+ return self._imap._command_complete(to_unicode(command), tag)
def _send_literal(self, tag, item):
"""Send a single literal for the command with *tag*."""
- pass
+ if b"LITERAL+" in self._cached_capabilities:
+ out = b" {" + str(len(item)).encode("ascii") + b"+}\r\n" + item
+ logger.debug("> %s", debug_trunc(out, 64))
+ self._imap.send(out)
+ return
+
+ out = b" {" + str(len(item)).encode("ascii") + b"}\r\n"
+ logger.debug("> %s", out)
+ self._imap.send(out)
+
+ # Wait for continuation response
+ while self._imap._get_response():
+ tagged_resp = self._imap.tagged_commands.get(tag)
+ if tagged_resp:
+ raise exceptions.IMAPClientAbortError(
+ "unexpected response while waiting for continuation response: "
+ + repr(tagged_resp)
+ )
+
+ logger.debug(" (literal) > %s", debug_trunc(item, 256))
+ self._imap.send(item)
+
+ def _command_and_check(
+ self, command, *args, unpack: bool = False, uid: bool = False
+ ):
+ if uid and self.use_uid:
+ command = to_unicode(command) # imaplib must die
+ typ, data = self._imap.uid(command, *args)
+ else:
+ meth = getattr(self._imap, to_unicode(command))
+ typ, data = meth(*args)
+ self._checkok(command, typ, data)
+ if unpack:
+ return data[0]
+ return data
+
+ def _checkok(self, command, typ, data):
+ self._check_resp("OK", command, typ, data)
+
+ def _gm_label_store(self, cmd, messages, labels, silent):
+ response = self._store(
+ cmd, messages, self._normalise_labels(labels), b"X-GM-LABELS", silent=silent
+ )
+ return (
+ {msg: utf7_decode_sequence(labels) for msg, labels in response.items()}
+ if response
+ else None
+ )
def _store(self, cmd, messages, flags, fetch_key, silent):
"""Worker function for the various flag manipulation methods.
*cmd* is the STORE command to use (eg. '+FLAGS').
"""
- pass
+ if not messages:
+ return {}
+ if silent:
+ cmd += b".SILENT"
+
+ data = self._command_and_check(
+ "store", join_message_ids(messages), cmd, seq_to_parenstr(flags), uid=True
+ )
+ if silent:
+ return None
+ return self._filter_fetch_dict(parse_fetch_response(data), fetch_key)
+
+ def _filter_fetch_dict(self, fetch_dict, key):
+ return dict((msgid, data[key]) for msgid, data in fetch_dict.items())
+
+ def _normalise_folder(self, folder_name):
+ if isinstance(folder_name, bytes):
+ folder_name = folder_name.decode("ascii")
+ if self.folder_encode:
+ folder_name = encode_utf7(folder_name)
+ return _quote(folder_name)
+
+ def _normalise_labels(self, labels):
+ if isinstance(labels, (str, bytes)):
+ labels = (labels,)
+ return [_quote(encode_utf7(label)) for label in labels]
@property
def welcome(self):
"""access the server greeting message"""
- pass
+ try:
+ return self._imap.welcome
+ except AttributeError:
+ pass
+
+
+def _quote(arg):
+ if isinstance(arg, str):
+ arg = arg.replace("\\", "\\\\")
+ arg = arg.replace('"', '\\"')
+ q = '"'
+ else:
+ arg = arg.replace(b"\\", b"\\\\")
+ arg = arg.replace(b'"', b'\\"')
+ q = b'"'
+ return q + arg + q
+
+
+def _normalise_search_criteria(criteria, charset=None):
+ if not criteria:
+ raise exceptions.InvalidCriteriaError("no criteria specified")
+ if not charset:
+ charset = "us-ascii"
+
+ if isinstance(criteria, (str, bytes)):
+ return [to_bytes(criteria, charset)]
+
+ out = []
+ for item in criteria:
+ if isinstance(item, int):
+ out.append(str(item).encode("ascii"))
+ elif isinstance(item, (datetime, date)):
+ out.append(format_criteria_date(item))
+ elif isinstance(item, (list, tuple)):
+ # Process nested criteria list and wrap in parens.
+ inner = _normalise_search_criteria(item)
+ inner[0] = b"(" + inner[0]
+ inner[-1] = inner[-1] + b")"
+ out.extend(inner) # flatten
+ else:
+ out.append(_quoted.maybe(to_bytes(item, charset)))
+ return out
+
+
+def _normalise_sort_criteria(criteria, charset=None):
+ if isinstance(criteria, (str, bytes)):
+ criteria = [criteria]
+ return b"(" + b" ".join(to_bytes(item).upper() for item in criteria) + b")"
class _literal(bytes):
@@ -1137,14 +1875,87 @@ class _quoted(bytes):
holds the quoted version of the input while also providing
access to the original unquoted source.
"""
- pass
+ quoted = original.replace(b"\\", b"\\\\")
+ quoted = quoted.replace(b'"', b'\\"')
+ if quoted != original or b" " in quoted or not quoted:
+ out = cls(b'"' + quoted + b'"')
+ out.original = original
+ return out
+ return original
+
+
+# normalise_text_list, seq_to_parentstr etc have to return unicode
+# because imaplib handles flags and sort criteria assuming these are
+# passed as unicode
+def normalise_text_list(items):
+ return list(_normalise_text_list(items))
+
+
+def seq_to_parenstr(items):
+ return _join_and_paren(_normalise_text_list(items))
+
+
+def seq_to_parenstr_upper(items):
+ return _join_and_paren(item.upper() for item in _normalise_text_list(items))
+
+
+def _join_and_paren(items):
+ return "(" + " ".join(items) + ")"
+
+
+def _normalise_text_list(items):
+ if isinstance(items, (str, bytes)):
+ items = (items,)
+ return (to_unicode(c) for c in items)
def join_message_ids(messages):
"""Convert a sequence of messages ids or a single integer message id
into an id byte string for use with IMAP commands
"""
- pass
+ if isinstance(messages, (str, bytes, int)):
+ messages = (to_bytes(messages),)
+ return b",".join(_maybe_int_to_bytes(m) for m in messages)
+
+
+def _maybe_int_to_bytes(val):
+ if isinstance(val, int):
+ return str(val).encode("us-ascii")
+ return to_bytes(val)
+
+
+def _parse_untagged_response(text):
+ assert_imap_protocol(text.startswith(b"* "))
+ text = text[2:]
+ if text.startswith((b"OK ", b"NO ")):
+ return tuple(text.split(b" ", 1))
+ return parse_response([text])
+
+
+def as_pairs(items):
+ i = 0
+ last_item = None
+ for item in items:
+ if i % 2:
+ yield last_item, item
+ else:
+ last_item = item
+ i += 1
+
+
+def as_triplets(items):
+ a = iter(items)
+ return zip(a, a, a)
+
+
+def _is8bit(data):
+ return isinstance(data, _literal) or any(b > 127 for b in data)
+
+
+def _iter_with_last(items):
+ last_i = len(items) - 1
+ for i, item in enumerate(items):
+ yield item, i == last_i
_not_present = object()
@@ -1157,6 +1968,12 @@ class _dict_bytes_normaliser:
def __init__(self, d):
self._d = d
+
+ def iteritems(self):
+ for key, value in self._d.items():
+ yield to_bytes(key), value
+
+ # For Python 3 compatibility.
items = iteritems
def __contains__(self, ink):
@@ -1165,6 +1982,73 @@ class _dict_bytes_normaliser:
return True
return False
+ def get(self, ink, default=_not_present):
+ for k in self._gen_keys(ink):
+ try:
+ return self._d[k]
+ except KeyError:
+ pass
+ if default == _not_present:
+ raise KeyError(ink)
+ return default
+
+ def pop(self, ink, default=_not_present):
+ for k in self._gen_keys(ink):
+ try:
+ return self._d.pop(k)
+ except KeyError:
+ pass
+ if default == _not_present:
+ raise KeyError(ink)
+ return default
+
+ def _gen_keys(self, k):
+ yield k
+ if isinstance(k, bytes):
+ yield to_unicode(k)
+ else:
+ yield to_bytes(k)
+
+
+def debug_trunc(v, maxlen):
+ if len(v) < maxlen:
+ return repr(v)
+ hl = maxlen // 2
+ return repr(v[:hl]) + "..." + repr(v[-hl:])
+
+
+def utf7_decode_sequence(seq):
+ return [decode_utf7(s) for s in seq]
+
+
+def _parse_quota(quota_rep):
+ quota_rep = parse_response(quota_rep)
+ rv = []
+ for quota_root, quota_resource_infos in as_pairs(quota_rep):
+ for quota_resource_info in as_triplets(quota_resource_infos):
+ rv.append(
+ Quota(
+ quota_root=to_unicode(quota_root),
+ resource=to_unicode(quota_resource_info[0]),
+ usage=quota_resource_info[1],
+ limit=quota_resource_info[2],
+ )
+ )
+ return rv
+
class IMAPlibLoggerAdapter(LoggerAdapter):
"""Adapter preventing IMAP secrets from going to the logging facility."""
+
+ def process(self, msg, kwargs):
+ # msg is usually unicode but see #367. Convert bytes to
+ # unicode if required.
+ if isinstance(msg, bytes):
+ msg = msg.decode("ascii", "ignore")
+
+ for command in ("LOGIN", "AUTHENTICATE"):
+ if msg.startswith(">") and command in msg:
+ msg_start = msg.split(command)[0]
+ msg = "{}{} **REDACTED**".format(msg_start, command)
+ break
+ return super().process(msg, kwargs)
diff --git a/imapclient/interact.py b/imapclient/interact.py
index a57a825..7abdbaa 100644
--- a/imapclient/interact.py
+++ b/imapclient/interact.py
@@ -1,6 +1,155 @@
+#!/usr/bin/python
+
+# Copyright (c) 2020, Menno Smits
+# Released subject to the New BSD License
+# Please see http://en.wikipedia.org/wiki/BSD_licenses
+
import argparse
from getpass import getpass
+
from . import imapclient
from .config import create_client_from_config, get_config_defaults, parse_config_file
-if __name__ == '__main__':
+
+
+def command_line() -> argparse.Namespace:
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "-H", "--host", dest="host", action="store", help="IMAP host connect to"
+ )
+ parser.add_argument(
+ "-u",
+ "--username",
+ dest="username",
+ action="store",
+ help="Username to login with",
+ )
+ parser.add_argument(
+ "-p",
+ "--password",
+ dest="password",
+ action="store",
+ help="Password to login with",
+ )
+ parser.add_argument(
+ "-P",
+ "--port",
+ dest="port",
+ action="store",
+ type=int,
+ default=None,
+ help="IMAP port to use (default is 993 for TLS, or 143 otherwise)",
+ )
+
+ ssl_group = parser.add_mutually_exclusive_group()
+ ssl_group.add_argument(
+ "-s",
+ "--ssl",
+ dest="ssl",
+ action="store_true",
+ default=None,
+ help="Use SSL/TLS connection (default)",
+ )
+ ssl_group.add_argument(
+ "--insecure",
+ dest="insecure",
+ action="store_true",
+ default=False,
+ help="Use insecure connection (i.e. without SSL/TLS)",
+ )
+
+ parser.add_argument(
+ "-f",
+ "--file",
+ dest="file",
+ action="store",
+ default=None,
+ help="Config file (same as livetest)",
+ )
+
+ args = parser.parse_args()
+
+ if args.file:
+ if (
+ args.host
+ or args.username
+ or args.password
+ or args.port
+ or args.ssl
+ or args.insecure
+ ):
+ parser.error("If -f/--file is given no other options can be used")
+ # Use the options in the config file
+ args = parse_config_file(args.file)
+ return args
+
+ args.ssl = not args.insecure
+
+ # Scan through arguments, filling in defaults and prompting when
+ # a compulsory argument wasn't provided.
+ compulsory_args = ("host", "username", "password")
+ for name, default_value in get_config_defaults().items():
+ value = getattr(args, name, default_value)
+ if name in compulsory_args and value is None:
+ value = getpass(name + ": ")
+ setattr(args, name, value)
+
+ return args
+
+
+def main() -> int:
+ args = command_line()
+ print("Connecting...")
+ client = create_client_from_config(args)
+ print("Connected.")
+ banner = '\nIMAPClient instance is "c"'
+
+ def ptpython(c: imapclient.IMAPClient) -> None:
+ from ptpython.repl import embed # type: ignore[import-not-found]
+
+ embed(globals(), locals())
+
+ def ipython_400(c: imapclient.IMAPClient) -> None:
+ from IPython.terminal.embed import ( # type: ignore[import-not-found]
+ InteractiveShellEmbed,
+ )
+
+ ipshell = InteractiveShellEmbed(banner1=banner)
+ ipshell("")
+
+ def ipython_011(c: imapclient.IMAPClient) -> None:
+ from IPython.frontend.terminal.embed import ( # type: ignore[import-not-found]
+ InteractiveShellEmbed,
+ )
+
+ ipshell = InteractiveShellEmbed(banner1=banner)
+ ipshell("")
+
+ def ipython_010(c: imapclient.IMAPClient) -> None:
+ from IPython.Shell import IPShellEmbed # type: ignore[import-not-found]
+
+ IPShellEmbed("", banner=banner)()
+
+ def builtin(c: imapclient.IMAPClient) -> None:
+ import code
+
+ code.interact(banner, local={"c": c})
+
+ shell_attempts = (
+ ptpython,
+ ipython_400,
+ ipython_011,
+ ipython_010,
+ builtin,
+ )
+ for shell in shell_attempts:
+ try:
+ shell(client)
+ except ImportError:
+ pass
+ else:
+ break
+ return 0
+
+
+if __name__ == "__main__":
main()
diff --git a/imapclient/response_lexer.py b/imapclient/response_lexer.py
index cd54a2b..1b7d8da 100644
--- a/imapclient/response_lexer.py
+++ b/imapclient/response_lexer.py
@@ -1,20 +1,29 @@
+# Copyright (c) 2014, Menno Smits
+# Released subject to the New BSD License
+# Please see http://en.wikipedia.org/wiki/BSD_licenses
+
"""
A lexical analyzer class for IMAP responses.
Although Lexer does all the work, TokenSource is the class to use for
external callers.
"""
+
from typing import Iterator, List, Optional, Tuple, TYPE_CHECKING, Union
+
from .util import assert_imap_protocol
-__all__ = ['TokenSource']
+
+__all__ = ["TokenSource"]
+
CTRL_CHARS = frozenset(c for c in range(32))
ALL_CHARS = frozenset(c for c in range(256))
SPECIALS = frozenset(c for c in b' ()%"[')
NON_SPECIALS = ALL_CHARS - SPECIALS - CTRL_CHARS
-WHITESPACE = frozenset(c for c in b' \t\r\n')
-BACKSLASH = ord('\\')
-OPEN_SQUARE = ord('[')
-CLOSE_SQUARE = ord(']')
+WHITESPACE = frozenset(c for c in b" \t\r\n")
+
+BACKSLASH = ord("\\")
+OPEN_SQUARE = ord("[")
+CLOSE_SQUARE = ord("]")
DOUBLE_QUOTE = ord('"')
@@ -28,7 +37,13 @@ class TokenSource:
self.lex = Lexer(text)
self.src = iter(self.lex)
- def __iter__(self) ->Iterator[bytes]:
+ @property
+ def current_literal(self) -> Optional[bytes]:
+ if TYPE_CHECKING:
+ assert self.lex.current_source is not None
+ return self.lex.current_source.literal
+
+ def __iter__(self) -> Iterator[bytes]:
return self.src
@@ -41,26 +56,99 @@ class Lexer:
self.sources = (LiteralHandlingIter(chunk) for chunk in text)
self.current_source: Optional[LiteralHandlingIter] = None
- def __iter__(self) ->Iterator[bytes]:
+ def read_until(
+ self, stream_i: "PushableIterator", end_char: int, escape: bool = True
+ ) -> bytearray:
+ token = bytearray()
+ try:
+ for nextchar in stream_i:
+ if escape and nextchar == BACKSLASH:
+ escaper = nextchar
+ nextchar = next(stream_i)
+ if nextchar not in (escaper, end_char):
+ token.append(escaper) # Don't touch invalid escaping
+ elif nextchar == end_char:
+ break
+ token.append(nextchar)
+ else:
+ raise ValueError("No closing '%s'" % chr(end_char))
+ except StopIteration:
+ raise ValueError("No closing '%s'" % chr(end_char))
+ token.append(end_char)
+ return token
+
+ def read_token_stream(self, stream_i: "PushableIterator") -> Iterator[bytearray]:
+ whitespace = WHITESPACE
+ wordchars = NON_SPECIALS
+ read_until = self.read_until
+
+ while True:
+ # Whitespace
+ for nextchar in stream_i:
+ if nextchar not in whitespace:
+ stream_i.push(nextchar)
+ break # done skipping over the whitespace
+
+ # Non-whitespace
+ token = bytearray()
+ for nextchar in stream_i:
+ if nextchar in wordchars:
+ token.append(nextchar)
+ elif nextchar == OPEN_SQUARE:
+ token.append(nextchar)
+ token.extend(read_until(stream_i, CLOSE_SQUARE, escape=False))
+ else:
+ if nextchar in whitespace:
+ yield token
+ elif nextchar == DOUBLE_QUOTE:
+ assert_imap_protocol(not token)
+ token.append(nextchar)
+ token.extend(read_until(stream_i, nextchar))
+ yield token
+ else:
+ # Other punctuation, eg. "(". This ends the current token.
+ if token:
+ yield token
+ yield bytearray([nextchar])
+ break
+ else:
+ if token:
+ yield token
+ break
+
+ def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
for tok in self.read_token_stream(iter(source)):
yield bytes(tok)
+# imaplib has poor handling of 'literals' - it both fails to remove the
+# {size} marker, and fails to keep responses grouped into the same logical
+# 'line'. What we end up with is a list of response 'records', where each
+# record is either a simple string, or tuple of (str_with_lit, literal) -
+# where str_with_lit is a string with the {xxx} marker at its end. Note
+# that each element of this list does *not* correspond 1:1 with the
+# untagged responses.
+# (http://bugs.python.org/issue5045 also has comments about this)
+# So: we have a special object for each of these records. When a
+# string literal is processed, we peek into this object to grab the
+# literal.
class LiteralHandlingIter:
-
def __init__(self, resp_record: Union[Tuple[bytes, bytes], bytes]):
self.literal: Optional[bytes]
if isinstance(resp_record, tuple):
+ # A 'record' with a string which includes a literal marker, and
+ # the literal itself.
self.src_text = resp_record[0]
- assert_imap_protocol(self.src_text.endswith(b'}'), self.src_text)
+ assert_imap_protocol(self.src_text.endswith(b"}"), self.src_text)
self.literal = resp_record[1]
else:
+ # just a line with no literals.
self.src_text = resp_record
self.literal = None
- def __iter__(self) ->'PushableIterator':
+ def __iter__(self) -> "PushableIterator":
return PushableIterator(self.src_text)
@@ -71,11 +159,16 @@ class PushableIterator:
self.it = iter(it)
self.pushed: List[int] = []
- def __iter__(self) ->'PushableIterator':
+ def __iter__(self) -> "PushableIterator":
return self
- def __next__(self) ->int:
+ def __next__(self) -> int:
if self.pushed:
return self.pushed.pop()
return next(self.it)
+
+ # For Python 2 compatibility
next = __next__
+
+ def push(self, item: int) -> None:
+ self.pushed.append(item)
diff --git a/imapclient/response_parser.py b/imapclient/response_parser.py
index f632411..9f29e4c 100644
--- a/imapclient/response_parser.py
+++ b/imapclient/response_parser.py
@@ -1,34 +1,45 @@
+# Copyright (c) 2014, Menno Smits
+# Released subject to the New BSD License
+# Please see http://en.wikipedia.org/wiki/BSD_licenses
+
"""
Parsing for IMAP command responses with focus on FETCH responses as
returned by imaplib.
Initially inspired by http://effbot.org/zone/simple-iterator-parser.htm
"""
+
+# TODO more exact error reporting
+
import datetime
import re
import sys
from collections import defaultdict
from typing import cast, Dict, Iterator, List, Optional, Tuple, TYPE_CHECKING, Union
+
from .datetime_util import parse_to_datetime
from .exceptions import ProtocolError
from .response_lexer import TokenSource
from .response_types import Address, BodyData, Envelope, SearchIds
from .typing_imapclient import _Atom
-__all__ = ['parse_response', 'parse_message_list']
+
+__all__ = ["parse_response", "parse_message_list"]
-def parse_response(data: List[bytes]) ->Tuple[_Atom, ...]:
+def parse_response(data: List[bytes]) -> Tuple[_Atom, ...]:
"""Pull apart IMAP command responses.
Returns nested tuples of appropriately typed objects.
"""
- pass
+ if data == [None]:
+ return tuple()
+ return tuple(gen_parsed_response(data))
-_msg_id_pattern = re.compile('(\\d+(?: +\\d+)*)')
+_msg_id_pattern = re.compile(r"(\d+(?: +\d+)*)")
-def parse_message_list(data: List[Union[bytes, str]]) ->SearchIds:
+def parse_message_list(data: List[Union[bytes, str]]) -> SearchIds:
"""Parse a list of message ids and return them as a list.
parse_response is also capable of doing this but this is
@@ -39,18 +50,238 @@ def parse_message_list(data: List[Union[bytes, str]]) ->SearchIds:
attribute which contains the MODSEQ response (if returned by the
server).
"""
- pass
+ if len(data) != 1:
+ raise ValueError("unexpected message list data")
+
+ message_data = data[0]
+ if not message_data:
+ return SearchIds()
+
+ if isinstance(message_data, bytes):
+ message_data = message_data.decode("ascii")
+
+ m = _msg_id_pattern.match(message_data)
+ if not m:
+ raise ValueError("unexpected message list format")
+
+ ids = SearchIds(int(n) for n in m.group(1).split())
+ # Parse any non-numeric part on the end using parse_response (this
+ # is likely to be the MODSEQ section).
+ extra = message_data[m.end(1) :]
+ if extra:
+ for item in parse_response([extra.encode("ascii")]):
+ if (
+ isinstance(item, tuple)
+ and len(item) == 2
+ and cast(bytes, item[0]).lower() == b"modseq"
+ ):
+ if TYPE_CHECKING:
+ assert isinstance(item[1], int)
+ ids.modseq = item[1]
+ elif isinstance(item, int):
+ ids.append(item)
+ return ids
-_ParseFetchResponseInnerDict = Dict[bytes, Optional[Union[datetime.datetime,
- int, BodyData, Envelope, _Atom]]]
+def gen_parsed_response(text: List[bytes]) -> Iterator[_Atom]:
+ if not text:
+ return
+ src = TokenSource(text)
-def parse_fetch_response(text: List[bytes], normalise_times: bool=True,
- uid_is_key: bool=True) ->'defaultdict[int, _ParseFetchResponseInnerDict]':
+ token = None
+ try:
+ for token in src:
+ yield atom(src, token)
+ except ProtocolError:
+ raise
+ except ValueError:
+ _, err, _ = sys.exc_info()
+ raise ProtocolError("%s: %r" % (str(err), token))
+
+
+_ParseFetchResponseInnerDict = Dict[
+ bytes, Optional[Union[datetime.datetime, int, BodyData, Envelope, _Atom]]
+]
+
+
+def parse_fetch_response(
+ text: List[bytes], normalise_times: bool = True, uid_is_key: bool = True
+) -> "defaultdict[int, _ParseFetchResponseInnerDict]":
"""Pull apart IMAP FETCH responses as returned by imaplib.
Returns a dictionary, keyed by message ID. Each value a dictionary
keyed by FETCH field type (eg."RFC822").
"""
- pass
+ if text == [None]:
+ return defaultdict()
+ response = gen_parsed_response(text)
+
+ parsed_response: "defaultdict[int, _ParseFetchResponseInnerDict]" = defaultdict(
+ dict
+ )
+ while True:
+ try:
+ msg_id = seq = _int_or_error(next(response), "invalid message ID")
+ except StopIteration:
+ break
+
+ try:
+ msg_response = next(response)
+ except StopIteration:
+ raise ProtocolError("unexpected EOF")
+
+ if not isinstance(msg_response, tuple):
+ raise ProtocolError("bad response type: %s" % repr(msg_response))
+ if len(msg_response) % 2:
+ raise ProtocolError(
+ "uneven number of response items: %s" % repr(msg_response)
+ )
+
+ # always return the sequence of the message, so it is available
+ # even if we return keyed by UID.
+ msg_data: _ParseFetchResponseInnerDict = {b"SEQ": seq}
+ for i in range(0, len(msg_response), 2):
+ msg_attribute = msg_response[i]
+ if TYPE_CHECKING:
+ assert isinstance(msg_attribute, bytes)
+ word = msg_attribute.upper()
+ value = msg_response[i + 1]
+
+ if word == b"UID":
+ uid = _int_or_error(value, "invalid UID")
+ if uid_is_key:
+ msg_id = uid
+ else:
+ msg_data[word] = uid
+ elif word == b"INTERNALDATE":
+ msg_data[word] = _convert_INTERNALDATE(value, normalise_times)
+ elif word == b"ENVELOPE":
+ msg_data[word] = _convert_ENVELOPE(value, normalise_times)
+ elif word in (b"BODY", b"BODYSTRUCTURE"):
+ if TYPE_CHECKING:
+ assert isinstance(value, tuple)
+ msg_data[word] = BodyData.create(value)
+ else:
+ msg_data[word] = value
+
+ parsed_response[msg_id].update(msg_data)
+
+ return parsed_response
+
+
+def _int_or_error(value: _Atom, error_text: str) -> int:
+ try:
+ return int(value) # type: ignore[arg-type]
+ except (TypeError, ValueError):
+ raise ProtocolError("%s: %s" % (error_text, repr(value)))
+
+
+def _convert_INTERNALDATE(
+ date_string: _Atom, normalise_times: bool = True
+) -> Optional[datetime.datetime]:
+ if date_string is None:
+ return None
+
+ try:
+ if TYPE_CHECKING:
+ assert isinstance(date_string, bytes)
+ return parse_to_datetime(date_string, normalise=normalise_times)
+ except ValueError:
+ return None
+
+
+def _convert_ENVELOPE(
+ envelope_response: _Atom, normalise_times: bool = True
+) -> Envelope:
+ if TYPE_CHECKING:
+ assert isinstance(envelope_response, tuple)
+ dt = None
+ if envelope_response[0]:
+ try:
+ if TYPE_CHECKING:
+ assert isinstance(envelope_response[0], bytes)
+ dt = parse_to_datetime(
+ envelope_response[0],
+ normalise=normalise_times,
+ )
+ except ValueError:
+ pass
+
+ subject = envelope_response[1]
+ in_reply_to = envelope_response[8]
+ message_id = envelope_response[9]
+ if TYPE_CHECKING:
+ assert isinstance(subject, bytes)
+ assert isinstance(in_reply_to, bytes)
+ assert isinstance(message_id, bytes)
+
+ # addresses contains a tuple of addresses
+ # from, sender, reply_to, to, cc, bcc headers
+ addresses: List[Optional[Tuple[Address, ...]]] = []
+ for addr_list in envelope_response[2:8]:
+ addrs = []
+ if addr_list:
+ if TYPE_CHECKING:
+ assert isinstance(addr_list, tuple)
+ for addr_tuple in addr_list:
+ if TYPE_CHECKING:
+ assert isinstance(addr_tuple, tuple)
+ if addr_tuple:
+ if TYPE_CHECKING:
+ addr_tuple = cast(Tuple[bytes, bytes, bytes, bytes], addr_tuple)
+ addrs.append(Address(*addr_tuple))
+ addresses.append(tuple(addrs))
+ else:
+ addresses.append(None)
+
+ return Envelope(
+ date=dt,
+ subject=subject,
+ from_=addresses[0],
+ sender=addresses[1],
+ reply_to=addresses[2],
+ to=addresses[3],
+ cc=addresses[4],
+ bcc=addresses[5],
+ in_reply_to=in_reply_to,
+ message_id=message_id,
+ )
+
+
+def atom(src: TokenSource, token: bytes) -> _Atom:
+ if token == b"(":
+ return parse_tuple(src)
+ if token == b"NIL":
+ return None
+ if token[:1] == b"{":
+ literal_len = int(token[1:-1])
+ literal_text = src.current_literal
+ if literal_text is None:
+ raise ProtocolError("No literal corresponds to %r" % token)
+ if len(literal_text) != literal_len:
+ raise ProtocolError(
+ "Expecting literal of size %d, got %d"
+ % (literal_len, len(literal_text))
+ )
+ return literal_text
+ if len(token) >= 2 and (token[:1] == token[-1:] == b'"'):
+ return token[1:-1]
+ if token.isdigit() and (token[:1] != b"0" or len(token) == 1):
+ # this prevents converting items like 0123 to 123
+ return int(token)
+ return token
+
+
+def parse_tuple(src: TokenSource) -> _Atom:
+ out: List[_Atom] = []
+ for token in src:
+ if token == b")":
+ return tuple(out)
+ out.append(atom(src, token))
+ # no terminator
+ raise ProtocolError('Tuple incomplete before "(%s"' % _fmt_tuple(out))
+
+
+def _fmt_tuple(t: List[_Atom]) -> str:
+ return " ".join(str(item) for item in t)
diff --git a/imapclient/response_types.py b/imapclient/response_types.py
index 7d95b73..cd4631d 100644
--- a/imapclient/response_types.py
+++ b/imapclient/response_types.py
@@ -1,25 +1,30 @@
+# Copyright (c) 2014, Menno Smits
+# Released subject to the New BSD License
+# Please see http://en.wikipedia.org/wiki/BSD_licenses
+
import dataclasses
import datetime
from email.utils import formataddr
from typing import Any, List, Optional, Tuple, TYPE_CHECKING, Union
+
from .typing_imapclient import _Atom
from .util import to_unicode
@dataclasses.dataclass
class Envelope:
- """Represents envelope structures of messages. Returned when parsing
+ r"""Represents envelope structures of messages. Returned when parsing
ENVELOPE responses.
:ivar date: A datetime instance that represents the "Date" header.
:ivar subject: A string that contains the "Subject" header.
- :ivar from\\_: A tuple of Address objects that represent one or more
+ :ivar from\_: A tuple of Address objects that represent one or more
addresses from the "From" header, or None if header does not exist.
- :ivar sender: As for from\\_ but represents the "Sender" header.
- :ivar reply_to: As for from\\_ but represents the "Reply-To" header.
- :ivar to: As for from\\_ but represents the "To" header.
- :ivar cc: As for from\\_ but represents the "Cc" header.
- :ivar bcc: As for from\\_ but represents the "Bcc" recipients.
+ :ivar sender: As for from\_ but represents the "Sender" header.
+ :ivar reply_to: As for from\_ but represents the "Reply-To" header.
+ :ivar to: As for from\_ but represents the "To" header.
+ :ivar cc: As for from\_ but represents the "Cc" header.
+ :ivar bcc: As for from\_ but represents the "Bcc" recipients.
:ivar in_reply_to: A string that contains the "In-Reply-To" header.
:ivar message_id: A string that contains the "Message-Id" header.
@@ -50,12 +55,12 @@ class Envelope:
"""
date: Optional[datetime.datetime]
subject: bytes
- from_: Optional[Tuple['Address', ...]]
- sender: Optional[Tuple['Address', ...]]
- reply_to: Optional[Tuple['Address', ...]]
- to: Optional[Tuple['Address', ...]]
- cc: Optional[Tuple['Address', ...]]
- bcc: Optional[Tuple['Address', ...]]
+ from_: Optional[Tuple["Address", ...]]
+ sender: Optional[Tuple["Address", ...]]
+ reply_to: Optional[Tuple["Address", ...]]
+ to: Optional[Tuple["Address", ...]]
+ cc: Optional[Tuple["Address", ...]]
+ bcc: Optional[Tuple["Address", ...]]
in_reply_to: bytes
message_id: bytes
@@ -83,16 +88,18 @@ class Address:
See also :py:class:`Envelope` for information about handling of
"group syntax".
"""
+
name: bytes
route: bytes
mailbox: bytes
host: bytes
- def __str__(self) ->str:
+ def __str__(self) -> str:
if self.mailbox and self.host:
- address = to_unicode(self.mailbox) + '@' + to_unicode(self.host)
+ address = to_unicode(self.mailbox) + "@" + to_unicode(self.host)
else:
address = to_unicode(self.mailbox or self.host)
+
return formataddr((to_unicode(self.name), address))
@@ -110,10 +117,32 @@ class SearchIds(List[int]):
self.modseq: Optional[int] = None
-_BodyDataType = Tuple[Union[bytes, int, 'BodyData'], '_BodyDataType']
+_BodyDataType = Tuple[Union[bytes, int, "BodyData"], "_BodyDataType"]
class BodyData(_BodyDataType):
"""
Returned when parsing BODY and BODYSTRUCTURE responses.
"""
+
+ @classmethod
+ def create(cls, response: Tuple[_Atom, ...]) -> "BodyData":
+ # In case of multipart messages we will see at least 2 tuples
+ # at the start. Nest these in to a list so that the returned
+ # response tuple always has a consistent number of elements
+ # regardless of whether the message is multipart or not.
+ if isinstance(response[0], tuple):
+ # Multipart, find where the message part tuples stop
+ parts = []
+ for i, part in enumerate(response):
+ if isinstance(part, bytes):
+ break
+ if TYPE_CHECKING:
+ assert isinstance(part, tuple)
+ parts.append(part)
+ return cls(([cls.create(part) for part in parts],) + response[i:])
+ return cls(response)
+
+ @property
+ def is_multipart(self) -> bool:
+ return isinstance(self[0], list)
diff --git a/imapclient/testable_imapclient.py b/imapclient/testable_imapclient.py
index c605b90..c583274 100644
--- a/imapclient/testable_imapclient.py
+++ b/imapclient/testable_imapclient.py
@@ -1,5 +1,10 @@
+# Copyright (c) 2014, Menno Smits
+# Released subject to the New BSD License
+# Please see http://en.wikipedia.org/wiki/BSD_licenses
+
from typing import Any, Dict
from unittest.mock import Mock
+
from .imapclient import IMAPClient
@@ -12,15 +17,23 @@ class TestableIMAPClient(IMAPClient):
IMAP account.
"""
- def __init__(self) ->None:
- super().__init__('somehost')
+ def __init__(self) -> None:
+ super().__init__("somehost")
+ def _create_IMAP4(self) -> "MockIMAP4":
+ return MockIMAP4()
-class MockIMAP4(Mock):
+class MockIMAP4(Mock):
def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self.use_uid = True
- self.sent = b''
+ self.sent = b"" # Accumulates what was given to send()
self.tagged_commands: Dict[Any, Any] = {}
self._starttls_done = False
+
+ def send(self, data: bytes) -> None:
+ self.sent += data
+
+ def _new_tag(self) -> str:
+ return "tag"
diff --git a/imapclient/tls.py b/imapclient/tls.py
index a700b1a..fe9671e 100644
--- a/imapclient/tls.py
+++ b/imapclient/tls.py
@@ -1,25 +1,68 @@
+# Copyright (c) 2023, Menno Smits
+# Released subject to the New BSD License
+# Please see http://en.wikipedia.org/wiki/BSD_licenses
+
"""
This module contains IMAPClient's functionality related to Transport
Layer Security (TLS a.k.a. SSL).
"""
+
import imaplib
import io
import socket
import ssl
from typing import Optional, TYPE_CHECKING
+
if TYPE_CHECKING:
from typing_extensions import Buffer
+def wrap_socket(
+ sock: socket.socket, ssl_context: Optional[ssl.SSLContext], host: str
+) -> socket.socket:
+ if ssl_context is None:
+ ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
+
+ return ssl_context.wrap_socket(sock, server_hostname=host)
+
+
class IMAP4_TLS(imaplib.IMAP4):
"""IMAP4 client class for TLS/SSL connections.
Adapted from imaplib.IMAP4_SSL.
"""
- def __init__(self, host: str, port: int, ssl_context: Optional[ssl.
- SSLContext], timeout: Optional[float]=None):
+ def __init__(
+ self,
+ host: str,
+ port: int,
+ ssl_context: Optional[ssl.SSLContext],
+ timeout: Optional[float] = None,
+ ):
self.ssl_context = ssl_context
self._timeout = timeout
imaplib.IMAP4.__init__(self, host, port)
self.file: io.BufferedReader
+
+ def open(
+ self, host: str = "", port: int = 993, timeout: Optional[float] = None
+ ) -> None:
+ self.host = host
+ self.port = port
+ sock = socket.create_connection(
+ (host, port), timeout if timeout is not None else self._timeout
+ )
+ self.sock = wrap_socket(sock, self.ssl_context, host)
+ self.file = self.sock.makefile("rb")
+
+ def read(self, size: int) -> bytes:
+ return self.file.read(size)
+
+ def readline(self) -> bytes:
+ return self.file.readline()
+
+ def send(self, data: "Buffer") -> None:
+ self.sock.sendall(data)
+
+ def shutdown(self) -> None:
+ imaplib.IMAP4.shutdown(self)
diff --git a/imapclient/typing_imapclient.py b/imapclient/typing_imapclient.py
index 2fcbe01..a9fc1af 100644
--- a/imapclient/typing_imapclient.py
+++ b/imapclient/typing_imapclient.py
@@ -1,3 +1,4 @@
from typing import Tuple, Union
+
_AtomPart = Union[None, int, bytes]
-_Atom = Union[_AtomPart, Tuple['_Atom', ...]]
+_Atom = Union[_AtomPart, Tuple["_Atom", ...]]
diff --git a/imapclient/util.py b/imapclient/util.py
index 5e3fab3..8f9aa35 100644
--- a/imapclient/util.py
+++ b/imapclient/util.py
@@ -1,6 +1,50 @@
+# Copyright (c) 2015, Menno Smits
+# Released subject to the New BSD License
+# Please see http://en.wikipedia.org/wiki/BSD_licenses
+
import logging
from typing import Iterator, Optional, Tuple, Union
+
from . import exceptions
+
logger = logging.getLogger(__name__)
+
+
+def to_unicode(s: Union[bytes, str]) -> str:
+ if isinstance(s, bytes):
+ try:
+ return s.decode("ascii")
+ except UnicodeDecodeError:
+ logger.warning(
+ "An error occurred while decoding %s in ASCII 'strict' mode. Fallback to "
+ "'ignore' errors handling, some characters might have been stripped",
+ s,
+ )
+ return s.decode("ascii", "ignore")
+ return s
+
+
+def to_bytes(s: Union[bytes, str], charset: str = "ascii") -> bytes:
+ if isinstance(s, str):
+ return s.encode(charset)
+ return s
+
+
+def assert_imap_protocol(condition: bool, message: Optional[bytes] = None) -> None:
+ if not condition:
+ msg = "Server replied with a response that violates the IMAP protocol"
+ if message:
+ # FIXME(jlvillal): This looks wrong as it repeats `msg` twice
+ msg += "{}: {}".format(
+ msg, message.decode(encoding="ascii", errors="ignore")
+ )
+ raise exceptions.ProtocolError(msg)
+
+
_TupleAtomPart = Union[None, int, bytes]
-_TupleAtom = Tuple[Union[_TupleAtomPart, '_TupleAtom'], ...]
+_TupleAtom = Tuple[Union[_TupleAtomPart, "_TupleAtom"], ...]
+
+
+def chunk(lst: _TupleAtom, size: int) -> Iterator[_TupleAtom]:
+ for i in range(0, len(lst), size):
+ yield lst[i : i + size]
diff --git a/imapclient/version.py b/imapclient/version.py
index 9e7d8dc..c97dfb6 100644
--- a/imapclient/version.py
+++ b/imapclient/version.py
@@ -1,7 +1,24 @@
+# Copyright (c) 2022, Menno Smits
+# Released subject to the New BSD License
+# Please see http://en.wikipedia.org/wiki/BSD_licenses
+
from typing import Tuple
-version_info = 3, 0, 1, 'final'
+
+version_info = (3, 0, 1, "final")
+
+
+def _imapclient_version_string(vinfo: Tuple[int, int, int, str]) -> str:
+ major, minor, micro, releaselevel = vinfo
+ v = "%d.%d.%d" % (major, minor, micro)
+ if releaselevel != "final":
+ v += "-" + releaselevel
+ return v
+
+
version = _imapclient_version_string(version_info)
-maintainer = 'IMAPClient Maintainers'
-maintainer_email = 'imapclient@groups.io'
-author = 'Menno Finlay-Smits'
-author_email = 'inbox@menno.io'
+
+maintainer = "IMAPClient Maintainers"
+maintainer_email = "imapclient@groups.io"
+
+author = "Menno Finlay-Smits"
+author_email = "inbox@menno.io"