back to Claude Sonnet 3.5 - Fill-in summary
Claude Sonnet 3.5 - Fill-in: imapclient
Failed to run pytests for test tests
Pytest collection failure.
Patch diff
diff --git a/imapclient/config.py b/imapclient/config.py
index f098591..e083f43 100644
--- a/imapclient/config.py
+++ b/imapclient/config.py
@@ -14,7 +14,27 @@ def parse_config_file(filename: str) ->argparse.Namespace:
Used by livetest.py and interact.py
"""
- pass
+ config = configparser.ConfigParser()
+ config.read(filename)
+
+ if 'DEFAULT' not in config:
+ raise ValueError(f"Config file {filename} must have a DEFAULT section")
+
+ ns = argparse.Namespace()
+ ns.host = config['DEFAULT'].get('host', 'imap.gmail.com')
+ ns.port = config['DEFAULT'].getint('port', 993)
+ ns.ssl = config['DEFAULT'].getboolean('ssl', True)
+ ns.username = config['DEFAULT'].get('username')
+ ns.password = config['DEFAULT'].get('password')
+ ns.oauth2 = config['DEFAULT'].getboolean('oauth2', False)
+ ns.oauth2_client_id = config['DEFAULT'].get('oauth2_client_id')
+ ns.oauth2_client_secret = config['DEFAULT'].get('oauth2_client_secret')
+ ns.oauth2_refresh_token = config['DEFAULT'].get('oauth2_refresh_token')
+
+ if ns.oauth2 and (not ns.oauth2_client_id or not ns.oauth2_client_secret or not ns.oauth2_refresh_token):
+ raise ValueError("oauth2_client_id, oauth2_client_secret, and oauth2_refresh_token must be provided when oauth2 is True")
+
+ return ns
T = TypeVar('T')
diff --git a/imapclient/datetime_util.py b/imapclient/datetime_util.py
index 57a44c4..468e80e 100644
--- a/imapclient/datetime_util.py
+++ b/imapclient/datetime_util.py
@@ -14,7 +14,19 @@ def parse_to_datetime(timestamp: bytes, normalise: bool=True) ->datetime:
If normalise is False, then the returned datetime will be
unadjusted but will contain timezone information as per the input.
"""
- pass
+ if isinstance(timestamp, bytes):
+ timestamp = timestamp.decode('ascii')
+
+ time_tuple = parsedate_tz(timestamp)
+ if time_tuple is None:
+ raise ValueError("Invalid timestamp format")
+
+ tz_offset = time_tuple[-1]
+ dt = datetime(*time_tuple[:6], tzinfo=FixedOffset(tz_offset) if tz_offset else None)
+
+ if normalise:
+ return dt.astimezone().replace(tzinfo=None)
+ return dt
def datetime_to_INTERNALDATE(dt: datetime) ->str:
@@ -23,7 +35,10 @@ def datetime_to_INTERNALDATE(dt: datetime) ->str:
If timezone information is missing the current system
timezone is used.
"""
- pass
+ if dt.tzinfo is None:
+ dt = dt.astimezone()
+
+ return dt.strftime("%d-%b-%Y %H:%M:%S %z").strip()
_rfc822_dotted_time = re.compile(
@@ -32,4 +47,9 @@ _rfc822_dotted_time = re.compile(
def format_criteria_date(dt: datetime) ->bytes:
"""Format a date or datetime instance for use in IMAP search criteria."""
- pass
+ if isinstance(dt, datetime):
+ dt_str = dt.strftime("%d-%b-%Y")
+ else:
+ dt_str = dt.strftime("%d-%b-%Y")
+
+ return dt_str.encode('ascii')
diff --git a/imapclient/fixed_offset.py b/imapclient/fixed_offset.py
index b9e7df9..33e7b01 100644
--- a/imapclient/fixed_offset.py
+++ b/imapclient/fixed_offset.py
@@ -23,4 +23,8 @@ class FixedOffset(datetime.tzinfo):
"""Return a FixedOffset instance for the current working timezone and
DST conditions.
"""
- pass
+ if time.daylight:
+ offset = time.altzone
+ else:
+ offset = time.timezone
+ return cls(-offset // 60)
diff --git a/imapclient/imap_utf7.py b/imapclient/imap_utf7.py
index 7a795b2..c806338 100644
--- a/imapclient/imap_utf7.py
+++ b/imapclient/imap_utf7.py
@@ -1,3 +1,4 @@
+import base64
import binascii
from typing import List, Union
@@ -8,7 +9,27 @@ def encode(s: Union[str, bytes]) ->bytes:
Input is unicode; output is bytes (Python 3) or str (Python 2). If
non-unicode input is provided, the input is returned unchanged.
"""
- pass
+ if isinstance(s, bytes):
+ return s
+ if not isinstance(s, str):
+ raise ValueError("Input must be str or bytes")
+
+ result = bytearray()
+ utf7_buffer = bytearray()
+
+ for char in s:
+ if ord(char) in range(0x20, 0x7f) and char != '&':
+ if utf7_buffer:
+ result.extend(b'&' + base64.b64encode(utf7_buffer).rstrip(b'=').replace(b'/', b',') + b'-')
+ utf7_buffer = bytearray()
+ result.extend(char.encode('ascii'))
+ else:
+ utf7_buffer.extend(char.encode('utf-16be'))
+
+ if utf7_buffer:
+ result.extend(b'&' + base64.b64encode(utf7_buffer).rstrip(b'=').replace(b'/', b',') + b'-')
+
+ return bytes(result)
AMPERSAND_ORD = ord('&')
@@ -22,4 +43,32 @@ def decode(s: Union[bytes, str]) ->str:
unicode. If non-bytes/str input is provided, the input is returned
unchanged.
"""
- pass
+ if isinstance(s, str):
+ s = s.encode('ascii')
+ if not isinstance(s, bytes):
+ raise ValueError("Input must be str or bytes")
+
+ result = []
+ utf7_buffer = bytearray()
+ is_utf7 = False
+
+ for byte in s:
+ if is_utf7:
+ if byte == DASH_ORD:
+ if utf7_buffer:
+ padded = utf7_buffer + b'=' * ((4 - len(utf7_buffer) % 4) % 4)
+ decoded = base64.b64decode(padded.replace(b',', b'/'))
+ result.append(decoded.decode('utf-16be'))
+ utf7_buffer = bytearray()
+ is_utf7 = False
+ elif byte == AMPERSAND_ORD:
+ utf7_buffer.append(byte)
+ result.append('&')
+ else:
+ utf7_buffer.append(byte)
+ elif byte == AMPERSAND_ORD:
+ is_utf7 = True
+ else:
+ result.append(chr(byte))
+
+ return ''.join(result)
diff --git a/imapclient/imapclient.py b/imapclient/imapclient.py
index 1b399f1..388dc2d 100644
--- a/imapclient/imapclient.py
+++ b/imapclient/imapclient.py
@@ -239,7 +239,7 @@ class IMAPClient:
This includes reading from and writing to the socket,
as they are likely to break internal bookkeeping of messages.
"""
- pass
+ return self._imap.sock
@require_capability('STARTTLS')
def starttls(self, ssl_context=None):
@@ -259,13 +259,37 @@ class IMAPClient:
Raises :py:exc:`AbortError` if the server does not support STARTTLS
or an SSL connection is already established.
"""
- pass
+ if self.ssl:
+ raise self.AbortError('SSL connection already established')
+
+ if ssl_context is None:
+ ssl_context = ssl_lib.create_default_context()
+
+ typ, data = self._imap._simple_command('STARTTLS')
+ if typ != 'OK':
+ raise self.Error('STARTTLS failed: %s' % data[0].decode())
+
+ self._imap.sock = ssl_context.wrap_socket(self._imap.sock,
+ server_hostname=self.host)
+ self._imap.file = self._imap.sock.makefile('rb')
+ self.ssl = True
+ self._starttls_done = True
+
+ # Reissue CAPABILITY command after STARTTLS
+ self._cached_capabilities = None
+ self.capabilities()
def login(self, username: str, password: str):
"""Login using *username* and *password*, returning the
server response.
"""
- pass
+ try:
+ rv = self._imap.login(username, password)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ self._cached_capabilities = None
+ return rv
def oauth2_login(self, user: str, access_token: str, mech: str=
'XOAUTH2', vendor: Optional[str]=None):
@@ -274,7 +298,21 @@ class IMAPClient:
Gmail and Yahoo both support the 'XOAUTH2' mechanism, but Yahoo requires
the 'vendor' portion in the payload.
"""
- pass
+ auth_string = 'user=%s\1auth=Bearer %s\1' % (user, access_token)
+ if vendor:
+ auth_string += 'vendor=%s\1' % vendor
+ auth_string += '\1'
+
+ try:
+ if mech == 'XOAUTH2':
+ rv = self._imap.authenticate('XOAUTH2', lambda x: auth_string)
+ else:
+ rv = self._imap.authenticate('OAUTH2', lambda x: auth_string)
+ except imaplib.IMAP4.error as e:
+ raise self.Error(str(e))
+
+ self._cached_capabilities = None
+ return rv
def oauthbearer_login(self, identity, access_token):
"""Authenticate using the OAUTHBEARER method.
diff --git a/imapclient/response_parser.py b/imapclient/response_parser.py
index f632411..a3a087a 100644
--- a/imapclient/response_parser.py
+++ b/imapclient/response_parser.py
@@ -22,7 +22,27 @@ def parse_response(data: List[bytes]) ->Tuple[_Atom, ...]:
Returns nested tuples of appropriately typed objects.
"""
- pass
+ lexer = TokenSource(data)
+ return tuple(_parse_tokens(lexer))
+
+def _parse_tokens(lexer: TokenSource) ->Iterator[_Atom]:
+ for token in lexer:
+ if token == b'(':
+ yield tuple(_parse_tokens(lexer))
+ elif token == b')':
+ return
+ elif isinstance(token, bytes):
+ yield _convert_token(token, lexer.current_literal)
+ else:
+ raise ProtocolError(f'Unexpected token: {token}')
+
+def _convert_token(token: bytes, literal: Optional[bytes]) ->_Atom:
+ if literal is not None:
+ return literal
+ try:
+ return int(token)
+ except ValueError:
+ return token
_msg_id_pattern = re.compile('(\\d+(?: +\\d+)*)')
@@ -39,7 +59,22 @@ def parse_message_list(data: List[Union[bytes, str]]) ->SearchIds:
attribute which contains the MODSEQ response (if returned by the
server).
"""
- pass
+ data = [item.decode('ascii') if isinstance(item, bytes) else item for item in data]
+ data = ' '.join(data)
+
+ modseq = None
+ if 'MODSEQ' in data:
+ modseq_index = data.index('MODSEQ')
+ modseq = int(data[modseq_index + 1])
+ data = data[:modseq_index]
+
+ ids = [int(num) for num in _msg_id_pattern.findall(data)]
+ search_ids = SearchIds(ids)
+
+ if modseq:
+ search_ids.modseq = modseq
+
+ return search_ids
_ParseFetchResponseInnerDict = Dict[bytes, Optional[Union[datetime.datetime,
@@ -53,4 +88,30 @@ def parse_fetch_response(text: List[bytes], normalise_times: bool=True,
Returns a dictionary, keyed by message ID. Each value a dictionary
keyed by FETCH field type (eg."RFC822").
"""
- pass
+ response = defaultdict(dict)
+ for response_item in parse_response(text):
+ msg_id, fetch_data = response_item
+ msg_id = int(msg_id)
+
+ for field, value in _parse_fetch_pairs(fetch_data):
+ field = field.upper()
+
+ if field == b'UID' and uid_is_key:
+ msg_id = value
+ elif field == b'INTERNALDATE' and normalise_times:
+ value = parse_to_datetime(value)
+ elif field in (b'BODY', b'BODY.PEEK'):
+ value = BodyData(value)
+ elif field == b'ENVELOPE':
+ value = Envelope(*value)
+
+ response[msg_id][field] = value
+
+ return response
+
+def _parse_fetch_pairs(fetch_data: Tuple[_Atom, ...]) ->Iterator[Tuple[bytes, _Atom]]:
+ for i in range(0, len(fetch_data), 2):
+ field = fetch_data[i]
+ if not isinstance(field, bytes):
+ raise ProtocolError(f'Field name must be bytes: {field}')
+ yield field, fetch_data[i + 1]