back to Claude Sonnet 3.5 - Base summary
Claude Sonnet 3.5 - Base: imapclient
Failed to run pytests for test tests
Pytest collection failure.
Patch diff
diff --git a/imapclient/config.py b/imapclient/config.py
index f098591..43fc51c 100644
--- a/imapclient/config.py
+++ b/imapclient/config.py
@@ -14,7 +14,25 @@ def parse_config_file(filename: str) ->argparse.Namespace:
Used by livetest.py and interact.py
"""
- pass
+ config = configparser.ConfigParser()
+ config.read(filename)
+
+ if 'DEFAULT' not in config:
+ raise ValueError(f"Config file {filename} must have a DEFAULT section")
+
+ ns = argparse.Namespace()
+ for key, value in config['DEFAULT'].items():
+ setattr(ns, key, value)
+
+ # Convert certain values to appropriate types
+ if hasattr(ns, 'port'):
+ ns.port = int(ns.port)
+ if hasattr(ns, 'ssl'):
+ ns.ssl = config['DEFAULT'].getboolean('ssl')
+ if hasattr(ns, 'timeout'):
+ ns.timeout = float(ns.timeout)
+
+ return ns
T = TypeVar('T')
diff --git a/imapclient/datetime_util.py b/imapclient/datetime_util.py
index 57a44c4..6533952 100644
--- a/imapclient/datetime_util.py
+++ b/imapclient/datetime_util.py
@@ -14,7 +14,19 @@ def parse_to_datetime(timestamp: bytes, normalise: bool=True) ->datetime:
If normalise is False, then the returned datetime will be
unadjusted but will contain timezone information as per the input.
"""
- pass
+    if isinstance(timestamp, bytes):
+        timestamp = timestamp.decode('ascii')
+
+    tt = parsedate_tz(timestamp)
+    if tt is None:
+        raise ValueError("Could not parse datetime string: %r" % timestamp)
+
+    # parsedate_tz() reports the UTC offset in seconds, while FixedOffset is
+    # constructed from minutes elsewhere in this package (seconds // 60), so
+    # convert before building the tzinfo.
+    # Compare against None explicitly: an offset of 0 (UTC) is a real
+    # timezone and must not be treated as "no timezone".
+    offset_seconds = tt[-1]
+    tz = FixedOffset(offset_seconds // 60) if offset_seconds is not None else None
+    dt = datetime(*tt[:6], tzinfo=tz)
+
+    if normalise:
+        # Convert to the system's local time and drop the tzinfo so callers
+        # get a naive local datetime.
+        return dt.astimezone().replace(tzinfo=None)
+    return dt
def datetime_to_INTERNALDATE(dt: datetime) ->str:
@@ -23,7 +35,10 @@ def datetime_to_INTERNALDATE(dt: datetime) ->str:
If timezone information is missing the current system
timezone is used.
"""
- pass
+ if dt.tzinfo is None:
+ dt = dt.astimezone()
+
+ return dt.strftime("%d-%b-%Y %H:%M:%S %z")
_rfc822_dotted_time = re.compile(
@@ -32,4 +47,6 @@ _rfc822_dotted_time = re.compile(
def format_criteria_date(dt: datetime) ->bytes:
"""Format a date or datetime instance for use in IMAP search criteria."""
- pass
+ if isinstance(dt, datetime):
+ dt = dt.date()
+ return dt.strftime("%d-%b-%Y").encode('ascii')
diff --git a/imapclient/fixed_offset.py b/imapclient/fixed_offset.py
index b9e7df9..33e7b01 100644
--- a/imapclient/fixed_offset.py
+++ b/imapclient/fixed_offset.py
@@ -23,4 +23,8 @@ class FixedOffset(datetime.tzinfo):
"""Return a FixedOffset instance for the current working timezone and
DST conditions.
"""
- pass
+        # time.daylight only says that the local zone *defines* a DST
+        # variant; tm_isdst says whether DST is actually in effect right now.
+        if time.daylight and time.localtime().tm_isdst > 0:
+            offset = time.altzone
+        else:
+            offset = time.timezone
+        # time.altzone / time.timezone are seconds west of UTC: negate to get
+        # an east-of-UTC offset and reduce seconds to minutes.
+        return cls(-offset // 60)
diff --git a/imapclient/imap_utf7.py b/imapclient/imap_utf7.py
index 7a795b2..a2876fc 100644
--- a/imapclient/imap_utf7.py
+++ b/imapclient/imap_utf7.py
@@ -1,4 +1,5 @@
import binascii
+import base64
from typing import List, Union
@@ -8,7 +9,27 @@ def encode(s: Union[str, bytes]) ->bytes:
Input is unicode; output is bytes (Python 3) or str (Python 2). If
non-unicode input is provided, the input is returned unchanged.
"""
- pass
+    # Per the docstring, anything that is not text is handed back untouched
+    # rather than rejected.
+    if not isinstance(s, str):
+        return s
+
+    def _flush(pending: bytearray) -> bytes:
+        # Modified base64: "=" padding removed and "," used in place of "/".
+        b64 = base64.b64encode(pending).rstrip(b'=').replace(b'/', b',')
+        return b'&' + b64 + b'-'
+
+    result = bytearray()
+    utf16_buffer = bytearray()
+
+    for char in s:
+        if 0x20 <= ord(char) <= 0x7e:
+            if utf16_buffer:
+                result.extend(_flush(utf16_buffer))
+                utf16_buffer = bytearray()
+            # A literal "&" is always written as the escape "&-"; every other
+            # printable ASCII character represents itself.
+            result.extend(b'&-' if char == '&' else char.encode('ascii'))
+        else:
+            # Everything else is collected as UTF-16BE and emitted as one
+            # base64 run when the next printable ASCII character (or the end
+            # of the input) is reached.
+            utf16_buffer.extend(char.encode('utf-16be'))
+
+    if utf16_buffer:
+        result.extend(_flush(utf16_buffer))
+
+    return bytes(result)
AMPERSAND_ORD = ord('&')
@@ -22,4 +43,33 @@ def decode(s: Union[bytes, str]) ->str:
unicode. If non-bytes/str input is provided, the input is returned
unchanged.
"""
- pass
+    if isinstance(s, str):
+        s = s.encode('ascii')
+    elif not isinstance(s, bytes):
+        # Per the docstring, unsupported input is handed back unchanged.
+        return s
+
+    result = []
+    b64_buffer = bytearray()
+    in_shift = False  # True while between "&" and the closing "-"
+
+    for byte in s:
+        if not in_shift:
+            if byte == AMPERSAND_ORD:
+                in_shift = True
+            else:
+                result.append(chr(byte))
+        elif byte == DASH_ORD:
+            if b64_buffer:
+                # Undo the modified base64: restore "/" and re-add the
+                # padding that the encoder stripped.
+                b64 = bytes(b64_buffer).replace(b',', b'/')
+                b64 += b'=' * (-len(b64) % 4)
+                result.append(base64.b64decode(b64).decode('utf-16be'))
+            else:
+                # "&-" with nothing in between is the escape for a literal
+                # ampersand.
+                result.append('&')
+            in_shift = False
+            b64_buffer = bytearray()
+        else:
+            b64_buffer.append(byte)
+
+    if in_shift:
+        # The input ended inside an unterminated "&..." sequence.
+        raise ValueError("Invalid IMAP UTF-7 encoding")
+
+    return ''.join(result)
diff --git a/imapclient/imapclient.py b/imapclient/imapclient.py
index 1b399f1..c2fd28c 100644
--- a/imapclient/imapclient.py
+++ b/imapclient/imapclient.py
@@ -239,7 +239,7 @@ class IMAPClient:
This includes reading from and writing to the socket,
as they are likely to break internal bookkeeping of messages.
"""
- pass
+ return self._imap.sock
@require_capability('STARTTLS')
def starttls(self, ssl_context=None):
@@ -259,13 +259,34 @@ class IMAPClient:
Raises :py:exc:`AbortError` if the server does not support STARTTLS
or an SSL connection is already established.
"""
- pass
+ if self._starttls_done:
+ raise self.AbortError('STARTTLS has already been called')
+
+ if ssl_context is None:
+ ssl_context = ssl_lib.create_default_context()
+
+ typ, data = self._imap._simple_command('STARTTLS')
+ self._checkok('starttls', typ, data)
+
+ self._imap.sock = ssl_context.wrap_socket(self._imap.sock,
+ server_hostname=self.host)
+ self._imap.file = self._imap.sock.makefile('rb')
+ self._starttls_done = True
+
+ # Reissue CAPABILITY command after STARTTLS
+ self._cached_capabilities = None
+ self.capabilities()
def login(self, username: str, password: str):
"""Login using *username* and *password*, returning the
server response.
"""
- pass
+ try:
+ typ, data = self._imap.login(username, password)
+ self._checkok('login', typ, data)
+ return data[0].decode()
+ except imaplib.IMAP4.error as e:
+ raise self.Error(f'Login failed: {str(e)}')
def oauth2_login(self, user: str, access_token: str, mech: str=
'XOAUTH2', vendor: Optional[str]=None):
@@ -274,7 +295,17 @@ class IMAPClient:
Gmail and Yahoo both support the 'XOAUTH2' mechanism, but Yahoo requires
the 'vendor' portion in the payload.
"""
- pass
+        # XOAUTH2 payload: every field is terminated by a single Ctrl-A (\1)
+        # and the whole string ends with one extra Ctrl-A:
+        #   user=<user>^Aauth=Bearer <token>^A[vendor=<vendor>^A]^A
+        auth_string = f'user={user}\1auth=Bearer {access_token}\1'
+        if vendor:
+            auth_string += f'vendor={vendor}\1'
+        auth_string += '\1'
+
+        try:
+            # imaplib calls the callback with the server challenge and
+            # base64-encodes whatever it returns.
+            typ, data = self._imap.authenticate(mech, lambda x: auth_string)
+            self._checkok('oauth2_login', typ, data)
+            return data[0].decode()
+        except imaplib.IMAP4.error as e:
+            raise self.Error(f'OAuth2 login failed: {str(e)}') from e
def oauthbearer_login(self, identity, access_token):
"""Authenticate using the OAUTHBEARER method.
diff --git a/imapclient/response_parser.py b/imapclient/response_parser.py
index f632411..e8b489d 100644
--- a/imapclient/response_parser.py
+++ b/imapclient/response_parser.py
@@ -22,7 +22,19 @@ def parse_response(data: List[bytes]) ->Tuple[_Atom, ...]:
Returns nested tuples of appropriately typed objects.
"""
- pass
+ lexer = TokenSource(data)
+ return tuple(_parse_tokens(lexer))
+
+def _parse_tokens(lexer: TokenSource, _nested: bool=False) ->Iterator[_Atom]:
+    """Yield the items of one parenthesised level read from *lexer*.
+
+    A "(" token starts a nested level, which is yielded as a tuple; the
+    matching ")" ends it. Tokens that are bytes are decoded as ASCII, any
+    other token is yielded unchanged.
+
+    *_nested* is internal: it is True only for recursive calls made after a
+    "(" has been consumed.
+
+    Raises ProtocolError when the parentheses do not balance, instead of
+    silently dropping the rest of the response.
+    """
+    for token in lexer:
+        if token == b'(':
+            yield tuple(_parse_tokens(lexer, _nested=True))
+        elif token == b')':
+            if not _nested:
+                # A ")" with no open "(": stopping here would discard every
+                # token that follows.
+                raise ProtocolError('Unexpected ")" in response')
+            return
+        elif isinstance(token, bytes):
+            yield token.decode('ascii')
+        else:
+            yield token
+    if _nested:
+        # The input ran out before the ")" closing this level was seen.
+        raise ProtocolError('Missing ")" in response')
_msg_id_pattern = re.compile('(\\d+(?: +\\d+)*)')
@@ -39,7 +51,17 @@ def parse_message_list(data: List[Union[bytes, str]]) ->SearchIds:
attribute which contains the MODSEQ response (if returned by the
server).
"""
- pass
+    text = ' '.join(
+        item.decode('ascii') if isinstance(item, bytes) else item
+        for item in data
+    )
+
+    # Pull out the MODSEQ value, if present, and keep only the text before
+    # it so the value is not mistaken for a message id.
+    modseq = None
+    modseq_match = re.search(r'MODSEQ\s+(\d+)', text)
+    if modseq_match:
+        modseq = int(modseq_match.group(1))
+        text = text[:modseq_match.start()]
+
+    # _msg_id_pattern captures a whole run of space-separated numbers, so
+    # each captured run has to be split into the individual ids.
+    ids = [
+        int(num)
+        for run in _msg_id_pattern.findall(text)
+        for num in run.split()
+    ]
+    return SearchIds(ids, modseq)
_ParseFetchResponseInnerDict = Dict[bytes, Optional[Union[datetime.datetime,
@@ -53,4 +75,62 @@ def parse_fetch_response(text: List[bytes], normalise_times: bool=True,
Returns a dictionary, keyed by message ID. Each value a dictionary
keyed by FETCH field type (eg."RFC822").
"""
- pass
+    response = defaultdict(dict)
+    lexer = TokenSource(text)
+
+    while True:
+        # Each message starts with its id; running out of tokens here is
+        # the normal end of the response.
+        try:
+            msg_id = int(next(lexer))
+        except StopIteration:
+            break
+
+        if next(lexer) != b'(':
+            raise ProtocolError('Expected "(" in FETCH response')
+
+        # Gather every field before storing anything: when uid_is_key is
+        # set the UID can appear anywhere in the list, and all fields of
+        # one message must end up under the same key.
+        fields = {}
+        for key, value in _parse_fetch_pairs(lexer, normalise_times):
+            if uid_is_key and key == b'UID':
+                msg_id = value
+            else:
+                fields[key] = value
+        if fields:
+            response[msg_id].update(fields)
+
+        if next(lexer) != b')':
+            raise ProtocolError('Expected ")" in FETCH response')
+
+    return response
+
+def _parse_fetch_pairs(lexer: TokenSource, normalise_times: bool) ->Iterator[Tuple[bytes, Union[datetime.datetime, int, BodyData, Envelope, _Atom]]]:
+    """Yield ``(key, value)`` pairs from *lexer* until the list ends.
+
+    Iteration stops when the lexer is exhausted or when a ")" token is
+    read; in the latter case the ")" is pushed back onto the lexer so the
+    caller can consume it. Each value is produced by _parse_fetch_value().
+    """
+    exhausted = object()
+    while True:
+        field = next(lexer, exhausted)
+        if field is exhausted:
+            return
+        if field == b')':
+            # Leave the closing parenthesis for the caller to check.
+            lexer.push(field)
+            return
+        parsed = _parse_fetch_value(lexer, field, normalise_times)
+        yield field, parsed
+
+def _parse_fetch_value(lexer: TokenSource, key: bytes, normalise_times: bool) ->Union[datetime.datetime, int, BodyData, Envelope, _Atom]:
+    """Read the value that follows *key* from *lexer* and convert it.
+
+    - ``INTERNALDATE`` is always returned as a datetime; *normalise_times*
+      is forwarded to parse_to_datetime() as its ``normalise`` argument.
+    - ``ENVELOPE`` is parsed with parse_response() and wrapped in Envelope.
+    - ``BODY`` followed by "[" is returned as BodyData(section, value).
+    - Anything else is returned as-is when it is an int, otherwise decoded
+      as ASCII.
+
+    StopIteration from the lexer propagates if the tokens run out.
+    """
+    if key == b'INTERNALDATE':
+        # Previously a False normalise_times fell through and returned None.
+        return parse_to_datetime(next(lexer), normalise=normalise_times)
+    if key == b'ENVELOPE':
+        # NOTE(review): parse_response() is annotated as taking List[bytes];
+        # confirm that passing a single token is what is intended here.
+        return Envelope(*parse_response(next(lexer)))
+
+    # Read the next token exactly once, so that a BODY item that is not
+    # followed by "[" keeps that token as its value instead of losing it.
+    value = next(lexer)
+    if key == b'BODY' and value == b'[':
+        section = b''
+        while True:
+            token = next(lexer)
+            if token == b']':
+                break
+            section += token
+        # One token is skipped between "]" and the payload (the original
+        # code describes it as "the space") - TODO confirm with the lexer.
+        next(lexer)
+        return BodyData(section, next(lexer))
+
+    if isinstance(value, int):
+        return value
+    return value.decode('ascii')