Skip to content

back to Claude Sonnet 3.5 - Fill-in summary

Claude Sonnet 3.5 - Fill-in: imapclient

Failed to run pytest for test directory `tests`

Pytest collection failure.

Patch diff

diff --git a/imapclient/config.py b/imapclient/config.py
index f098591..e083f43 100644
--- a/imapclient/config.py
+++ b/imapclient/config.py
@@ -14,7 +14,27 @@ def parse_config_file(filename: str) ->argparse.Namespace:

     Used by livetest.py and interact.py
     """
-    pass
+    config = configparser.ConfigParser()
+    config.read(filename)
+
+    if 'DEFAULT' not in config:
+        raise ValueError(f"Config file {filename} must have a DEFAULT section")
+
+    ns = argparse.Namespace()
+    ns.host = config['DEFAULT'].get('host', 'imap.gmail.com')
+    ns.port = config['DEFAULT'].getint('port', 993)
+    ns.ssl = config['DEFAULT'].getboolean('ssl', True)
+    ns.username = config['DEFAULT'].get('username')
+    ns.password = config['DEFAULT'].get('password')
+    ns.oauth2 = config['DEFAULT'].getboolean('oauth2', False)
+    ns.oauth2_client_id = config['DEFAULT'].get('oauth2_client_id')
+    ns.oauth2_client_secret = config['DEFAULT'].get('oauth2_client_secret')
+    ns.oauth2_refresh_token = config['DEFAULT'].get('oauth2_refresh_token')
+
+    if ns.oauth2 and (not ns.oauth2_client_id or not ns.oauth2_client_secret or not ns.oauth2_refresh_token):
+        raise ValueError("oauth2_client_id, oauth2_client_secret, and oauth2_refresh_token must be provided when oauth2 is True")
+
+    return ns


 T = TypeVar('T')
diff --git a/imapclient/datetime_util.py b/imapclient/datetime_util.py
index 57a44c4..468e80e 100644
--- a/imapclient/datetime_util.py
+++ b/imapclient/datetime_util.py
@@ -14,7 +14,19 @@ def parse_to_datetime(timestamp: bytes, normalise: bool=True) ->datetime:
     If normalise is False, then the returned datetime will be
     unadjusted but will contain timezone information as per the input.
     """
-    pass
+    if isinstance(timestamp, bytes):
+        timestamp = timestamp.decode('ascii')
+    
+    time_tuple = parsedate_tz(timestamp)
+    if time_tuple is None:
+        raise ValueError("Invalid timestamp format")
+    
+    tz_offset = time_tuple[-1]
+    dt = datetime(*time_tuple[:6], tzinfo=FixedOffset(tz_offset) if tz_offset else None)
+    
+    if normalise:
+        return dt.astimezone().replace(tzinfo=None)
+    return dt


 def datetime_to_INTERNALDATE(dt: datetime) ->str:
@@ -23,7 +35,10 @@ def datetime_to_INTERNALDATE(dt: datetime) ->str:
     If timezone information is missing the current system
     timezone is used.
     """
-    pass
+    if dt.tzinfo is None:
+        dt = dt.astimezone()
+    
+    return dt.strftime("%d-%b-%Y %H:%M:%S %z").strip()


 _rfc822_dotted_time = re.compile(
@@ -32,4 +47,9 @@ _rfc822_dotted_time = re.compile(

 def format_criteria_date(dt: datetime) ->bytes:
     """Format a date or datetime instance for use in IMAP search criteria."""
-    pass
+    if isinstance(dt, datetime):
+        dt_str = dt.strftime("%d-%b-%Y")
+    else:
+        dt_str = dt.strftime("%d-%b-%Y")
+    
+    return dt_str.encode('ascii')
diff --git a/imapclient/fixed_offset.py b/imapclient/fixed_offset.py
index b9e7df9..33e7b01 100644
--- a/imapclient/fixed_offset.py
+++ b/imapclient/fixed_offset.py
@@ -23,4 +23,8 @@ class FixedOffset(datetime.tzinfo):
         """Return a FixedOffset instance for the current working timezone and
         DST conditions.
         """
-        pass
+        if time.daylight:
+            offset = time.altzone
+        else:
+            offset = time.timezone
+        return cls(-offset // 60)
diff --git a/imapclient/imap_utf7.py b/imapclient/imap_utf7.py
index 7a795b2..c806338 100644
--- a/imapclient/imap_utf7.py
+++ b/imapclient/imap_utf7.py
@@ -1,3 +1,4 @@
+import base64
 import binascii
 from typing import List, Union

@@ -8,7 +9,27 @@ def encode(s: Union[str, bytes]) ->bytes:
     Input is unicode; output is bytes (Python 3) or str (Python 2). If
     non-unicode input is provided, the input is returned unchanged.
     """
