back to Claude Sonnet 3.5 - Fill-in summary
Claude Sonnet 3.5 - Fill-in: pypdf
Failed to run pytests for test tests
ImportError while loading conftest '/testbed/tests/conftest.py'.
tests/__init__.py:10: in <module>
from pypdf.generic import DictionaryObject, IndirectObject
pypdf/__init__.py:10: in <module>
from ._crypt_providers import crypt_provider
pypdf/_crypt_providers/__init__.py:62: in <module>
from pypdf._crypt_providers._fallback import ( # type: ignore
E ImportError: cannot import name 'aes_cbc_decrypt' from 'pypdf._crypt_providers._fallback' (/testbed/pypdf/_crypt_providers/_fallback.py)
Patch diff
diff --git a/pypdf/_cmap.py b/pypdf/_cmap.py
index 77c2f84..703c77e 100644
--- a/pypdf/_cmap.py
+++ b/pypdf/_cmap.py
@@ -21,7 +21,26 @@ def build_char_map(font_name: str, space_width: float, obj: DictionaryObject
Font sub-type, space_width criteria (50% of width), encoding, map character-map, font-dictionary.
The font-dictionary itself is suitable for the curious.
"""
- pass
+ if "/Resources" not in obj:
+ return unknown_char_map + (NullObject(),)
+
+ resources = obj["/Resources"]
+ if "/Font" not in resources:
+ return unknown_char_map + (NullObject(),)
+
+ fonts = resources["/Font"]
+ if font_name not in fonts:
+ return unknown_char_map + (NullObject(),)
+
+ font = fonts[font_name]
+ if isinstance(font, IndirectObject):
+ font = font.get_object()
+
+ if not isinstance(font, DictionaryObject):
+ return unknown_char_map + (NullObject(),)
+
+ subtype, space_width, encoding, char_map = build_char_map_from_dict(space_width, font)
+ return subtype, space_width, encoding, char_map, font
def build_char_map_from_dict(space_width: float, ft: DictionaryObject) ->Tuple[
@@ -38,7 +57,35 @@ def build_char_map_from_dict(space_width: float, ft: DictionaryObject) ->Tuple[
Font sub-type, space_width criteria(50% of width), encoding, map character-map.
The font-dictionary itself is suitable for the curious.
"""
- pass
+ subtype = ft.get("/Subtype", "")
+ encoding = ft.get("/Encoding", "")
+ char_map = {}
+
+ if "/ToUnicode" in ft:
+ try:
+ char_map = _build_to_unicode_map(ft["/ToUnicode"])
+ except Exception as e:
+ logger_warning(f"Error building ToUnicode map: {e}")
+
+ if not char_map and isinstance(encoding, DictionaryObject):
+ char_map = _build_encoding_map(encoding)
+
+ if not char_map and isinstance(encoding, str):
+ if encoding in charset_encoding:
+ char_map = dict(zip(range(256), charset_encoding[encoding]))
+ elif encoding in _predefined_cmap:
+ char_map = _predefined_cmap[encoding]
+
+ if "/FirstChar" in ft and "/LastChar" in ft and "/Widths" in ft:
+ first_char = ft["/FirstChar"]
+ last_char = ft["/LastChar"]
+ widths = ft["/Widths"]
+ if 32 in range(first_char, last_char + 1):
+ space_width = widths[32 - first_char]
+
+ space_width_criteria = space_width * 0.5
+
+ return subtype, space_width_criteria, encoding, char_map
unknown_char_map: Tuple[str, float, Union[str, Dict[int, str]], Dict[Any, Any]
diff --git a/pypdf/_crypt_providers/_base.py b/pypdf/_crypt_providers/_base.py
index 916f3fd..2136f91 100644
--- a/pypdf/_crypt_providers/_base.py
+++ b/pypdf/_crypt_providers/_base.py
@@ -1,6 +1,17 @@
class CryptBase:
- pass
+ def __init__(self):
+ pass
+
+ def encrypt(self, data):
+ raise NotImplementedError("Subclasses must implement the encrypt method")
+
+ def decrypt(self, data):
+ raise NotImplementedError("Subclasses must implement the decrypt method")
class CryptIdentity(CryptBase):
- pass
+ def encrypt(self, data):
+ return data
+
+ def decrypt(self, data):
+ return data
diff --git a/pypdf/_crypt_providers/_fallback.py b/pypdf/_crypt_providers/_fallback.py
index f69eb28..566f0e7 100644
--- a/pypdf/_crypt_providers/_fallback.py
+++ b/pypdf/_crypt_providers/_fallback.py
@@ -12,9 +12,44 @@ class CryptRC4(CryptBase):
for i in range(256):
j = (j + self.s[i] + key[i % len(key)]) % 256
self.s[i], self.s[j] = self.s[j], self.s[i]
+ self.i = 0
+ self.j = 0
+
+ def encrypt(self, data: bytes) ->bytes:
+ return self.decrypt(data)
+
+ def decrypt(self, data: bytes) ->bytes:
+ out = bytearray()
+ for byte in data:
+ self.i = (self.i + 1) % 256
+ self.j = (self.j + self.s[self.i]) % 256
+ self.s[self.i], self.s[self.j] = self.s[self.j], self.s[self.i]
+ out.append(byte ^ self.s[(self.s[self.i] + self.s[self.j]) % 256])
+ return bytes(out)
class CryptAES(CryptBase):
def __init__(self, key: bytes) ->None:
- pass
+ try:
+ from Crypto.Cipher import AES
+ from Crypto.Util.Padding import pad, unpad
+ except ImportError:
+ raise DependencyError(_DEPENDENCY_ERROR_STR)
+
+ self.key = key
+ self.aes = AES
+ self.pad = pad
+ self.unpad = unpad
+
+ def encrypt(self, data: bytes) ->bytes:
+ cipher = self.aes.new(self.key, self.aes.MODE_CBC)
+ padded_data = self.pad(data, self.aes.block_size)
+ return cipher.iv + cipher.encrypt(padded_data)
+
+ def decrypt(self, data: bytes) ->bytes:
+ iv = data[:16]
+ encrypted_data = data[16:]
+ cipher = self.aes.new(self.key, self.aes.MODE_CBC, iv)
+ padded_data = cipher.decrypt(encrypted_data)
+ return self.unpad(padded_data, self.aes.block_size)
diff --git a/pypdf/_doc_common.py b/pypdf/_doc_common.py
index eaec7b5..5a7246f 100644
--- a/pypdf/_doc_common.py
+++ b/pypdf/_doc_common.py
@@ -47,12 +47,14 @@ class DocumentInformation(DictionaryObject):
Returns a ``TextStringObject`` or ``None`` if the title is not
specified.
"""
- pass
+ if DI.TITLE in self:
+ return self[DI.TITLE]
+ return None
@property
def title_raw(self) ->Optional[str]:
"""The "raw" version of title; can return a ``ByteStringObject``."""
- pass
+ return self.get(DI.TITLE)
@property
def author(self) ->Optional[str]:
@@ -62,12 +64,14 @@ class DocumentInformation(DictionaryObject):
Returns a ``TextStringObject`` or ``None`` if the author is not
specified.
"""
- pass
+ if DI.AUTHOR in self:
+ return self[DI.AUTHOR]
+ return None
@property
def author_raw(self) ->Optional[str]:
"""The "raw" version of author; can return a ``ByteStringObject``."""
- pass
+ return self.get(DI.AUTHOR)
@property
def subject(self) ->Optional[str]:
@@ -77,12 +81,14 @@ class DocumentInformation(DictionaryObject):
Returns a ``TextStringObject`` or ``None`` if the subject is not
specified.
"""
- pass
+ if DI.SUBJECT in self:
+ return self[DI.SUBJECT]
+ return None
@property
def subject_raw(self) ->Optional[str]:
"""The "raw" version of subject; can return a ``ByteStringObject``."""
- pass
+ return self.get(DI.SUBJECT)
@property
def creator(self) ->Optional[str]:
@@ -94,12 +100,14 @@ class DocumentInformation(DictionaryObject):
document from which it was converted. Returns a ``TextStringObject`` or
``None`` if the creator is not specified.
"""
- pass
+ if DI.CREATOR in self:
+ return self[DI.CREATOR]
+ return None
@property
def creator_raw(self) ->Optional[str]:
"""The "raw" version of creator; can return a ``ByteStringObject``."""
- pass
+ return self.get(DI.CREATOR)
@property
def producer(self) ->Optional[str]:
@@ -111,17 +119,21 @@ class DocumentInformation(DictionaryObject):
PDF. Returns a ``TextStringObject`` or ``None`` if the producer is not
specified.
"""
- pass
+ if DI.PRODUCER in self:
+ return self[DI.PRODUCER]
+ return None
@property
def producer_raw(self) ->Optional[str]:
"""The "raw" version of producer; can return a ``ByteStringObject``."""
- pass
+ return self.get(DI.PRODUCER)
@property
def creation_date(self) ->Optional[datetime]:
"""Read-only property accessing the document's creation date."""
- pass
+ if DI.CREATION_DATE in self:
+ return parse_iso8824_date(self[DI.CREATION_DATE])
+ return None
@property
def creation_date_raw(self) ->Optional[str]:
@@ -131,7 +143,7 @@ class DocumentInformation(DictionaryObject):
Typically in the format ``D:YYYYMMDDhhmmss[+Z-]hh'mm`` where the suffix
is the offset from UTC.
"""
- pass
+ return self.get(DI.CREATION_DATE)
@property
def modification_date(self) ->Optional[datetime]:
@@ -140,7 +152,9 @@ class DocumentInformation(DictionaryObject):
The date and time the document was most recently modified.
"""
- pass
+ if DI.MOD_DATE in self:
+ return parse_iso8824_date(self[DI.MOD_DATE])
+ return None
@property
def modification_date_raw(self) ->Optional[str]:
@@ -151,7 +165,7 @@ class DocumentInformation(DictionaryObject):
Typically in the format ``D:YYYYMMDDhhmmss[+Z-]hh'mm`` where the suffix
is the offset from UTC.
"""
- pass
+ return self.get(DI.MOD_DATE)
class PdfDocCommon:
diff --git a/pypdf/_encryption.py b/pypdf/_encryption.py
index 515e35b..4bb9492 100644
--- a/pypdf/_encryption.py
+++ b/pypdf/_encryption.py
@@ -147,7 +147,20 @@ class AlgV4:
Returns:
The RC4 encrypted
"""
- pass
+ # Step e: Pad or truncate the user password
+ padded_user_password = user_password[:32] + _PADDING[:max(0, 32 - len(user_password))]
+
+ # Step f: Encrypt the padded user password using RC4
+ o_value = rc4_encrypt(rc4_key, padded_user_password)
+
+ # Step g: Additional encryption for rev 3 or greater
+ if rev >= 3:
+ for i in range(1, 20):
+ new_key = bytes(b ^ i for b in rc4_key)
+ o_value = rc4_encrypt(new_key, o_value)
+
+ # Step h: The final o_value is the result
+ return o_value
@staticmethod
def compute_U_value(key: bytes, rev: int, id1_entry: bytes) ->bytes:
diff --git a/pypdf/_merger.py b/pypdf/_merger.py
index db1073f..9e3e5fc 100644
--- a/pypdf/_merger.py
+++ b/pypdf/_merger.py
@@ -81,7 +81,31 @@ class PdfMerger:
outline (collection of outline items, previously referred to as
'bookmarks') from being imported by specifying this as ``False``.
"""
- pass
+ if isinstance(fileobj, PdfReader):
+ reader = fileobj
+ else:
+ reader = PdfReader(fileobj)
+
+ if pages is None:
+ pages = range(len(reader.pages))
+ elif isinstance(pages, tuple):
+ pages = range(*pages)
+ elif isinstance(pages, int):
+ pages = [pages]
+
+ for i, page in enumerate(pages):
+ self.pages.insert(page_number + i, _MergedPage(reader.pages[page], reader, self.id_count))
+ self.id_count += 1
+
+ if outline_item is not None:
+ self.add_outline_item(outline_item, page_number)
+
+ if import_outline:
+ outline = reader.outline
+ if outline:
+ self.outline.extend(self._trim_outline(reader, outline, pages))
+
+ self.inputs.append((fileobj, reader))
def append(self, fileobj: Union[StrByteType, PdfReader, Path],
outline_item: Optional[str]=None, pages: Union[None, PageRange,
@@ -108,7 +132,7 @@ class PdfMerger:
outline (collection of outline items, previously referred to as
'bookmarks') from being imported by specifying this as ``False``.
"""
- pass
+ self.merge(len(self.pages), fileobj, outline_item, pages, import_outline)
def write(self, fileobj: Union[Path, StrByteType]) ->None:
"""
@@ -118,11 +142,31 @@ class PdfMerger:
fileobj: Output file. Can be a filename or any kind of
file-like object.
"""
- pass
+ if self.output is None:
+ raise ValueError(ERR_CLOSED_WRITER)
+
+ for page in self.pages:
+ self.output.add_page(page.pagedata)
+
+ if self.outline:
+ self.output.add_outline_item_dict(self.outline)
+
+ for dest in self.named_dests:
+ self.output.add_named_destination_object(dest)
+
+ self.output.write(fileobj)
def close(self) ->None:
"""Shut all file descriptors (input and output) and clear all memory usage."""
- pass
+ if self.output:
+ self.output.close()
+ for _, reader in self.inputs:
+ reader.stream.close()
+ self.inputs.clear()
+ self.pages.clear()
+ self.output = None
+ self.outline.clear()
+ self.named_dests.clear()
def add_metadata(self, infos: Dict[str, Any]) ->None:
"""
@@ -133,7 +177,9 @@ class PdfMerger:
and each value is your new metadata.
An example is ``{'/Title': 'My title'}``
"""
- pass
+ if self.output is None:
+ raise ValueError(ERR_CLOSED_WRITER)
+ self.output.add_metadata(infos)
def set_page_layout(self, layout: LayoutType) ->None:
"""
@@ -160,7 +206,9 @@ class PdfMerger:
* - /TwoPageRight
- Show two pages at a time, odd-numbered pages on the right
"""
- pass
+ if self.output is None:
+ raise ValueError(ERR_CLOSED_WRITER)
+ self.output.set_page_layout(layout)
def set_page_mode(self, mode: PagemodeType) ->None:
"""
@@ -185,7 +233,9 @@ class PdfMerger:
* - /UseAttachments
- Show attachments panel
"""
- pass
+ if self.output is None:
+ raise ValueError(ERR_CLOSED_WRITER)
+ self.output.set_page_mode(mode)
@property
def page_mode(self) ->Optional[PagemodeType]:
@@ -224,7 +274,13 @@ class PdfMerger:
dests:
pages:
"""
- pass
+ pages_set = set(pages)
+ trimmed_dests = []
+ for k, v in dests.items():
+ if isinstance(v, Dict):
+ if '/Page' in v and pdf.get_page_number(v['/Page']) in pages_set:
+ trimmed_dests.append({k: v})
+ return trimmed_dests
def _trim_outline(self, pdf: PdfReader, outline: OutlineType, pages:
Union[Tuple[int, int], Tuple[int, int, int], List[int]]) ->OutlineType:
@@ -239,7 +295,20 @@ class PdfMerger:
Returns:
An outline type
"""
- pass
+ pages_set = set(pages)
+ new_outline = []
+ for entry in outline:
+ if isinstance(entry, list):
+ sub_outline = self._trim_outline(pdf, entry, pages)
+ if sub_outline:
+ new_outline.append(sub_outline)
+ elif isinstance(entry, dict):
+ if '/Page' in entry:
+ if pdf.get_page_number(entry['/Page']) in pages_set:
+ new_outline.append(entry)
+ else:
+ new_outline.append(entry)
+ return new_outline
def add_outline_item(self, title: str, page_number: int, parent: Union[
None, TreeObject, IndirectObject]=None, color: Optional[Tuple[float,
@@ -259,7 +328,18 @@ class PdfMerger:
italic: Outline item font is italic
fit: The fit of the destination page.
"""
- pass
+ if self.output is None:
+ raise ValueError(ERR_CLOSED_WRITER)
+
+ page = self.pages[page_number].pagedata
+ outline_item = OutlineItem(title, page, parent, color, bold, italic, fit)
+
+ if parent is None:
+ self.outline.append(outline_item)
+ else:
+ parent.children.append(outline_item)
+
+ return self.output.add_object(outline_item)
def add_named_destination(self, title: str, page_number: int) ->None:
"""
@@ -269,4 +349,9 @@ class PdfMerger:
title: Title to use
page_number: Page number this destination points at.
"""
- pass
+ if self.output is None:
+ raise ValueError(ERR_CLOSED_WRITER)
+
+ page = self.pages[page_number].pagedata
+ dest = Destination(TextStringObject(title), page)
+ self.named_dests.append(dest)
diff --git a/pypdf/_page.py b/pypdf/_page.py
index 9d11d1c..4c835d4 100644
--- a/pypdf/_page.py
+++ b/pypdf/_page.py
@@ -58,7 +58,8 @@ class Transformation:
((a, b, 0), (c, d, 0), (e, f, 1))
"""
- pass
+ a, b, c, d, e, f = self.ctm
+ return ((a, b, 0), (c, d, 0), (e, f, 1))
@staticmethod
def compress(matrix: TransformationMatrixType
@@ -72,7 +73,7 @@ class Transformation:
Returns:
A tuple representing the transformation matrix as (a, b, c, d, e, f)
"""
- pass
+ return (matrix[0][0], matrix[0][1], matrix[1][0], matrix[1][1], matrix[2][0], matrix[2][1])
def transform(self, m: 'Transformation') ->'Transformation':
"""
@@ -90,7 +91,16 @@ class Transformation:
>>> op = Transformation().transform(Transformation((-1, 0, 0, 1, iwidth, 0))) # horizontal mirror
>>> page.add_transformation(op)
"""
- pass
+ a1, b1, c1, d1, e1, f1 = self.ctm
+ a2, b2, c2, d2, e2, f2 = m.ctm
+ return Transformation((
+ a1*a2 + b1*c2,
+ a1*b2 + b1*d2,
+ c1*a2 + d1*c2,
+ c1*b2 + d1*d2,
+ e1*a2 + f1*c2 + e2,
+ e1*b2 + f1*d2 + f2
+ ))
def translate(self, tx: float=0, ty: float=0) ->'Transformation':
"""
@@ -103,7 +113,7 @@ class Transformation:
Returns:
A new ``Transformation`` instance
"""
- pass
+ return Transformation((1, 0, 0, 1, tx, ty)).transform(self)
def scale(self, sx: Optional[float]=None, sy: Optional[float]=None
) ->'Transformation':
@@ -120,7 +130,13 @@ class Transformation:
Returns:
A new Transformation instance with the scaled matrix.
"""
- pass
+ if sx is None and sy is None:
+ return self
+ if sx is None:
+ sx = sy
+ if sy is None:
+ sy = sx
+ return Transformation((sx, 0, 0, sy, 0, 0)).transform(self)
def rotate(self, rotation: float) ->'Transformation':
"""
@@ -132,7 +148,10 @@ class Transformation:
Returns:
A new ``Transformation`` instance with the rotated matrix.
"""
- pass
+ rotation_rad = math.radians(rotation)
+ cos_theta = math.cos(rotation_rad)
+ sin_theta = math.sin(rotation_rad)
+ return Transformation((cos_theta, sin_theta, -sin_theta, cos_theta, 0, 0)).transform(self)
def __repr__(self) ->str:
return f'Transformation(ctm={self.ctm})'
@@ -144,11 +163,16 @@ class Transformation:
Args:
pt: A tuple or list representing the point in the form (x, y)
+ as_object: If True, return the result as a list, otherwise as a tuple
Returns:
A tuple or list representing the transformed point in the form (x', y')
"""
- pass
+ x, y = pt
+ a, b, c, d, e, f = self.ctm
+ x_new = a * x + c * y + e
+ y_new = b * x + d * y + f
+ return [x_new, y_new] if as_object else (x_new, y_new)
class PageObject(DictionaryObject):
@@ -184,7 +208,7 @@ class PageObject(DictionaryObject):
space unit is 1/72 inch, and a value of 3 means that a user
space unit is 3/72 inch.
"""
- pass
+ return self.get("/UserUnit", 1)
@staticmethod
def create_blank_page(pdf: Optional[PdfCommonDocProtocol]=None, width:
@@ -210,7 +234,25 @@ class PageObject(DictionaryObject):
PageSizeNotDefinedError: if ``pdf`` is ``None`` or contains
no page
"""
- pass
+ page = PageObject(pdf)
+
+ # Get the page size from the last page of PDF
+ if pdf is not None and len(pdf.pages) > 0:
+ last_page = pdf.pages[-1]
+ if width is None:
+ width = last_page.mediabox.width
+ if height is None:
+ height = last_page.mediabox.height
+
+ if width is None or height is None:
+ raise PageSizeNotDefinedError
+
+ page[NameObject(PG.MEDIABOX)] = RectangleObject((0, 0, width, height))
+
+ page[NameObject(PG.RESOURCES)] = DictionaryObject()
+ page[NameObject(PG.CONTENTS)] = ArrayObject()
+
+ return page
@property
def _old_images(self) ->List[File]:
@@ -222,7 +264,14 @@ class PageObject(DictionaryObject):
For the moment, this does NOT include inline images. They will be added
in future.
"""
- pass
+ images = []
+ resources = self.get("/Resources", {})
+ if "/XObject" in resources:
+ xobjects = resources["/XObject"].get_object()
+ for obj in xobjects:
+ if xobjects[obj]["/Subtype"] == "/Image":
+ images.append(_xobj_to_image(xobjects[obj]))
+ return images
@property
def images(self) ->List[ImageFile]:
diff --git a/pypdf/_page_labels.py b/pypdf/_page_labels.py
index f061301..3082fb3 100644
--- a/pypdf/_page_labels.py
+++ b/pypdf/_page_labels.py
@@ -74,7 +74,57 @@ def index2label(reader: PdfCommonDocProtocol, index: int) ->str:
Returns:
The label of the page, e.g. "iv" or "4".
"""
- pass
+ if "/PageLabels" not in reader.root_object:
+ return str(index + 1)
+
+ nums = reader.root_object["/PageLabels"]["/Nums"]
+ label_dict = None
+ start_index = 0
+
+ for i in range(0, len(nums), 2):
+ if nums[i] > index:
+ break
+ start_index = nums[i]
+ label_dict = reader.get_object(nums[i + 1])
+
+ if label_dict is None:
+ return str(index + 1)
+
+ style = label_dict.get("/S", "D")
+ prefix = label_dict.get("/P", "")
+ start = label_dict.get("/St", 1)
+
+ num = index - start_index + start
+
+ if style == "/D":
+ return f"{prefix}{num}"
+ elif style == "/R":
+ return f"{prefix}{to_roman(num).upper()}"
+ elif style == "/r":
+ return f"{prefix}{to_roman(num).lower()}"
+ elif style == "/A":
+ return f"{prefix}{to_alpha(num).upper()}"
+ elif style == "/a":
+ return f"{prefix}{to_alpha(num).lower()}"
+ else:
+ return str(index + 1)
+
+def to_roman(num: int) -> str:
+ roman = ["M", "CM", "D", "CD", "C", "XC", "L", "XL", "X", "IX", "V", "IV", "I"]
+ arabic = [1000, 900, 500, 400, 100, 90, 50, 40, 10, 9, 5, 4, 1]
+ result = ""
+ for i, value in enumerate(arabic):
+ while num >= value:
+ result += roman[i]
+ num -= value
+ return result
+
+def to_alpha(num: int) -> str:
+ result = ""
+ while num > 0:
+ num, remainder = divmod(num - 1, 26)
+ result = chr(65 + remainder) + result
+ return result
def nums_insert(key: NumberObject, value: DictionaryObject, nums: ArrayObject
@@ -89,7 +139,15 @@ def nums_insert(key: NumberObject, value: DictionaryObject, nums: ArrayObject
value: value of the entry
nums: Nums array to modify
"""
- pass
+ for i in range(0, len(nums), 2):
+ if nums[i] == key:
+ nums[i + 1] = value
+ return
+ elif nums[i] > key:
+ nums.insert(i, value)
+ nums.insert(i, key)
+ return
+ nums.extend([key, value])
def nums_clear_range(key: NumberObject, page_index_to: int, nums: ArrayObject
@@ -104,7 +162,13 @@ def nums_clear_range(key: NumberObject, page_index_to: int, nums: ArrayObject
page_index_to: The page index of the upper limit of the range
nums: Nums array to modify
"""
- pass
+ start_index = nums.index(key) if key in nums else -2
+ end_index = start_index + 2
+
+ while end_index < len(nums) and nums[end_index] <= page_index_to:
+ end_index += 2
+
+ del nums[start_index + 2:end_index]
def nums_next(key: NumberObject, nums: ArrayObject) ->Tuple[Optional[
@@ -118,4 +182,7 @@ def nums_next(key: NumberObject, nums: ArrayObject) ->Tuple[Optional[
key: number key of the entry
nums: Nums array
"""
- pass
+ for i in range(0, len(nums), 2):
+ if nums[i] > key:
+ return nums[i], cast(DictionaryObject, nums[i + 1])
+ return None, None
diff --git a/pypdf/_protocols.py b/pypdf/_protocols.py
index 2ae0694..8ff0afc 100644
--- a/pypdf/_protocols.py
+++ b/pypdf/_protocols.py
@@ -12,19 +12,57 @@ from ._utils import StrByteType, StreamType
class PdfObjectProtocol(Protocol):
indirect_reference: Any
+ @abstractmethod
+ def get_object(self) -> Any:
+ pass
+
+ @abstractmethod
+ def write_to_stream(self, stream: StreamType) -> None:
+ pass
+
class XmpInformationProtocol(PdfObjectProtocol):
- pass
+ @abstractmethod
+ def get_element(self, name: str) -> Optional[str]:
+ pass
+
+ @abstractmethod
+ def get_dc_metadata(self) -> Dict[str, Any]:
+ pass
+
+ @abstractmethod
+ def get_pdf_metadata(self) -> Dict[str, Any]:
+ pass
class PdfCommonDocProtocol(Protocol):
- pass
+ @abstractmethod
+ def get_page(self, page_number: int) -> Any:
+ pass
+
+ @abstractmethod
+ def pages(self) -> Any:
+ pass
class PdfReaderProtocol(PdfCommonDocProtocol, Protocol):
- pass
+ @abstractmethod
+ def read(self) -> None:
+ pass
+
+ @abstractmethod
+ def decrypt(self, password: Union[str, bytes]) -> int:
+ pass
class PdfWriterProtocol(PdfCommonDocProtocol, Protocol):
_objects: List[Any]
_id_translated: Dict[int, Dict[int, int]]
+
+ @abstractmethod
+ def add_page(self, page: Any) -> None:
+ pass
+
+ @abstractmethod
+ def write(self, stream: Union[Path, StrByteType]) -> Tuple[bool, IO[Any]]:
+ pass
diff --git a/pypdf/_reader.py b/pypdf/_reader.py
index aa95113..514bb57 100644
--- a/pypdf/_reader.py
+++ b/pypdf/_reader.py
@@ -82,12 +82,15 @@ class PdfReader(PdfDocCommon):
def close(self) ->None:
"""Close the stream if opened in __init__ and clear memory."""
- pass
+ if self._stream_opened:
+ self.stream.close()
+ self.resolved_objects.clear()
+ self.flattened_pages = None
@property
def root_object(self) ->DictionaryObject:
"""Provide access to "/Root". Standardized with PdfWriter."""
- pass
+ return cast(DictionaryObject, self.trailer[TK.ROOT].get_object())
@property
def _info(self) ->Optional[DictionaryObject]:
@@ -97,7 +100,9 @@ class PdfReader(PdfDocCommon):
Returns:
/Info Dictionary; None if the entry does not exist
"""
- pass
+ if TK.INFO not in self.trailer:
+ return None
+ return cast(DictionaryObject, self.trailer[TK.INFO].get_object())
@property
def _ID(self) ->Optional[ArrayObject]:
@@ -107,7 +112,9 @@ class PdfReader(PdfDocCommon):
Returns:
/ID array; None if the entry does not exist
"""
- pass
+ if TK.ID not in self.trailer:
+ return None
+ return cast(ArrayObject, self.trailer[TK.ID].get_object())
def _repr_mimebundle_(self, include: Union[None, Iterable[str]]=None,
exclude: Union[None, Iterable[str]]=None) ->Dict[str, Any]:
@@ -119,7 +126,21 @@ class PdfReader(PdfDocCommon):
See https://ipython.readthedocs.io/en/stable/config/integrating.html
"""
- pass
+ from PIL import Image
+ import io
+
+ if self.pages:
+ first_page = self.pages[0]
+ img = first_page.render()
+ img_byte_arr = io.BytesIO()
+ img.save(img_byte_arr, format='PNG')
+ img_byte_arr = img_byte_arr.getvalue()
+
+ return {
+ 'image/png': img_byte_arr,
+ 'text/plain': f'PDF document with {len(self.pages)} page{"s" if len(self.pages) > 1 else ""}'
+ }
+ return {'text/plain': 'Empty PDF document'}
@property
def pdf_header(self) ->str:
@@ -129,12 +150,16 @@ class PdfReader(PdfDocCommon):
This is typically something like ``'%PDF-1.6'`` and can be used to
detect if the file is actually a PDF file and which version it is.
"""
- pass
+ self.stream.seek(0)
+ return self.stream.read(8).decode('ascii')
@property
def xmp_metadata(self) ->Optional[XmpInformation]:
"""XMP (Extensible Metadata Platform) data."""
- pass
+ try:
+ return XmpInformation(self)
+ except Exception:
+ return None
def _get_page(self, page_number: int) ->PageObject:
"""
@@ -147,7 +172,9 @@ class PdfReader(PdfDocCommon):
Returns:
A :class:`PageObject<pypdf._page.PageObject>` instance.
"""
- pass
+ if self.flattened_pages is None:
+ self._flatten()
+ return self.flattened_pages[page_number]
def _get_page_number_by_indirect(self, indirect_reference: Union[None,
int, NullObject, IndirectObject]) ->Optional[int]:
@@ -160,11 +187,22 @@ class PdfReader(PdfDocCommon):
Returns:
The page number or None
"""
- pass
+ if self._page_id2num is None:
+ self._page_id2num = {}
+ for i, page in enumerate(self.pages):
+ if page.indirect_reference is not None:
+ self._page_id2num[page.indirect_reference.idnum] = i
+ if indirect_reference is None or isinstance(indirect_reference, NullObject):
+ return None
+ if isinstance(indirect_reference, int):
+ return self._page_id2num.get(indirect_reference)
+ return self._page_id2num.get(indirect_reference.idnum)
def _basic_validation(self, stream: StreamType) ->None:
"""Ensure file is not empty. Read at most 5 bytes."""
- pass
+ stream.seek(0)
+ if not stream.read(5):
+ raise EmptyFileError("Cannot read an empty file")
def _find_eof_marker(self, stream: StreamType) ->None:
"""
@@ -174,7 +212,12 @@ class PdfReader(PdfDocCommon):
the file. Hence for standard-compliant PDF documents this function will
read only the last part (DEFAULT_BUFFER_SIZE).
"""
- pass
+ stream.seek(-1024, 2)
+ end = stream.read().lower()
+ idx = end.rfind(b"%%eof")
+ if idx == -1:
+ raise PdfReadError("EOF marker not found")
+ stream.seek(stream.tell() - len(end) + idx)
def _find_startxref_pos(self, stream: StreamType) ->int:
"""
@@ -186,7 +229,11 @@ class PdfReader(PdfDocCommon):
Returns:
The bytes offset
"""
- pass
+ stream.seek(-1024, 2)
+ line = b""
+ while b"startxref" not in line:
+ line = read_previous_line(stream)
+ return int(read_previous_line(stream))
@staticmethod
def _get_xref_issues(stream: StreamType, startxref: int) ->int:
@@ -200,7 +247,15 @@ class PdfReader(PdfDocCommon):
Returns:
0 means no issue, other values represent specific issues.
"""
- pass
+ stream.seek(startxref)
+ if stream.read(5) != b"xref ":
+ return 1
+ stream.seek(startxref + 5)
+ try:
+ int(stream.read(10))
+ except ValueError:
+ return 2
+ return 0
def decrypt(self, password: Union[str, bytes]) ->PasswordType:
"""
diff --git a/pypdf/_text_extraction/_layout_mode/_fixed_width_page.py b/pypdf/_text_extraction/_layout_mode/_fixed_width_page.py
index 4d943bc..54e0060 100644
--- a/pypdf/_text_extraction/_layout_mode/_fixed_width_page.py
+++ b/pypdf/_text_extraction/_layout_mode/_fixed_width_page.py
@@ -50,7 +50,15 @@ def bt_group(tj_op: TextStateParams, rendered_text: str, dispaced_tx: float
rendered_text (str): rendered text
dispaced_tx (float): x coordinate of last character in BTGroup
"""
- pass
+ return BTGroup(
+ tx=tj_op.tx,
+ ty=tj_op.ty,
+ font_size=tj_op.font_size,
+ font_height=tj_op.font_height,
+ text=rendered_text,
+ displaced_tx=dispaced_tx,
+ flip_sort=1 if tj_op.ty >= 0 else -1
+ )
def recurs_to_target_op(ops: Iterator[Tuple[List[Any], bytes]],
@@ -70,7 +78,33 @@ def recurs_to_target_op(ops: Iterator[Tuple[List[Any], bytes]],
Returns:
tuple: list of BTGroup dicts + list of TextStateParams dataclass instances.
"""
- pass
+ bt_groups = []
+ text_state_params = []
+
+ for operands, operator in ops:
+ if operator == end_target:
+ return bt_groups, text_state_params
+
+ if operator == b'BT':
+ sub_bt_groups, sub_text_state_params = recurs_to_target_op(ops, text_state_mgr, b'ET', fonts, strip_rotated)
+ bt_groups.extend(sub_bt_groups)
+ text_state_params.extend(sub_text_state_params)
+ elif operator == b'q':
+ text_state_mgr.push()
+ sub_bt_groups, sub_text_state_params = recurs_to_target_op(ops, text_state_mgr, b'Q', fonts, strip_rotated)
+ bt_groups.extend(sub_bt_groups)
+ text_state_params.extend(sub_text_state_params)
+ elif operator in (b'Tj', b'TJ'):
+ tj_op = text_state_mgr.tj_op()
+ if not strip_rotated or text_state_mgr.is_upright():
+ rendered_text = fonts[tj_op.font_name].decode(operands[0])
+ displaced_tx = text_state_mgr.displaced_tx(operands[0])
+ bt_groups.append(bt_group(tj_op, rendered_text, displaced_tx))
+ text_state_params.append(tj_op)
+ else:
+ text_state_mgr.apply(operands, operator)
+
+ return bt_groups, text_state_params
def y_coordinate_groups(bt_groups: List[BTGroup], debug_path: Optional[Path
@@ -86,7 +120,18 @@ def y_coordinate_groups(bt_groups: List[BTGroup], debug_path: Optional[Path
Dict[int, List[BTGroup]]: dict of lists of text rendered by each BT operator
keyed by y coordinate
"""
- pass
+ sorted_groups = sorted(bt_groups, key=lambda x: (x['ty'] * x['flip_sort'], x['tx']))
+ grouped = {k: list(g) for k, g in groupby(sorted_groups, key=lambda x: ceil(x['ty']))}
+
+ if debug_path:
+ with open(debug_path / 'y_coordinate_groups.txt', 'w') as f:
+ for y, group in grouped.items():
+ f.write(f"Y: {y}\n")
+ for item in group:
+ f.write(f" {item['text']}\n")
+ f.write("\n")
+
+ return grouped
def text_show_operations(ops: Iterator[Tuple[List[Any], bytes]], fonts:
@@ -104,7 +149,26 @@ def text_show_operations(ops: Iterator[Tuple[List[Any], bytes]], fonts:
Returns:
List[BTGroup]: list of dicts of text rendered by each BT operator
"""
- pass
+ text_state_mgr = TextStateManager()
+ bt_groups = []
+
+ for operands, operator in ops:
+ if operator == b'BT':
+ sub_bt_groups, _ = recurs_to_target_op(ops, text_state_mgr, b'ET', fonts, strip_rotated)
+ bt_groups.extend(sub_bt_groups)
+ elif operator == b'q':
+ text_state_mgr.push()
+ sub_bt_groups, _ = recurs_to_target_op(ops, text_state_mgr, b'Q', fonts, strip_rotated)
+ bt_groups.extend(sub_bt_groups)
+ else:
+ text_state_mgr.apply(operands, operator)
+
+ if debug_path:
+ with open(debug_path / 'text_show_operations.txt', 'w') as f:
+ for group in bt_groups:
+ f.write(f"Text: {group['text']}, Position: ({group['tx']}, {group['ty']})\n")
+
+ return bt_groups
def fixed_char_width(bt_groups: List[BTGroup], scale_weight: float=1.25
@@ -120,7 +184,19 @@ def fixed_char_width(bt_groups: List[BTGroup], scale_weight: float=1.25
Returns:
float: fixed character width
"""
- pass
+ total_width = 0
+ total_chars = 0
+ for group in bt_groups:
+ text_length = len(group['text'])
+ if text_length > 0:
+ width = (group['displaced_tx'] - group['tx']) / text_length
+ total_width += width * text_length * scale_weight
+ total_chars += text_length
+
+ if total_chars == 0:
+ return 1.0 # Default to 1.0 if no characters found
+
+ return total_width / total_chars
def fixed_width_page(ty_groups: Dict[int, List[BTGroup]], char_width: float,
@@ -137,4 +213,23 @@ def fixed_width_page(ty_groups: Dict[int, List[BTGroup]], char_width: float,
str: page text in a fixed width format that closely adheres to the rendered
layout in the source pdf.
"""
- pass
+ sorted_y = sorted(ty_groups.keys(), reverse=True)
+ result = []
+ prev_y = None
+
+ for y in sorted_y:
+ if space_vertically and prev_y is not None:
+ line_gap = int((prev_y - y) / char_width) - 1
+ result.extend([''] * max(0, line_gap))
+
+ line = ''
+ for group in sorted(ty_groups[y], key=lambda x: x['tx']):
+ x_pos = int(group['tx'] / char_width)
+ while len(line) < x_pos:
+ line += ' '
+ line += group['text']
+
+ result.append(line.rstrip())
+ prev_y = y
+
+ return '\n'.join(result)
diff --git a/pypdf/_text_extraction/_layout_mode/_font.py b/pypdf/_text_extraction/_layout_mode/_font.py
index f63da23..0b70bc1 100644
--- a/pypdf/_text_extraction/_layout_mode/_font.py
+++ b/pypdf/_text_extraction/_layout_mode/_font.py
@@ -70,9 +70,16 @@ class Font:
def word_width(self, word: str) ->float:
"""Sum of character widths specified in PDF font for the supplied word"""
- pass
+ return sum(self.width_map.get(char, self.space_width) for char in word)
@staticmethod
def to_dict(font_instance: 'Font') ->Dict[str, Any]:
"""Dataclass to dict for json.dumps serialization."""
- pass
+ return {
+ 'subtype': font_instance.subtype,
+ 'space_width': font_instance.space_width,
+ 'encoding': font_instance.encoding,
+ 'char_map': font_instance.char_map,
+ 'font_dictionary': font_instance.font_dictionary,
+ 'width_map': font_instance.width_map
+ }
diff --git a/pypdf/_text_extraction/_layout_mode/_text_state_manager.py b/pypdf/_text_extraction/_layout_mode/_text_state_manager.py
index 3dc8948..92576f6 100644
--- a/pypdf/_text_extraction/_layout_mode/_text_state_manager.py
+++ b/pypdf/_text_extraction/_layout_mode/_text_state_manager.py
@@ -53,7 +53,17 @@ class TextStateManager:
value (float | List[Any]): new parameter value. If a list,
value[0] is used.
"""
- pass
+ param_value = value[0] if isinstance(value, list) else value
+ if op == b'Tc':
+ self.Tc = float(param_value)
+ elif op == b'Tz':
+ self.Tz = float(param_value)
+ elif op == b'Tw':
+ self.Tw = float(param_value)
+ elif op == b'TL':
+ self.TL = float(param_value)
+ elif op == b'Ts':
+ self.Ts = float(param_value)
def set_font(self, font: Font, size: float) ->None:
"""
@@ -63,7 +73,8 @@ class TextStateManager:
font (Font): a layout mode Font
size (float): font size
"""
- pass
+ self.font = font
+ self.font_size = size
def text_state_params(self, value: Union[bytes, str]='') ->TextStateParams:
"""
@@ -79,54 +90,90 @@ class TextStateManager:
Returns:
TextStateParams: current text state parameters
"""
- pass
+ if self.font is None:
+ raise PdfReadError("Font not set (no Tf operator in incoming pdf content stream)")
+
+ if isinstance(value, bytes):
+ value = value.decode('utf-8')
+
+ return TextStateParams(
+ font=self.font,
+ font_size=self.font_size,
+ Tc=self.Tc,
+ Tw=self.Tw,
+ Tz=self.Tz,
+ TL=self.TL,
+ Ts=self.Ts,
+ transform=self.effective_transform,
+ value=value
+ )
@staticmethod
def raw_transform(_a: float=1.0, _b: float=0.0, _c: float=0.0, _d:
float=1.0, _e: float=0.0, _f: float=0.0) ->Dict[int, float]:
"""Only a/b/c/d/e/f matrix params"""
- pass
+ return {0: _a, 1: _b, 2: _c, 3: _d, 4: _e, 5: _f}
@staticmethod
def new_transform(_a: float=1.0, _b: float=0.0, _c: float=0.0, _d:
float=1.0, _e: float=0.0, _f: float=0.0, is_text: bool=False,
is_render: bool=False) ->TextStateManagerDictType:
"""Standard a/b/c/d/e/f matrix params + 'is_text' and 'is_render' keys"""
- pass
+ transform = TextStateManager.raw_transform(_a, _b, _c, _d, _e, _f)
+ transform['is_text'] = is_text
+ transform['is_render'] = is_render
+ return transform
def reset_tm(self) ->TextStateManagerChainMapType:
"""Clear all transforms from chainmap having is_text==True or is_render==True"""
- pass
+ self.transform_stack = ChainMap({k: v for k, v in self.transform_stack.maps[0].items() if not (v.get('is_text', False) or v.get('is_render', False))})
+ return self.transform_stack
def reset_trm(self) ->TextStateManagerChainMapType:
"""Clear all transforms from chainmap having is_render==True"""
- pass
+ self.transform_stack = ChainMap({k: v for k, v in self.transform_stack.maps[0].items() if not v.get('is_render', False)})
+ return self.transform_stack
def remove_q(self) ->TextStateManagerChainMapType:
"""Rewind to stack prior state after closing a 'q' with internal 'cm' ops"""
- pass
+ if self.q_depth[-1] > 0:
+ self.q_depth[-1] -= 1
+ self.transform_stack = self.transform_stack.parents
+ return self.transform_stack
def add_q(self) ->None:
"""Add another level to q_queue"""
- pass
+ self.q_depth[-1] += 1
+ self.transform_stack = self.transform_stack.new_child()
def add_cm(self, *args: Any) ->TextStateManagerChainMapType:
"""Concatenate an additional transform matrix"""
- pass
+ new_transform = self.new_transform(*args)
+ self.transform_stack = self.transform_stack.new_child(new_transform)
+ return self.transform_stack
def _complete_matrix(self, operands: List[float]) ->List[float]:
"""Adds a, b, c, and d to an "e/f only" operand set (e.g Td)"""
- pass
+ return [1, 0, 0, 1] + operands if len(operands) == 2 else operands
def add_tm(self, operands: List[float]) ->TextStateManagerChainMapType:
"""Append a text transform matrix"""
- pass
+ complete_operands = self._complete_matrix(operands)
+ new_transform = self.new_transform(*complete_operands, is_text=True)
+ self.transform_stack = self.transform_stack.new_child(new_transform)
+ return self.transform_stack
def add_trm(self, operands: List[float]) ->TextStateManagerChainMapType:
"""Append a text rendering transform matrix"""
- pass
+ complete_operands = self._complete_matrix(operands)
+ new_transform = self.new_transform(*complete_operands, is_render=True)
+ self.transform_stack = self.transform_stack.new_child(new_transform)
+ return self.transform_stack
@property
def effective_transform(self) ->List[float]:
"""Current effective transform accounting for cm, tm, and trm transforms"""
- pass
+ result = [1, 0, 0, 1, 0, 0]
+ for transform in reversed(self.transform_stack.maps):
+ result = mult(result, [transform.get(i, 0) for i in range(6)])
+ return result
diff --git a/pypdf/_text_extraction/_layout_mode/_text_state_params.py b/pypdf/_text_extraction/_layout_mode/_text_state_params.py
index 341ce6c..3afa9df 100644
--- a/pypdf/_text_extraction/_layout_mode/_text_state_params.py
+++ b/pypdf/_text_extraction/_layout_mode/_text_state_params.py
@@ -70,15 +70,18 @@ class TextStateParams:
def font_size_matrix(self) ->List[float]:
"""Font size matrix"""
- pass
+ return [self.font_size * self.Tz / 100, 0, 0, self.font_size, 0, 0]
def displaced_transform(self) ->List[float]:
"""Effective transform matrix after text has been rendered."""
- pass
+ displacement = self.displacement_matrix()
+ return mult(displacement, self.transform)
def render_transform(self) ->List[float]:
"""Effective transform matrix accounting for font size, Tz, and Ts."""
- pass
+ font_size_matrix = self.font_size_matrix()
+ text_rise = [1, 0, 0, 1, 0, self.Ts]
+ return mult(mult(text_rise, font_size_matrix), self.transform)
def displacement_matrix(self, word: Union[str, None]=None, TD_offset:
float=0.0) ->List[float]:
@@ -90,13 +93,34 @@ class TextStateParams:
returned.
TD_offset (float, optional): translation applied by TD operator. Defaults to 0.0.
"""
- pass
+ text = word if word is not None else self.txt
+ tx = self.word_tx(text, TD_offset)
+ return [1, 0, 0, 1, tx, 0]
def word_tx(self, word: str, TD_offset: float=0.0) ->float:
"""Horizontal text displacement for any word according this text state"""
- pass
+ width = self.font.get_width(word) * self.font_size / 1000
+ spaces = word.count(' ')
+ return (width + self.Tc * len(word) + self.Tw * spaces) * self.Tz / 100 + TD_offset
@staticmethod
def to_dict(inst: 'TextStateParams') ->Dict[str, Any]:
"""Dataclass to dict for json.dumps serialization"""
- pass
+ return {
+ 'txt': inst.txt,
+ 'font': inst.font.to_dict(),
+ 'font_size': inst.font_size,
+ 'Tc': inst.Tc,
+ 'Tw': inst.Tw,
+ 'Tz': inst.Tz,
+ 'TL': inst.TL,
+ 'Ts': inst.Ts,
+ 'transform': inst.transform,
+ 'tx': inst.tx,
+ 'ty': inst.ty,
+ 'displaced_tx': inst.displaced_tx,
+ 'space_tx': inst.space_tx,
+ 'font_height': inst.font_height,
+ 'flip_vertical': inst.flip_vertical,
+ 'rotated': inst.rotated
+ }
diff --git a/pypdf/_utils.py b/pypdf/_utils.py
index d2f9468..0d22654 100644
--- a/pypdf/_utils.py
+++ b/pypdf/_utils.py
@@ -38,7 +38,15 @@ def read_until_whitespace(stream: StreamType, maxchars: Optional[int]=None
Returns:
The data which was read.
"""
- pass
+ txt = b""
+ while True:
+ if maxchars is not None and len(txt) >= maxchars:
+ break
+ tok = stream.read(1)
+ if tok.isspace() or not tok:
+ break
+ txt += tok
+ return txt
def read_non_whitespace(stream: StreamType) ->bytes:
@@ -51,7 +59,12 @@ def read_non_whitespace(stream: StreamType) ->bytes:
Returns:
The data which was read.
"""
- pass
+ while True:
+ tok = stream.read(1)
+ if not tok:
+ return b""
+ if not tok.isspace():
+ return tok
def skip_over_whitespace(stream: StreamType) ->bool:
@@ -65,7 +78,15 @@ def skip_over_whitespace(stream: StreamType) ->bool:
Returns:
True if more than one whitespace was skipped, otherwise return False.
"""
- pass
+ num_whitespace = 0
+ while True:
+ tok = stream.read(1)
+ if not tok:
+ return num_whitespace > 1
+ if not tok.isspace():
+ stream.seek(-1, SEEK_CUR)
+ return num_whitespace > 1
+ num_whitespace += 1
def check_if_whitespace_only(value: bytes) ->bool:
@@ -78,7 +99,7 @@ def check_if_whitespace_only(value: bytes) ->bool:
Returns:
True if the value only has whitespace characters, otherwise return False.
"""
- pass
+ return all(byte.isspace() for byte in value)
def read_until_regex(stream: StreamType, regex: Pattern[bytes]) ->bytes:
@@ -92,7 +113,14 @@ def read_until_regex(stream: StreamType, regex: Pattern[bytes]) ->bytes:
Returns:
The read bytes.
"""
- pass
+ buf = b""
+ while True:
+ tok = stream.read(1)
+ if not tok:
+ return buf
+ buf += tok
+ if regex.search(buf):
+ return buf[:-1]
def read_block_backwards(stream: StreamType, to_read: int) ->bytes:
@@ -109,7 +137,12 @@ def read_block_backwards(stream: StreamType, to_read: int) ->bytes:
Returns:
The data which was read.
"""
- pass
+ current_pos = stream.tell()
+ start_pos = max(0, current_pos - to_read)
+ stream.seek(start_pos)
+ data = stream.read(current_pos - start_pos)
+ stream.seek(start_pos)
+ return data
def read_previous_line(stream: StreamType) ->bytes:
@@ -128,12 +161,36 @@ def read_previous_line(stream: StreamType) ->bytes:
Returns:
The data which was read.
"""
- pass
+ current_pos = stream.tell()
+ stream.seek(0, SEEK_CUR)
+ line = b""
+ while True:
+ if stream.tell() == 0:
+ break
+ stream.seek(-2, SEEK_CUR)
+ char = stream.read(1)
+ if char in (b'\n', b'\r'):
+ stream.seek(1, SEEK_CUR)
+ break
+ line = char + line
+ result = stream.read(current_pos - stream.tell())
+ while stream.tell() < current_pos:
+ char = stream.read(1)
+ if char not in (b'\n', b'\r'):
+ stream.seek(-1, SEEK_CUR)
+ break
+ return result
def mark_location(stream: StreamType) ->None:
"""Create text file showing current location in context."""
- pass
+ pos = stream.tell()
+ stream.seek(max(0, pos - 32))
+ before = stream.read(min(32, pos))
+ after = stream.read(32)
+ stream.seek(pos)
+ with open("pypdf_debug_location.txt", "wb") as fp:
+ fp.write(before + b"<*>" + after)
B_CACHE: Dict[Union[str, bytes], bytes] = {}
@@ -145,23 +202,36 @@ WHITESPACES_AS_REGEXP = b'[' + WHITESPACES_AS_BYTES + b']'
def deprecate_with_replacement(old_name: str, new_name: str, removed_in: str
) ->None:
"""Raise an exception that a feature will be removed, but has a replacement."""
- pass
+ warnings.warn(
+ f"{old_name} is deprecated and will be removed in {removed_in}. "
+ f"Use {new_name} instead.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
def deprecation_with_replacement(old_name: str, new_name: str, removed_in: str
) ->None:
"""Raise an exception that a feature was already removed, but has a replacement."""
- pass
+ raise DeprecationError(
+ f"{old_name} was removed in {removed_in}. Use {new_name} instead."
+ )
def deprecate_no_replacement(name: str, removed_in: str) ->None:
"""Raise an exception that a feature will be removed without replacement."""
- pass
+ warnings.warn(
+ f"{name} is deprecated and will be removed in {removed_in}.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
def deprecation_no_replacement(name: str, removed_in: str) ->None:
"""Raise an exception that a feature was already removed without replacement."""
- pass
+ raise DeprecationError(
+ f"{name} was removed in {removed_in}."
+ )
def logger_error(msg: str, src: str) ->None:
@@ -173,7 +243,7 @@ def logger_error(msg: str, src: str) ->None:
See the docs on when to use which:
https://pypdf.readthedocs.io/en/latest/user/suppress-warnings.html
"""
- pass
+ logging.getLogger(src).error(msg)
def logger_warning(msg: str, src: str) ->None:
@@ -192,7 +262,7 @@ def logger_warning(msg: str, src: str) ->None:
pypdf could apply a robustness fix to still read it. This applies mainly
to strict=False mode.
"""
- pass
+ logging.getLogger(src).warning(msg)
def rename_kwargs(func_name: str, kwargs: Dict[str, Any], aliases: Dict[str,
@@ -206,7 +276,18 @@ def rename_kwargs(func_name: str, kwargs: Dict[str, Any], aliases: Dict[str,
aliases:
fail:
"""
- pass
+ for old_arg, new_arg in aliases.items():
+ if old_arg in kwargs:
+ if new_arg in kwargs:
+ raise TypeError(f"{func_name}() received both {old_arg} and {new_arg}")
+ warnings.warn(
+ f"{old_arg} is deprecated. Use {new_arg} instead.",
+ DeprecationWarning,
+ stacklevel=3,
+ )
+ kwargs[new_arg] = kwargs.pop(old_arg)
+ elif fail and new_arg not in kwargs:
+ raise TypeError(f"{func_name}() missing required argument: '{new_arg}'")
class classproperty:
diff --git a/pypdf/_writer.py b/pypdf/_writer.py
index 9c09266..5516a8d 100644
--- a/pypdf/_writer.py
+++ b/pypdf/_writer.py
@@ -114,7 +114,7 @@ class PdfWriter(PdfDocCommon):
Note that this property, if true, will remain true even after the
:meth:`decrypt()<pypdf.PdfReader.decrypt>` method is called.
"""
- pass
+ return self._encryption is not None
@property
def root_object(self) ->DictionaryObject:
@@ -124,7 +124,7 @@ class PdfWriter(PdfDocCommon):
Note:
Recommended only for read access.
"""
- pass
+ return self._root_object
@property
def _info(self) ->Optional[DictionaryObject]:
@@ -134,17 +134,26 @@ class PdfWriter(PdfDocCommon):
Returns:
/Info Dictionary; None if the entry does not exist
"""
- pass
+ return self._info_obj if isinstance(self._info_obj, DictionaryObject) else None
@property
def xmp_metadata(self) ->Optional[XmpInformation]:
"""XMP (Extensible Metadata Platform) data."""
- pass
+ if CA.METADATA not in self._root_object:
+ return None
+ metadata = self._root_object[CA.METADATA]
+ if not isinstance(metadata, XmpInformation):
+ metadata = XmpInformation(metadata)
+ self._root_object[CA.METADATA] = metadata
+ return metadata
@xmp_metadata.setter
def xmp_metadata(self, value: Optional[XmpInformation]) ->None:
"""XMP (Extensible Metadata Platform) data."""
- pass
+ if value is None:
+ del self._root_object[CA.METADATA]
+ else:
+ self._root_object[CA.METADATA] = value
def __enter__(self) ->'PdfWriter':
"""Store that writer is initialized by 'with'."""
@@ -170,7 +179,16 @@ class PdfWriter(PdfDocCommon):
See https://ipython.readthedocs.io/en/stable/config/integrating.html
"""
- pass
+ from io import BytesIO
+
+ data = BytesIO()
+ self.write(data)
+ data.seek(0)
+
+ return {
+ 'application/pdf': data.getvalue(),
+ 'text/plain': f'PDF document with {len(self.pages)} page{"s" if len(self.pages) != 1 else ""}'
+ }
@property
def pdf_header(self) ->str:
@@ -183,7 +201,13 @@ class PdfWriter(PdfDocCommon):
Note: `pdf_header` returns a string but accepts bytes or str for writing
"""
- pass
+ return self._header.decode('ascii')
+
+ @pdf_header.setter
+ def pdf_header(self, value: Union[str, bytes]) ->None:
+ if isinstance(value, str):
+ value = value.encode('ascii')
+ self._header = value
def set_need_appearances_writer(self, state: bool=True) ->None:
"""
diff --git a/pypdf/_xobj_image_helpers.py b/pypdf/_xobj_image_helpers.py
index 658d357..96b15b0 100644
--- a/pypdf/_xobj_image_helpers.py
+++ b/pypdf/_xobj_image_helpers.py
@@ -33,7 +33,38 @@ def _get_imagemode(color_space: Union[str, List[Any], Any],
Image mode not taking into account mask(transparency)
ColorInversion is required (like for some DeviceCMYK)
"""
- pass
+ if depth > MAX_IMAGE_MODE_NESTING_DEPTH:
+ return '', False
+
+ if isinstance(color_space, str):
+ if color_space == ColorSpaces.DEVICE_RGB:
+ return 'RGB', False
+ elif color_space == ColorSpaces.DEVICE_CMYK:
+ return 'CMYK', True
+ elif color_space == ColorSpaces.DEVICE_GRAY:
+ return 'L', False
+ elif color_space == ColorSpaces.INDEXED:
+ return 'P', False
+ elif isinstance(color_space, list) and len(color_space) > 0:
+ if color_space[0] == ColorSpaces.INDEXED:
+ return 'P', False
+ elif color_space[0] == ColorSpaces.SEPARATION:
+ return _get_imagemode(color_space[-1], color_components, prev_mode, depth + 1)
+ elif color_space[0] == ColorSpaces.CAL_RGB:
+ return 'RGB', False
+ elif color_space[0] == ColorSpaces.CAL_GRAY:
+ return 'L', False
+ elif color_space[0] == ColorSpaces.LAB:
+ return 'LAB', False
+
+ if color_components == 1:
+ return 'L', False
+ elif color_components == 3:
+ return 'RGB', False
+ elif color_components == 4:
+ return 'CMYK', True
+
+ return prev_mode, False
def _handle_flate(size: Tuple[int, int], data: bytes, mode: mode_str_type,
@@ -43,13 +74,43 @@ def _handle_flate(size: Tuple[int, int], data: bytes, mode: mode_str_type,
Process image encoded in flateEncode
Returns img, image_format, extension, color inversion
"""
- pass
+ img = Image.frombytes(mode, size, data)
+
+ if mode == 'P':
+ # For indexed color images, we need to create a palette
+ palette = []
+ for i in range(256):
+ if i * 3 + 2 < len(data):
+ palette.extend(data[i * 3 : i * 3 + 3])
+ else:
+ palette.extend([0, 0, 0])
+ img.putpalette(palette)
+
+ if mode == 'CMYK':
+ # CMYK images need to be inverted
+ img = Image.merge('CMYK', [channel.point(lambda x: 255 - x) for channel in img.split()])
+
+ image_format = 'PNG'
+ extension = '.png'
+ color_inversion = mode == 'CMYK'
+
+ return img, image_format, extension, color_inversion
def _handle_jpx(size: Tuple[int, int], data: bytes, mode: mode_str_type,
color_space: str, colors: int) ->Tuple[Image.Image, str, str, bool]:
"""
- Process image encoded in flateEncode
+ Process image encoded in JPEG2000
Returns img, image_format, extension, inversion
"""
- pass
+ try:
+ img = Image.open(BytesIO(data))
+ except UnidentifiedImageError:
+ raise PdfReadError("Unable to process JPEG2000 image")
+
+ # JPEG2000 images are already in the correct color mode
+ image_format = 'JPEG2000'
+ extension = '.jp2'
+ inversion = False
+
+ return img, image_format, extension, inversion
diff --git a/pypdf/constants.py b/pypdf/constants.py
index 5c0b0cf..348df72 100644
--- a/pypdf/constants.py
+++ b/pypdf/constants.py
@@ -92,21 +92,21 @@ class UserAccessPermissions(IntFlag):
@classmethod
def _is_reserved(cls, name: str) ->bool:
"""Check if the given name corresponds to a reserved flag entry."""
- pass
+ return name.startswith('R')
@classmethod
def _is_active(cls, name: str) ->bool:
"""Check if the given reserved name defaults to 1 = active."""
- pass
+ return name in {'R1', 'R2'}
def to_dict(self) ->Dict[str, bool]:
"""Convert the given flag value to a corresponding verbose name mapping."""
- pass
+ return {name: bool(self & value) for name, value in self.__class__.__members__.items()}
@classmethod
def from_dict(cls, value: Dict[str, bool]) ->'UserAccessPermissions':
"""Convert the verbose name mapping to the corresponding flag value."""
- pass
+ return cls(sum(cls.__members__[name] for name, is_set in value.items() if is_set))
class Resources:
@@ -415,7 +415,7 @@ class FieldDictionaryAttributes:
Returns:
A tuple containing all the attribute constants.
"""
- pass
+ return tuple(attr for attr in vars(cls).values() if isinstance(attr, str) and attr.startswith('/'))
@classmethod
def attributes_dict(cls) ->Dict[str, str]:
@@ -431,7 +431,7 @@ class FieldDictionaryAttributes:
Returns:
A dictionary containing attribute keys and their names.
"""
- pass
+ return {attr: attr[1:] for attr in cls.attributes()}
class CheckboxRadioButtonAttributes:
@@ -451,7 +451,7 @@ class CheckboxRadioButtonAttributes:
Returns:
A tuple containing all the attribute constants.
"""
- pass
+ return tuple(attr for attr in vars(cls).values() if isinstance(attr, str) and attr.startswith('/'))
@classmethod
def attributes_dict(cls) ->Dict[str, str]:
@@ -467,7 +467,7 @@ class CheckboxRadioButtonAttributes:
Returns:
A dictionary containing attribute keys and their names.
"""
- pass
+ return {attr: attr[1:] for attr in cls.attributes()}
class FieldFlag(IntFlag):
diff --git a/pypdf/filters.py b/pypdf/filters.py
index c0443ef..7351364 100644
--- a/pypdf/filters.py
+++ b/pypdf/filters.py
@@ -37,7 +37,10 @@ def decompress(data: bytes) ->bytes:
Returns:
The decompressed data.
"""
- pass
+ try:
+ return zlib.decompress(data)
+ except zlib.error:
+ return zlib.decompress(data, -15)
class FlateDecode:
@@ -59,7 +62,39 @@ class FlateDecode:
Raises:
PdfReadError:
"""
- pass
+ data = decompress(data)
+ predictor = 1
+ if decode_parms:
+ try:
+ predictor = decode_parms.get("/Predictor", 1)
+ except AttributeError:
+ pass
+
+ if predictor != 1:
+ columns = decode_parms.get("/Columns", 1)
+ colors = decode_parms.get("/Colors", 1)
+ bitspercomponent = decode_parms.get("/BitsPerComponent", 8)
+
+ rowlength = (columns * colors * bitspercomponent + 7) // 8
+ if len(data) % rowlength != 0:
+ raise PdfReadError("Invalid data length")
+
+ output = bytearray()
+ row_prev = [0] * rowlength
+ for i in range(0, len(data), rowlength):
+ row = list(data[i:i+rowlength])
+ if predictor >= 10:
+ predictor_type = row[0]
+ row = row[1:]
+ if predictor_type == 2:
+ for j in range(len(row)):
+ row[j] = (row[j] + row_prev[j]) % 256
+ else:
+ raise PdfReadError(f"Unsupported PNG predictor {predictor_type}")
+ output.extend(row)
+ row_prev = row
+ data = bytes(output)
+ return data
@staticmethod
def encode(data: bytes, level: int=-1) ->bytes:
@@ -73,7 +108,7 @@ class FlateDecode:
Returns:
The compressed data.
"""
- pass
+ return zlib.compress(data, level)
class ASCIIHexDecode:
@@ -101,7 +136,23 @@ class ASCIIHexDecode:
Raises:
PdfStreamError:
"""
- pass
+ if isinstance(data, str):
+ data = data.encode('ascii')
+
+ data = data.replace(b'\x00', b'').replace(b'\n', b'').replace(b'\r', b'').replace(b' ', b'')
+
+ if data[-1:] == b'>':
+ data = data[:-1]
+
+ if len(data) % 2 != 0:
+ data += b'0'
+
+ try:
+ result = bytes.fromhex(data.decode('ascii'))
+ except ValueError:
+ raise PdfStreamError("Invalid hex data in ASCIIHexDecode")
+
+ return result
class RunLengthDecode:
@@ -134,7 +185,19 @@ class RunLengthDecode:
Raises:
PdfStreamError:
"""
- pass
+ decoded = bytearray()
+ i = 0
+ while i < len(data):
+ length = data[i]
+ if length == 128:
+ break
+ if length < 128:
+ decoded.extend(data[i+1:i+length+2])
+ i += length + 2
+ else:
+ decoded.extend([data[i+1]] * (257 - length))
+ i += 2
+ return bytes(decoded)
class LZWDecode:
@@ -171,7 +234,29 @@ class LZWDecode:
Raises:
PdfReadError: If the stop code is missing
"""
- pass
+ cW = self.CLEARDICT
+ baos = ""
+ while True:
+ pW = cW
+ cW = self.get_next_code()
+ if cW == -1:
+ raise PdfReadError("Missed the stop code in LZWDecode!")
+ if cW == self.STOP:
+ break
+ elif cW == self.CLEARDICT:
+ self.reset_dict()
+ elif pW == self.CLEARDICT:
+ baos += self.dict[cW]
+ else:
+ if cW < len(self.dict):
+ baos += self.dict[cW]
+ p = self.dict[pW] + self.dict[cW][0]
+ self.dict.append(p)
+ else:
+ p = self.dict[pW] + self.dict[pW][0]
+ baos += p
+ self.dict.append(p)
+ return baos
@staticmethod
def decode(data: bytes, decode_parms: Optional[DictionaryObject]=None,
@@ -186,7 +271,8 @@ class LZWDecode:
Returns:
decoded data.
"""
- pass
+ decoder = LZWDecode.Decoder(data)
+ return decoder.decode()
class ASCII85Decode:
@@ -205,7 +291,15 @@ class ASCII85Decode:
Returns:
decoded data.
"""
- pass
+ if isinstance(data, str):
+ data = data.encode('ascii')
+
+ if data.startswith(b'<~'):
+ data = data[2:]
+ if data.endswith(b'~>'):
+ data = data[:-2]
+
+ return a85decode(data)
class DCTDecode:
@@ -258,7 +352,49 @@ def decode_stream_data(stream: Any) ->Union[bytes, str]:
Raises:
NotImplementedError: If an unsupported filter type is encountered.
"""
- pass
+ filters = stream.get("/Filter", ())
+ if isinstance(filters, IndirectObject):
+ filters = filters.get_object()
+ params = stream.get("/DecodeParms", ())
+ if isinstance(params, IndirectObject):
+ params = params.get_object()
+
+ if isinstance(filters, ArrayObject):
+ filters = filters.items()
+ elif isinstance(filters, (NameObject, str)):
+ filters = (filters,)
+ else:
+ raise TypeError(f"/Filter should be name, string, or array, not {type(filters)}")
+
+ if isinstance(params, ArrayObject):
+ params = params.items()
+ elif isinstance(params, DictionaryObject):
+ params = (params,)
+ else:
+ params = ()
+
+ data = stream._data
+ for filter_type, param in zip_longest(filters, params):
+ if filter_type in (FTA.FLATEDECODE, FTA.FL, FT.FLATEDECODE):
+ data = FlateDecode.decode(data, param)
+ elif filter_type in (FTA.ASCIIHEXDECODE, FTA.AHX, FT.ASCIIHEXDECODE):
+ data = ASCIIHexDecode.decode(data, param)
+ elif filter_type in (FTA.RUNLENGTHDECODE, FTA.RL, FT.RUNLENGTHDECODE):
+ data = RunLengthDecode.decode(data, param)
+ elif filter_type in (FTA.LZWDECODE, FTA.LZW, FT.LZWDECODE):
+ data = LZWDecode.decode(data, param)
+ elif filter_type in (FTA.ASCII85DECODE, FTA.A85, FT.ASCII85DECODE):
+ data = ASCII85Decode.decode(data, param)
+ elif filter_type in (FTA.DCTDECODE, FTA.DCT, FT.DCTDECODE):
+ data = DCTDecode.decode(data, param)
+ elif filter_type in (FTA.JPXDECODE, FTA.JPX, FT.JPXDECODE):
+ data = JPXDecode.decode(data, param)
+ elif filter_type in (FTA.CCITTFAXDECODE, FTA.CCF, FT.CCITTFAXDECODE):
+ data = CCITTFaxDecode.decode(data, param)
+ else:
+ raise NotImplementedError(f"Unsupported filter: {filter_type}")
+
+ return data
def decodeStreamData(stream: Any) ->Union[str, bytes]:
diff --git a/pypdf/generic/_base.py b/pypdf/generic/_base.py
index 0b650fd..fe28f70 100644
--- a/pypdf/generic/_base.py
+++ b/pypdf/generic/_base.py
@@ -40,7 +40,7 @@ class PdfObject(PdfObjectProtocol):
Returns:
The cloned PdfObject
"""
- pass
+ return self._reference_clone(self, pdf_dest, force_duplicate)
def _reference_clone(self, clone: Any, pdf_dest: PdfWriterProtocol,
force_duplicate: bool=False) ->PdfObjectProtocol:
@@ -57,11 +57,15 @@ class PdfObject(PdfObjectProtocol):
Returns:
The clone
"""
- pass
+ if hasattr(self, 'indirect_reference') and self.indirect_reference is not None:
+ if not force_duplicate and self.indirect_reference.idnum in pdf_dest._objects:
+ return pdf_dest._objects[self.indirect_reference.idnum]
+ pdf_dest._add_object(clone)
+ return clone
def get_object(self) ->Optional['PdfObject']:
"""Resolve indirect references."""
- pass
+ return self
class NullObject(PdfObject):
@@ -70,7 +74,7 @@ class NullObject(PdfObject):
False, ignore_fields: Optional[Sequence[Union[str, int]]]=()
) ->'NullObject':
"""Clone object into pdf_dest."""
- pass
+ return NullObject()
def __repr__(self) ->str:
return 'NullObject'
@@ -85,7 +89,7 @@ class BooleanObject(PdfObject):
False, ignore_fields: Optional[Sequence[Union[str, int]]]=()
) ->'BooleanObject':
"""Clone object into pdf_dest."""
- pass
+ return BooleanObject(self.value)
def __eq__(self, __o: object) ->bool:
if isinstance(__o, BooleanObject):
@@ -110,7 +114,7 @@ class IndirectObject(PdfObject):
False, ignore_fields: Optional[Sequence[Union[str, int]]]=()
) ->'IndirectObject':
"""Clone object into pdf_dest."""
- pass
+ return IndirectObject(self.idnum, self.generation, pdf_dest)
def __deepcopy__(self, memo: Any) ->'IndirectObject':
return IndirectObject(self.idnum, self.generation, self.pdf)
@@ -162,7 +166,7 @@ class FloatObject(float, PdfObject):
def clone(self, pdf_dest: Any, force_duplicate: bool=False,
ignore_fields: Optional[Sequence[Union[str, int]]]=()) ->'FloatObject':
"""Clone object into pdf_dest."""
- pass
+ return FloatObject(self)
def __repr__(self) ->str:
return self.myrepr()
@@ -183,7 +187,7 @@ class NumberObject(int, PdfObject):
ignore_fields: Optional[Sequence[Union[str, int]]]=()
) ->'NumberObject':
"""Clone object into pdf_dest."""
- pass
+ return NumberObject(self)
class ByteStringObject(bytes, PdfObject):
@@ -199,12 +203,12 @@ class ByteStringObject(bytes, PdfObject):
ignore_fields: Optional[Sequence[Union[str, int]]]=()
) ->'ByteStringObject':
"""Clone object into pdf_dest."""
- pass
+ return ByteStringObject(self)
@property
def original_bytes(self) ->bytes:
"""For compatibility with TextStringObject.original_bytes."""
- pass
+ return bytes(self)
class TextStringObject(str, PdfObject):
@@ -241,7 +245,11 @@ class TextStringObject(str, PdfObject):
ignore_fields: Optional[Sequence[Union[str, int]]]=()
) ->'TextStringObject':
"""Clone object into pdf_dest."""
- pass
+ clone = TextStringObject(self)
+ clone.autodetect_utf16 = self.autodetect_utf16
+ clone.autodetect_pdfdocencoding = self.autodetect_pdfdocencoding
+ clone.utf16_bom = self.utf16_bom
+ return clone
@property
def original_bytes(self) ->bytes:
@@ -251,7 +259,12 @@ class TextStringObject(str, PdfObject):
if that occurs, this "original_bytes" property can be used to
back-calculate what the original encoded bytes were.
"""
- pass
+ if self.autodetect_utf16:
+ return self.utf16_bom + self.encode('utf-16be')
+ elif self.autodetect_pdfdocencoding:
+ return encode_pdfdocencoding(self)
+ else:
+ raise UnicodeEncodeError("No information about original bytes")
class NameObject(str, PdfObject):
@@ -264,5 +277,5 @@ class NameObject(str, PdfObject):
def clone(self, pdf_dest: Any, force_duplicate: bool=False,
ignore_fields: Optional[Sequence[Union[str, int]]]=()) ->'NameObject':
"""Clone object into pdf_dest."""
- pass
+ return NameObject(self)
CHARSETS = 'utf-8', 'gbk', 'latin1'
diff --git a/pypdf/generic/_data_structures.py b/pypdf/generic/_data_structures.py
index 5063153..123749e 100644
--- a/pypdf/generic/_data_structures.py
+++ b/pypdf/generic/_data_structures.py
@@ -33,11 +33,14 @@ class ArrayObject(List[Any], PdfObject):
False, ignore_fields: Optional[Sequence[Union[str, int]]]=()
) ->'ArrayObject':
"""Clone object into pdf_dest."""
- pass
+ return ArrayObject(
+ [obj.clone(pdf_dest, force_duplicate, ignore_fields) if isinstance(obj, PdfObject) else obj
+ for obj in self]
+ )
def items(self) ->Iterable[Any]:
"""Emulate DictionaryObject.items for a list (index, object)."""
- pass
+ return enumerate(self)
def __add__(self, lst: Any) ->'ArrayObject':
"""
@@ -88,7 +91,10 @@ class DictionaryObject(Dict[Any, Any], PdfObject):
False, ignore_fields: Optional[Sequence[Union[str, int]]]=()
) ->'DictionaryObject':
"""Clone object into pdf_dest."""
- pass
+ cloned = DictionaryObject()
+ visited = set()
+ self._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
+ return cloned
def _clone(self, src: 'DictionaryObject', pdf_dest: PdfWriterProtocol,
force_duplicate: bool, ignore_fields: Optional[Sequence[Union[str,
@@ -102,7 +108,16 @@ class DictionaryObject(Dict[Any, Any], PdfObject):
force_duplicate:
ignore_fields:
"""
- pass
+ for key, value in src.items():
+ if key in ignore_fields:
+ continue
+ if isinstance(value, PdfObject):
+ if hasattr(value, 'clone'):
+ self[key] = value.clone(pdf_dest, force_duplicate, ignore_fields)
+ else:
+ self[key] = value
+ else:
+ self[key] = value
def get_inherited(self, key: str, default: Any=None) ->Any:
"""
@@ -117,7 +132,14 @@ class DictionaryObject(Dict[Any, Any], PdfObject):
Returns:
Current key or inherited one, otherwise default value.
"""
- pass
+ try:
+ return self[key]
+ except KeyError:
+ if '/Parent' in self:
+ parent = self['/Parent']
+ if isinstance(parent, DictionaryObject):
+ return parent.get_inherited(key, default)
+ return default
def __setitem__(self, key: Any, value: Any) ->Any:
if not isinstance(key, PdfObject):
@@ -142,7 +164,16 @@ class DictionaryObject(Dict[Any, Any], PdfObject):
that can be used to access XMP metadata from the document. Can also
return None if no metadata was found on the document root.
"""
- pass
+ from ..xmp import XmpInformation
+
+ metadata = self.get("/Metadata", None)
+ if metadata is None:
+ return None
+ metadata = metadata.get_object()
+
+ if not isinstance(metadata, StreamObject):
+ return None
+ return XmpInformation(metadata)
class TreeObject(DictionaryObject):
@@ -166,11 +197,44 @@ class TreeObject(DictionaryObject):
cur:
last:
"""
- pass
+ if prev is None:
+ if last == cur:
+ self[NameObject("/First")] = NullObject()
+ self[NameObject("/Last")] = NullObject()
+ else:
+ self[NameObject("/First")] = cur.get("/Next")
+ else:
+ prev[NameObject("/Next")] = cur.get("/Next")
+ if last == cur:
+ self[NameObject("/Last")] = prev_ref
+
+ if cur.get("/Next") is not None:
+ next_ref = cur.get("/Next")
+ next_ref[NameObject("/Prev")] = prev_ref
+
+ self[NameObject("/Count")] = NumberObject(self["/Count"] - 1)
def remove_from_tree(self) ->None:
"""Remove the object from the tree it is in."""
- pass
+ if "/Parent" not in self:
+ return
+
+ parent = self["/Parent"]
+ prev = None
+ prev_ref = None
+ cur = parent["/First"]
+ last = parent["/Last"]
+
+ while cur is not None:
+ if cur == self:
+ parent._remove_node_from_tree(prev, prev_ref, cur, last)
+ break
+
+ prev = cur
+ prev_ref = parent.indirect_reference
+ cur = cur.get("/Next")
+
+ _reset_node_tree_relationship(self)
def _reset_node_tree_relationship(child_obj: Any) ->None:
@@ -182,7 +246,9 @@ def _reset_node_tree_relationship(child_obj: Any) ->None:
Args:
child_obj:
"""
- pass
+ del child_obj["/Parent"]
+ del child_obj["/Next"]
+ del child_obj["/Prev"]
class StreamObject(DictionaryObject):
diff --git a/pypdf/generic/_fit.py b/pypdf/generic/_fit.py
index c30240c..ff6b136 100644
--- a/pypdf/generic/_fit.py
+++ b/pypdf/generic/_fit.py
@@ -31,7 +31,7 @@ class Fit:
Returns:
The created fit object.
"""
- pass
+ return cls('XYZ', (left, top, zoom))
@classmethod
def fit(cls) ->'Fit':
@@ -44,7 +44,7 @@ class Fit:
different, use the smaller of the two, centering the page within the
window in the other dimension.
"""
- pass
+ return cls('Fit')
@classmethod
def fit_horizontally(cls, top: Optional[float]=None) ->'Fit':
@@ -63,7 +63,7 @@ class Fit:
Returns:
The created fit object.
"""
- pass
+ return cls('FitH', (top,))
@classmethod
def fit_rectangle(cls, left: Optional[float]=None, bottom: Optional[
@@ -91,7 +91,7 @@ class Fit:
Returns:
The created fit object.
"""
- pass
+ return cls('FitR', (left, bottom, right, top))
@classmethod
def fit_box(cls) ->'Fit':
@@ -104,7 +104,7 @@ class Fit:
different, use the smaller of the two, centering the bounding box
within the window in the other dimension.
"""
- pass
+ return cls('FitB')
@classmethod
def fit_box_horizontally(cls, top: Optional[float]=None) ->'Fit':
@@ -123,7 +123,7 @@ class Fit:
Returns:
The created fit object.
"""
- pass
+ return cls('FitBH', (top,))
@classmethod
def fit_box_vertically(cls, left: Optional[float]=None) ->'Fit':
@@ -142,7 +142,7 @@ class Fit:
Returns:
The created fit object.
"""
- pass
+ return cls('FitBV', (left,))
def __str__(self) ->str:
if not self.fit_args:
diff --git a/pypdf/generic/_image_inline.py b/pypdf/generic/_image_inline.py
index 8a46ea7..7fc12c9 100644
--- a/pypdf/generic/_image_inline.py
+++ b/pypdf/generic/_image_inline.py
@@ -11,7 +11,18 @@ def extract_inline_AHx(stream: StreamType) ->bytes:
Extract HexEncoded Stream from Inline Image.
the stream will be moved onto the EI
"""
- pass
+ data = BytesIO()
+ while True:
+ tok = read_non_whitespace(stream)
+ if tok == b'>':
+ stream.read(1) # Move past the '>'
+ break
+ elif tok not in b'0123456789ABCDEFabcdef':
+ raise PdfReadError(f"Unexpected token in ASCIIHexDecode: {tok}")
+ data.write(tok)
+
+ hex_data = data.getvalue()
+ return bytes.fromhex(hex_data.decode('ascii'))
def extract_inline_A85(stream: StreamType) ->bytes:
@@ -19,7 +30,19 @@ def extract_inline_A85(stream: StreamType) ->bytes:
Extract A85 Stream from Inline Image.
the stream will be moved onto the EI
"""
- pass
+ data = BytesIO()
+ while True:
+ chunk = stream.read(BUFFER_SIZE)
+ if not chunk:
+ raise PdfReadError("Unexpected EOF in ASCII85Decode")
+ if b'~>' in chunk:
+ data.write(chunk[:chunk.index(b'~>')])
+ stream.seek(stream.tell() - len(chunk) + chunk.index(b'~>') + 2)
+ break
+ data.write(chunk)
+
+ ascii85_data = data.getvalue()
+ return ascii85_data.decode('ascii85')
def extract_inline_RL(stream: StreamType) ->bytes:
@@ -27,7 +50,19 @@ def extract_inline_RL(stream: StreamType) ->bytes:
Extract RL Stream from Inline Image.
the stream will be moved onto the EI
"""
- pass
+ data = BytesIO()
+ while True:
+ byte = stream.read(1)
+ if not byte:
+ raise PdfReadError("Unexpected EOF in RunLengthDecode")
+ if byte == b'\x80':
+ break
+ if ord(byte) < 128:
+ data.write(stream.read(ord(byte) + 1))
+ else:
+ data.write(stream.read(1) * (257 - ord(byte)))
+
+ return data.getvalue()
def extract_inline_DCT(stream: StreamType) ->bytes:
@@ -35,7 +70,18 @@ def extract_inline_DCT(stream: StreamType) ->bytes:
Extract DCT (JPEG) Stream from Inline Image.
the stream will be moved onto the EI
"""
- pass
+ data = BytesIO()
+ while True:
+ chunk = stream.read(BUFFER_SIZE)
+ if not chunk:
+ raise PdfReadError("Unexpected EOF in DCTDecode")
+ data.write(chunk)
+ if b'\xff\xd9' in chunk: # JPEG EOI marker
+ break
+
+ jpeg_data = data.getvalue()
+ stream.seek(stream.tell() - len(chunk) + chunk.rindex(b'\xff\xd9') + 2)
+ return jpeg_data
def extract_inline_default(stream: StreamType) ->bytes:
@@ -43,4 +89,20 @@ def extract_inline_default(stream: StreamType) ->bytes:
Legacy method
used by default
"""
- pass
+ data = BytesIO()
+ while True:
+ tok = stream.read(1)
+ if not tok:
+ raise PdfReadError("Unexpected EOF in inline image")
+ if tok in WHITESPACES:
+ continue
+ if tok == b'E':
+ tok2 = stream.read(1)
+ if tok2 == b'I':
+ stream.seek(-2, 1)
+ break
+ else:
+ stream.seek(-1, 1)
+ data.write(tok)
+
+ return data.getvalue()
diff --git a/pypdf/generic/_outline.py b/pypdf/generic/_outline.py
index 3c300f6..67c1826 100644
--- a/pypdf/generic/_outline.py
+++ b/pypdf/generic/_outline.py
@@ -1,8 +1,84 @@
-from typing import Union
+from typing import Union, Optional, List
from .._utils import StreamType, deprecate_no_replacement
-from ._base import NameObject
+from ._base import NameObject, BooleanObject, ArrayObject, DictionaryObject
from ._data_structures import Destination
class OutlineItem(Destination):
- pass
+ def __init__(self, title: str, page: Union[int, Destination], color: Optional[List[float]] = None,
+ bold: bool = False, italic: bool = False, fit: Union[str, NameObject] = "/Fit") -> None:
+ super().__init__(page, fit)
+ self.title = title
+ self.color = color
+ self.bold = bold
+ self.italic = italic
+ self.parent = None
+ self.prev = None
+ self.next = None
+ self.first = None
+ self.last = None
+ self.count = 0
+
+ def get_object(self) -> DictionaryObject:
+ obj = DictionaryObject()
+ obj[NameObject("/Title")] = self.title
+
+ # Add destination
+ obj.update(super().get_object())
+
+ # Add color if specified
+ if self.color:
+ obj[NameObject("/C")] = ArrayObject([float(c) for c in self.color])
+
+ # Add text format flags
+ if self.bold or self.italic:
+ format_flag = 0
+ if self.bold:
+ format_flag += 2
+ if self.italic:
+ format_flag += 1
+ obj[NameObject("/F")] = format_flag
+
+ # Add structural attributes
+ if self.parent:
+ obj[NameObject("/Parent")] = self.parent
+ if self.prev:
+ obj[NameObject("/Prev")] = self.prev
+ if self.next:
+ obj[NameObject("/Next")] = self.next
+ if self.first:
+ obj[NameObject("/First")] = self.first
+ if self.last:
+ obj[NameObject("/Last")] = self.last
+ if self.count:
+ obj[NameObject("/Count")] = self.count
+
+ return obj
+
+ def add_child(self, child: 'OutlineItem') -> None:
+ child.parent = self
+ if self.first is None:
+ self.first = child
+ self.last = child
+ else:
+ child.prev = self.last
+ self.last.next = child
+ self.last = child
+ self.count += 1
+
+ def add_sibling(self, sibling: 'OutlineItem') -> None:
+ if self.parent:
+ self.parent.add_child(sibling)
+ else:
+ raise ValueError("Cannot add sibling to root outline item")
+
+ @property
+ def is_closed(self) -> bool:
+ return self.count < 0
+
+ @is_closed.setter
+ def is_closed(self, value: bool) -> None:
+ if value:
+ self.count = -abs(self.count)
+ else:
+ self.count = abs(self.count)
diff --git a/pypdf/generic/_rectangle.py b/pypdf/generic/_rectangle.py
index 5e885b2..5684dec 100644
--- a/pypdf/generic/_rectangle.py
+++ b/pypdf/generic/_rectangle.py
@@ -30,7 +30,12 @@ class RectangleObject(ArrayObject):
Property to read and modify the lower left coordinate of this box
in (x,y) form.
"""
- pass
+ return (float(self[0]), float(self[1]))
+
+ @lower_left.setter
+ def lower_left(self, value: Tuple[float, float]) ->None:
+ self[0] = self._ensure_is_number(value[0])
+ self[1] = self._ensure_is_number(value[1])
@property
def lower_right(self) ->Tuple[float, float]:
@@ -38,7 +43,12 @@ class RectangleObject(ArrayObject):
Property to read and modify the lower right coordinate of this box
in (x,y) form.
"""
- pass
+ return (float(self[2]), float(self[1]))
+
+ @lower_right.setter
+ def lower_right(self, value: Tuple[float, float]) ->None:
+ self[2] = self._ensure_is_number(value[0])
+ self[1] = self._ensure_is_number(value[1])
@property
def upper_left(self) ->Tuple[float, float]:
@@ -46,7 +56,12 @@ class RectangleObject(ArrayObject):
Property to read and modify the upper left coordinate of this box
in (x,y) form.
"""
- pass
+ return (float(self[0]), float(self[3]))
+
+ @upper_left.setter
+ def upper_left(self, value: Tuple[float, float]) ->None:
+ self[0] = self._ensure_is_number(value[0])
+ self[3] = self._ensure_is_number(value[1])
@property
def upper_right(self) ->Tuple[float, float]:
@@ -54,4 +69,9 @@ class RectangleObject(ArrayObject):
Property to read and modify the upper right coordinate of this box
in (x,y) form.
"""
- pass
+ return (float(self[2]), float(self[3]))
+
+ @upper_right.setter
+ def upper_right(self, value: Tuple[float, float]) ->None:
+ self[2] = self._ensure_is_number(value[0])
+ self[3] = self._ensure_is_number(value[1])
diff --git a/pypdf/generic/_utils.py b/pypdf/generic/_utils.py
index f259fc9..c619857 100644
--- a/pypdf/generic/_utils.py
+++ b/pypdf/generic/_utils.py
@@ -17,9 +17,34 @@ def create_string_object(string: Union[str, bytes], forced_encoding: Union[
forced_encoding: Typically None, or an encoding string
Returns:
- A ByteStringObject
+ A ByteStringObject or TextStringObject
Raises:
TypeError: If string is not of type str or bytes.
"""
- pass
+ if isinstance(string, str):
+ return TextStringObject(string)
+ elif isinstance(string, bytes):
+ if forced_encoding:
+ if isinstance(forced_encoding, str):
+ return TextStringObject(string.decode(forced_encoding))
+ elif isinstance(forced_encoding, list):
+ for encoding in forced_encoding:
+ try:
+ return TextStringObject(string.decode(encoding))
+ except UnicodeDecodeError:
+ pass
+ elif isinstance(forced_encoding, dict):
+ try:
+ return TextStringObject(codecs.decode(string, _pdfdoc_encoding))
+ except UnicodeDecodeError:
+ pass
+ try:
+ return TextStringObject(string.decode('utf-16'))
+ except UnicodeDecodeError:
+ try:
+ return TextStringObject(string.decode('utf-8'))
+ except UnicodeDecodeError:
+ return ByteStringObject(string)
+ else:
+ raise TypeError("string must be of type str or bytes")
diff --git a/pypdf/generic/_viewerpref.py b/pypdf/generic/_viewerpref.py
index af352e7..2b590a0 100644
--- a/pypdf/generic/_viewerpref.py
+++ b/pypdf/generic/_viewerpref.py
@@ -73,3 +73,29 @@ class ViewerPreferences(DictionaryObject):
self.indirect_reference = obj.indirect_reference
except AttributeError:
pass
+
+ def _get_bool(self, key: str, default: Optional[BooleanObject]) -> Optional[BooleanObject]:
+ return self.get(key, default)
+
+ def _set_bool(self, key: str, value: bool) -> None:
+ self[key] = BooleanObject(value)
+
+ def _get_name(self, key: str, default: Optional[NameObject]) -> Optional[NameObject]:
+ return self.get(key, default)
+
+ def _set_name(self, key: str, allowed_values: List[str], value: str) -> None:
+ if value not in allowed_values:
+ raise ValueError(f"Invalid value for {key}. Allowed values are: {allowed_values}")
+ self[key] = NameObject(value)
+
+ def _get_arr(self, key: str, default: Optional[ArrayObject]) -> Optional[ArrayObject]:
+ return self.get(key, default)
+
+ def _set_arr(self, key: str, value: List[Any]) -> None:
+ self[key] = ArrayObject(value)
+
+ def _get_int(self, key: str, default: Optional[int]) -> Optional[int]:
+ return self.get(key, default)
+
+ def _set_int(self, key: str, value: int) -> None:
+ self[key] = NumberObject(value)
diff --git a/pypdf/pagerange.py b/pypdf/pagerange.py
index 2bc1277..3254af6 100644
--- a/pypdf/pagerange.py
+++ b/pypdf/pagerange.py
@@ -81,11 +81,15 @@ class PageRange:
Returns:
True, if the ``input`` is a valid PageRange.
"""
- pass
+ if isinstance(input, (PageRange, slice)):
+ return True
+ if isinstance(input, str):
+ return bool(re.match(PAGE_RANGE_RE, input))
+ return False
def to_slice(self) ->slice:
"""Return the slice equivalent of this page range."""
- pass
+ return self._slice
def __str__(self) ->str:
"""A string like "1:2:3"."""
@@ -116,7 +120,7 @@ class PageRange:
Returns:
Arguments for range().
"""
- pass
+ return self._slice.indices(n)
def __eq__(self, other: object) ->bool:
if not isinstance(other, PageRange):
@@ -153,7 +157,32 @@ def parse_filename_page_ranges(args: List[Union[str, PageRange, None]]) ->List[
Returns:
A list of (filename, page_range) pairs.
"""
- pass
+ result = []
+ current_filename = None
+
+ for arg in args:
+ if arg is None:
+ continue
+ if isinstance(arg, str):
+ if PageRange.valid(arg):
+ if current_filename is None:
+ raise ValueError("Page range specified before filename")
+ result.append((current_filename, PageRange(arg)))
+ else:
+ if current_filename is not None:
+ result.append((current_filename, PAGE_RANGE_ALL))
+ current_filename = arg
+ elif isinstance(arg, (PageRange, slice)):
+ if current_filename is None:
+ raise ValueError("Page range specified before filename")
+ result.append((current_filename, PageRange(arg)))
+ else:
+ raise TypeError(f"Unexpected argument type: {type(arg)}")
+
+ if current_filename is not None:
+ result.append((current_filename, PAGE_RANGE_ALL))
+
+ return result
PageRangeSpec = Union[str, PageRange, Tuple[int, int], Tuple[int, int, int],
diff --git a/pypdf/xmp.py b/pypdf/xmp.py
index 78c923e..a300605 100644
--- a/pypdf/xmp.py
+++ b/pypdf/xmp.py
@@ -146,4 +146,11 @@ class XmpInformation(PdfObject):
Returns:
A dictionary of key/value items for custom metadata properties.
"""
- pass
+ if 'custom_properties' not in self.cache:
+ self.cache['custom_properties'] = {}
+ properties = self.rdf_root.getElementsByTagNameNS(PDFX_NAMESPACE, 'property')
+ for prop in properties:
+ key = prop.getAttribute('pdfx:name')
+ value = prop.firstChild.nodeValue if prop.firstChild else None
+ self.cache['custom_properties'][key] = value
+ return self.cache['custom_properties']