-    pass
+    if isinstance(s, bytes):
+        return s
+    if not isinstance(s, str):
+        raise ValueError("Input must be str or bytes")
+    
+    result = bytearray()
+    utf7_buffer = bytearray()
+    
+    for char in s:
+        if ord(char) in range(0x20, 0x7f) and char != '&':
+            if utf7_buffer:
+                result.extend(b'&' + base64.b64encode(utf7_buffer).rstrip(b'=').replace(b'/', b',') + b'-')
+                utf7_buffer = bytearray()
+            result.extend(char.encode('ascii'))
+        else:
+            utf7_buffer.extend(char.encode('utf-16be'))
+    
+    if utf7_buffer:
+        result.extend(b'&' + base64.b64encode(utf7_buffer).rstrip(b'=').replace(b'/', b',') + b'-')
+    
+    return bytes(result)


 AMPERSAND_ORD = ord('&')
@@ -22,4 +43,32 @@ def decode(s: Union[bytes, str]) ->str:
     unicode. If non-bytes/str input is provided, the input is returned
     unchanged.
     """
-    pass
+    if isinstance(s, str):
+        s = s.encode('ascii')
+    if not isinstance(s, bytes):
+        raise ValueError("Input must be str or bytes")
+    
+    result = []
+    utf7_buffer = bytearray()
+    is_utf7 = False
+    
+    for byte in s:
+        if is_utf7:
+            if byte == DASH_ORD:
+                if utf7_buffer:
+                    padded = utf7_buffer + b'=' * ((4 - len(utf7_buffer) % 4) % 4)
+                    decoded = base64.b64decode(padded.replace(b',', b'/'))
+                    result.append(decoded.decode('utf-16be'))
+                    utf7_buffer = bytearray()
+                is_utf7 = False
+            elif byte == AMPERSAND_ORD:
+                utf7_buffer.append(byte)
+                result.append('&')
+            else:
+                utf7_buffer.append(byte)
+        elif byte == AMPERSAND_ORD:
+            is_utf7 = True
+        else:
+            result.append(chr(byte))
+    
+    return ''.join(result)
diff --git a/imapclient/imapclient.py b/imapclient/imapclient.py
index 1b399f1..388dc2d 100644
--- a/imapclient/imapclient.py
+++ b/imapclient/imapclient.py
@@ -239,7 +239,7 @@ class IMAPClient:
            This includes reading from and writing to the socket,
            as they are likely to break internal bookkeeping of messages.
         """
-        pass
+        return self._imap.sock

     @require_capability('STARTTLS')
     def starttls(self, ssl_context=None):
@@ -259,13 +259,37 @@ class IMAPClient:
         Raises :py:exc:`AbortError` if the server does not support STARTTLS
         or an SSL connection is already established.
         """
-        pass
+        if self.ssl:
+            raise self.AbortError('SSL connection already established')
+
+        if ssl_context is None:
+            ssl_context = ssl_lib.create_default_context()
+
+        typ, data = self._imap._simple_command('STARTTLS')
+        if typ != 'OK':
+            raise self.Error('STARTTLS failed: %s' % data[0].decode())
+
+        self._imap.sock = ssl_context.wrap_socket(self._imap.sock,
+                                                  server_hostname=self.host)
+        self._imap.file = self._imap.sock.makefile('rb')
+        self.ssl = True
+        self._starttls_done = True
+
+        # Reissue CAPABILITY command after STARTTLS
+        self._cached_capabilities = None
+        self.capabilities()

     def login(self, username: str, password: str):
         """Login using *username* and *password*, returning the
         server response.
         """
-        pass
+        try:
+            rv = self._imap.login(username, password)
+        except imaplib.IMAP4.error as e:
+            raise self.Error(str(e))
+
+        self._cached_capabilities = None
+        return rv

     def oauth2_login(self, user: str, access_token: str, mech: str=
         'XOAUTH2', vendor: Optional[str]=None):
@@ -274,7 +298,21 @@ class IMAPClient:
         Gmail and Yahoo both support the 'XOAUTH2' mechanism, but Yahoo requires
         the 'vendor' portion in the payload.
         """
-        pass
+        auth_string = 'user=%s\1auth=Bearer %s\1' % (user, access_token)
+        if vendor:
+            auth_string += 'vendor=%s\1' % vendor
+        auth_string += '\1'
+        
+        try:
+            if mech == 'XOAUTH2':
+                rv = self._imap.authenticate('XOAUTH2', lambda x: auth_string)
+            else:
+                rv = self._imap.authenticate('OAUTH2', lambda x: auth_string)
+        except imaplib.IMAP4.error as e:
+            raise self.Error(str(e))
+
+        self._cached_capabilities = None
+        return rv

     def oauthbearer_login(self, identity, access_token):
         """Authenticate using the OAUTHBEARER method.
diff --git a/imapclient/response_parser.py b/imapclient/response_parser.py
index f632411..a3a087a 100644
--- a/imapclient/response_parser.py
+++ b/imapclient/response_parser.py
@@ -22,7 +22,27 @@ def parse_response(data: List[bytes]) ->Tuple[_Atom, ...]:

     Returns nested tuples of appropriately typed objects.
     """
-    pass
+    lexer = TokenSource(data)
+    return tuple(_parse_tokens(lexer))
+
+def _parse_tokens(lexer: TokenSource) ->Iterator[_Atom]:
+    for token in lexer:
+        if token == b'(':
+            yield tuple(_parse_tokens(lexer))
+        elif token == b')':
+            return
+        elif isinstance(token, bytes):
+            yield _convert_token(token, lexer.current_literal)
+        else:
+            raise ProtocolError(f'Unexpected token: {token}')
+
+def _convert_token(token: bytes, literal: Optional[bytes]) ->_Atom:
+    if literal is not None:
+        return literal
+    try:
+        return int(token)
+    except ValueError:
+        return token


 _msg_id_pattern = re.compile('(\\d+(?: +\\d+)*)')
@@ -39,7 +59,22 @@ def parse_message_list(data: List[Union[bytes, str]]) ->SearchIds:
     attribute which contains the MODSEQ response (if returned by the
     server).
     """
-    pass
+    data = [item.decode('ascii') if isinstance(item, bytes) else item for item in data]
+    data = ' '.join(data)
+    
+    modseq = None
+    if 'MODSEQ' in data:
+        modseq_index = data.index('MODSEQ')
+        modseq = int(data[modseq_index + 1])
+        data = data[:modseq_index]
+    
+    ids = [int(num) for num in _msg_id_pattern.findall(data)]
+    search_ids = SearchIds(ids)
+    
+    if modseq:
+        search_ids.modseq = modseq
+    
+    return search_ids


 _ParseFetchResponseInnerDict = Dict[bytes, Optional[Union[datetime.datetime,
@@ -53,4 +88,30 @@ def parse_fetch_response(text: List[bytes], normalise_times: bool=True,
     Returns a dictionary, keyed by message ID. Each value a dictionary
     keyed by FETCH field type (eg."RFC822").
     """
-    pass
+    response = defaultdict(dict)
+    for response_item in parse_response(text):
+        msg_id, fetch_data = response_item
+        msg_id = int(msg_id)
+        
+        for field, value in _parse_fetch_pairs(fetch_data):
+            field = field.upper()
+            
+            if field == b'UID' and uid_is_key:
+                msg_id = value
+            elif field == b'INTERNALDATE' and normalise_times:
+                value = parse_to_datetime(value)
+            elif field in (b'BODY', b'BODY.PEEK'):
+                value = BodyData(value)
+            elif field == b'ENVELOPE':
+                value = Envelope(*value)
+            
+            response[msg_id][field] = value
+    
+    return response
+
+def _parse_fetch_pairs(fetch_data: Tuple[_Atom, ...]) ->Iterator[Tuple[bytes, _Atom]]:
+    for i in range(0, len(fetch_data), 2):
+        field = fetch_data[i]
+        if not isinstance(field, bytes):
+            raise ProtocolError(f'Field name must be bytes: {field}')
+        yield field, fetch_data[i + 1]