back to Claude Sonnet 3.5 - Fill-in summary
Claude Sonnet 3.5 - Fill-in: pydantic
Failed to run pytests for test tests
ImportError while loading conftest '/testbed/tests/conftest.py'.
tests/conftest.py:17: in <module>
from pydantic import GenerateSchema
pydantic/__init__.py:404: in __getattr__
module = import_module(module_name, package=package)
pydantic/_internal/_generate_schema.py:26: in <module>
from ..json_schema import JsonSchemaValue
pydantic/json_schema.py:27: in <module>
from ._internal import _config, _core_metadata, _core_utils, _decorators, _internal_dataclass, _mock_val_ser, _schema_generation_shared, _typing_extra
pydantic/_internal/_core_utils.py:8: in <module>
from . import _repr
pydantic/_internal/_repr.py:7: in <module>
from . import _typing_extra
E File "/testbed/pydantic/_internal/_typing_extra.py", line 241
E def is_self_type(tp: Any) ->bool:
E ^^^
E IndentationError: expected an indented block after 'if' statement on line 238
Patch diff
diff --git a/pydantic/_internal/_config.py b/pydantic/_internal/_config.py
index 9809c1f3..8e69c9cf 100644
--- a/pydantic/_internal/_config.py
+++ b/pydantic/_internal/_config.py
@@ -89,7 +89,17 @@ class ConfigWrapper:
Returns:
A `ConfigWrapper` instance for `BaseModel`.
"""
- pass
+ config_dict = {}
+ for base in reversed(bases):
+ if hasattr(base, '__pydantic_config__'):
+ config_dict.update(base.__pydantic_config__.config_dict)
+
+ if 'Config' in namespace:
+ config_dict.update(prepare_config(namespace['Config']))
+
+ config_dict.update({k: v for k, v in kwargs.items() if k in config_keys})
+
+ return cls(config_dict)
if not TYPE_CHECKING:
def __getattr__(self, name: str) ->Any:
@@ -115,7 +125,20 @@ class ConfigWrapper:
Returns:
A `CoreConfig` object created from config.
"""
- pass
+ config = {}
+ if 'title' in self.config_dict:
+ config['title'] = self.config_dict['title']
+ elif obj is not None:
+ config['title'] = obj.__name__
+
+ for key in ['strict', 'from_attributes', 'populate_by_name', 'str_to_lower', 'str_to_upper',
+ 'str_strip_whitespace', 'str_min_length', 'str_max_length', 'frozen',
+ 'extra', 'ser_json_timedelta', 'ser_json_bytes', 'ser_json_inf_nan',
+ 'validate_default', 'hide_input_in_errors']:
+ if key in self.config_dict:
+ config[key] = self.config_dict[key]
+
+ return core_schema.CoreConfig(**config)
def __repr__(self):
c = ', '.join(f'{k}={v!r}' for k, v in self.config_dict.items())
@@ -157,7 +180,18 @@ def prepare_config(config: (ConfigDict | dict[str, Any] | type[Any] | None)
Returns:
A ConfigDict object created from config.
"""
- pass
+ if config is None:
+ return ConfigDict()
+ elif isinstance(config, dict):
+ return ConfigDict(**config)
+ elif isinstance(config, type):
+ if hasattr(config, '__pydantic_config__'):
+ return config.__pydantic_config__.config_dict.copy()
+ else:
+ warnings.warn(DEPRECATION_MESSAGE, DeprecationWarning, stacklevel=2)
+ return ConfigDict(**{k: getattr(config, k) for k in dir(config) if not k.startswith('__')})
+ else:
+ raise TypeError(f"Config must be a ConfigDict, dict, class, or None, not {type(config)}")
config_keys = set(ConfigDict.__annotations__.keys())
@@ -179,4 +213,17 @@ def check_deprecated(config_dict: ConfigDict) ->None:
Args:
config_dict: The input config.
"""
- pass
+ for key in config_dict:
+ if key in V2_REMOVED_KEYS:
+ warnings.warn(
+ f"Config key '{key}' has been removed in V2",
+ DeprecationWarning,
+ stacklevel=2
+ )
+ elif key in V2_RENAMED_KEYS:
+ new_key = V2_RENAMED_KEYS[key]
+ warnings.warn(
+ f"Config key '{key}' has been renamed to '{new_key}' in V2",
+ DeprecationWarning,
+ stacklevel=2
+ )
diff --git a/pydantic/_internal/_core_metadata.py b/pydantic/_internal/_core_metadata.py
index e9361dbe..4bccb34f 100644
--- a/pydantic/_internal/_core_metadata.py
+++ b/pydantic/_internal/_core_metadata.py
@@ -43,7 +43,10 @@ class CoreMetadataHandler:
"""Retrieves the metadata dict from the schema, initializing it to a dict if it is None
and raises an error if it is not a dict.
"""
- pass
+ metadata = self._schema['metadata']
+ if not isinstance(metadata, dict):
+ raise TypeError(f'CoreSchema metadata should be a dict; got {metadata!r}.')
+ return typing.cast(CoreMetadata, metadata)
def build_metadata_dict(*, js_functions: (list[GetJsonSchemaFunction] |
@@ -54,4 +57,19 @@ def build_metadata_dict(*, js_functions: (list[GetJsonSchemaFunction] |
"""Builds a dict to use as the metadata field of a CoreSchema object in a manner that is consistent
with the CoreMetadataHandler class.
"""
- pass
+ metadata = CoreMetadata()
+ if initial_metadata:
+ if not isinstance(initial_metadata, dict):
+ raise TypeError(f'initial_metadata should be a dict; got {initial_metadata!r}.')
+ metadata.update(initial_metadata)
+
+ if js_functions:
+ metadata['pydantic_js_functions'] = js_functions
+ if js_annotation_functions:
+ metadata['pydantic_js_annotation_functions'] = js_annotation_functions
+ if js_prefer_positional_arguments is not None:
+ metadata['pydantic_js_prefer_positional_arguments'] = js_prefer_positional_arguments
+ if typed_dict_cls:
+ metadata['pydantic_typed_dict_cls'] = typed_dict_cls
+
+ return metadata
diff --git a/pydantic/_internal/_core_utils.py b/pydantic/_internal/_core_utils.py
index 5f858e63..85d9b519 100644
--- a/pydantic/_internal/_core_utils.py
+++ b/pydantic/_internal/_core_utils.py
@@ -38,14 +38,25 @@ def get_type_ref(type_: type[Any], args_override: (tuple[type[Any], ...] |
This `args_override` argument was added for the purpose of creating valid recursive references
when creating generic models without needing to create a concrete class.
"""
- pass
+ if args_override is not None:
+ type_args = args_override
+ elif is_generic_alias(type_):
+ type_args = get_args(type_)
+ else:
+ type_args = ()
+
+ type_name = type_.__name__
+ if type_args:
+ args_str = ','.join(arg.__name__ for arg in type_args)
+ return f'{type_name}[{args_str}]'
+ return type_name
def get_ref(s: core_schema.CoreSchema) ->(None | str):
"""Get the ref from the schema if it has one.
This exists just for type checking to work correctly.
"""
- pass
+ return s.get('ref')
T = TypeVar('T')
@@ -77,7 +88,7 @@ def walk_core_schema(schema: core_schema.CoreSchema, f: Walk
Returns:
core_schema.CoreSchema: A processed CoreSchema.
"""
- pass
+ return _dispatch(schema, f)
def pretty_print_core_schema(schema: CoreSchema, include_metadata: bool=False
@@ -89,4 +100,19 @@ def pretty_print_core_schema(schema: CoreSchema, include_metadata: bool=False
schema: The CoreSchema to print.
include_metadata: Whether to include metadata in the output. Defaults to `False`.
"""
- pass
+ try:
+ from rich import print as rich_print
+ from rich.pretty import Pretty
+ except ImportError:
+ print("Rich library is not installed. Please install it to use this function.")
+ return
+
+ def _process_schema(s: CoreSchema) -> dict:
+ result = {k: v for k, v in s.items() if k != 'metadata' or include_metadata}
+ for key, value in result.items():
+ if isinstance(value, dict) and 'type' in value:
+ result[key] = _process_schema(value)
+ return result
+
+ processed_schema = _process_schema(schema)
+ rich_print(Pretty(processed_schema, expand_all=True))
diff --git a/pydantic/_internal/_dataclasses.py b/pydantic/_internal/_dataclasses.py
index 35f40c50..7c67b501 100644
--- a/pydantic/_internal/_dataclasses.py
+++ b/pydantic/_internal/_dataclasses.py
@@ -65,7 +65,8 @@ def set_dataclass_fields(cls: type[StandardDataclass], types_namespace: (
types_namespace: The types namespace, defaults to `None`.
config_wrapper: The config wrapper instance, defaults to `None`.
"""
- pass
+ fields = collect_dataclass_fields(cls, types_namespace, config_wrapper)
+ cls.__pydantic_fields__ = fields
def complete_dataclass(cls: type[Any], config_wrapper: _config.
@@ -89,7 +90,28 @@ def complete_dataclass(cls: type[Any], config_wrapper: _config.
Raises:
PydanticUndefinedAnnotation: If `raise_error` is `True` and there is an undefined annotations.
"""
- pass
+ try:
+ set_dataclass_fields(cls, types_namespace, config_wrapper)
+
+ generate_schema = GenerateSchema(config_wrapper)
+ core_schema = generate_schema.generate_schema(cls)
+
+ cls.__pydantic_core_schema__ = core_schema
+ cls.__pydantic_validator__ = create_schema_validator(
+ core_schema,
+ cls,
+ config_wrapper,
+ CallbackGetCoreSchemaHandler(generate_schema)
+ )
+ cls.__pydantic_serializer__ = SchemaSerializer(core_schema)
+
+ set_dataclass_mocks(cls)
+ cls.__pydantic_complete__ = True
+ return True
+ except PydanticUndefinedAnnotation:
+ if raise_errors:
+ raise
+ return False
def is_builtin_dataclass(_cls: type[Any]) ->TypeGuard[type[StandardDataclass]]:
@@ -122,4 +144,13 @@ def is_builtin_dataclass(_cls: type[Any]) ->TypeGuard[type[StandardDataclass]]:
Returns:
`True` if the class is a stdlib dataclass, `False` otherwise.
"""
- pass
+ if not dataclasses.is_dataclass(_cls):
+ return False
+
+ if hasattr(_cls, '__pydantic_validator__'):
+ return False
+
+ annotations = _typing_extra.get_class_annotations(_cls)
+ dataclass_fields = set(_cls.__dataclass_fields__.keys())
+
+ return set(annotations.keys()).issubset(dataclass_fields)
diff --git a/pydantic/_internal/_decorators.py b/pydantic/_internal/_decorators.py
index cc38227b..eac12643 100644
--- a/pydantic/_internal/_decorators.py
+++ b/pydantic/_internal/_decorators.py
@@ -214,7 +214,9 @@ class Decorator(Generic[DecoratorInfoType]):
Returns:
The new decorator instance.
"""
- pass
+ cls_ref = get_type_ref(cls_)
+ func = get_attribute_from_base_dicts(cls_, cls_var_name)
+ return Decorator(cls_ref=cls_ref, cls_var_name=cls_var_name, func=func, shim=shim, info=info)
def bind_to_cls(self, cls: Any) ->Decorator[DecoratorInfoType]:
"""Bind the decorator to a class.
@@ -225,7 +227,9 @@ class Decorator(Generic[DecoratorInfoType]):
Returns:
The new decorator instance.
"""
- pass
+ cls_ref = get_type_ref(cls)
+ func = get_attribute_from_base_dicts(cls, self.cls_var_name)
+ return Decorator(cls_ref=cls_ref, cls_var_name=self.cls_var_name, func=func, shim=self.shim, info=self.info)
def get_bases(tp: type[Any]) ->tuple[type[Any], ...]:
@@ -237,7 +241,9 @@ def get_bases(tp: type[Any]) ->tuple[type[Any], ...]:
Returns:
The base classes.
"""
- pass
+ if is_typeddict(tp):
+ return tp.__orig_bases__
+ return tp.__bases__
def mro(tp: type[Any]) ->tuple[type[Any], ...]:
@@ -245,7 +251,28 @@ def mro(tp: type[Any]) ->tuple[type[Any], ...]:
See https://www.python.org/download/releases/2.3/mro/
"""
- pass
+ if hasattr(tp, '__mro__'):
+ return tp.__mro__
+ else:
+ # Implement C3 linearization for types without __mro__
+ def merge(seqs):
+ res = []
+ i = 0
+ while True:
+ nonempty = [seq for seq in seqs if seq]
+ if not nonempty:
+ return tuple(res)
+ for seq in nonempty:
+ head = seq[0]
+ if not any(head in s[1:] for s in nonempty):
+ break
+ else:
+ raise TypeError("Inconsistent hierarchy")
+ res.append(head)
+ for seq in nonempty:
+ if seq[0] == head:
+ del seq[0]
+ return tuple(merge([[tp]] + [list(mro(base)) for base in get_bases(tp)] + [list(get_bases(tp))]))
_sentinel = object()
@@ -271,7 +298,16 @@ def get_attribute_from_bases(tp: (type[Any] | tuple[type[Any], ...]), name: str
Raises:
AttributeError: If the attribute is not found in any class in the MRO.
"""
- pass
+ if isinstance(tp, tuple):
+ classes = tp
+ else:
+ classes = mro(tp)
+
+ for cls in classes:
+ if name in cls.__dict__:
+ return cls.__dict__[name]
+
+ raise AttributeError(f"'{tp.__name__}' object has no attribute '{name}'")
def get_attribute_from_base_dicts(tp: type[Any], name: str) ->Any:
@@ -289,7 +325,12 @@ def get_attribute_from_base_dicts(tp: type[Any], name: str) ->Any:
Raises:
KeyError: If the attribute is not found in any class's `__dict__` in the MRO.
"""
- pass
+ for cls in mro(tp):
+ try:
+ return cls.__dict__[name]
+ except KeyError:
+ continue
+ raise KeyError(f"'{name}' not found in '{tp.__name__}' or its bases")
@dataclass(**slots_true)
@@ -328,7 +369,26 @@ class DecoratorInfos:
If we do replace any functions we put the replacement into the position
the replaced function was in; that is, we maintain the order.
"""
- pass
+ decorator_infos = DecoratorInfos()
+ for base in reversed(mro(model_dc)):
+ for name, value in base.__dict__.items():
+ if isinstance(value, PydanticDescriptorProxy):
+ decorator = Decorator.build(base, cls_var_name=name, shim=value.shim, info=value.decorator_info)
+ if isinstance(value.decorator_info, ValidatorDecoratorInfo):
+ decorator_infos.validators[name] = decorator
+ elif isinstance(value.decorator_info, FieldValidatorDecoratorInfo):
+ decorator_infos.field_validators[name] = decorator
+ elif isinstance(value.decorator_info, RootValidatorDecoratorInfo):
+ decorator_infos.root_validators[name] = decorator
+ elif isinstance(value.decorator_info, FieldSerializerDecoratorInfo):
+ decorator_infos.field_serializers[name] = decorator
+ elif isinstance(value.decorator_info, ModelSerializerDecoratorInfo):
+ decorator_infos.model_serializers[name] = decorator
+ elif isinstance(value.decorator_info, ModelValidatorDecoratorInfo):
+ decorator_infos.model_validators[name] = decorator
+ elif isinstance(value.decorator_info, ComputedFieldInfo):
+ decorator_infos.computed_fields[name] = decorator
+ return decorator_infos
def inspect_validator(validator: Callable[..., Any], mode: FieldValidatorModes
@@ -344,7 +404,25 @@ def inspect_validator(validator: Callable[..., Any], mode: FieldValidatorModes
Returns:
Whether the validator takes an info argument.
"""
- pass
+ sig = signature(validator)
+ params = list(sig.parameters.values())
+
+ if mode == 'before':
+ if len(params) == 1:
+ return False
+ elif len(params) == 2 and params[1].name == 'info':
+ return True
+ else:
+ raise ValueError(f"Invalid signature for 'before' validator: {sig}")
+ elif mode == 'after':
+ if len(params) == 2:
+ return False
+ elif len(params) == 3 and params[2].name == 'info':
+ return True
+ else:
+ raise ValueError(f"Invalid signature for 'after' validator: {sig}")
+ else:
+ raise ValueError(f"Invalid validator mode: {mode}")
def inspect_field_serializer(serializer: Callable[..., Any], mode: Literal[
@@ -363,7 +441,25 @@ def inspect_field_serializer(serializer: Callable[..., Any], mode: Literal[
Returns:
Tuple of (is_field_serializer, info_arg).
"""
- pass
+ sig = signature(serializer)
+ params = list(sig.parameters.values())
+
+ if mode == 'plain':
+ if len(params) == 1:
+ return True, False
+ elif len(params) == 2 and params[1].name == 'info':
+ return True, True
+ else:
+ raise ValueError(f"Invalid signature for 'plain' field serializer: {sig}")
+ elif mode == 'wrap':
+ if len(params) == 2:
+ return True, False
+ elif len(params) == 3 and params[2].name == 'info':
+ return True, True
+ else:
+ raise ValueError(f"Invalid signature for 'wrap' field serializer: {sig}")
+ else:
+ raise ValueError(f"Invalid serializer mode: {mode}")
def inspect_annotated_serializer(serializer: Callable[..., Any], mode:
@@ -379,7 +475,15 @@ def inspect_annotated_serializer(serializer: Callable[..., Any], mode:
Returns:
info_arg
"""
- pass
+ sig = signature(serializer)
+ params = list(sig.parameters.values())
+
+ if mode == 'plain':
+ return len(params) == 2 and params[1].name == 'info'
+ elif mode == 'wrap':
+ return len(params) == 3 and params[2].name == 'info'
+ else:
+ raise ValueError(f"Invalid serializer mode: {mode}")
def inspect_model_serializer(serializer: Callable[..., Any], mode: Literal[
@@ -395,7 +499,15 @@ def inspect_model_serializer(serializer: Callable[..., Any], mode: Literal[
Returns:
`info_arg` - whether the function expects an info argument.
"""
- pass
+ sig = signature(serializer)
+ params = list(sig.parameters.values())
+
+ if mode == 'plain':
+ return len(params) == 2 and params[1].name == 'info'
+ elif mode == 'wrap':
+ return len(params) == 3 and params[2].name == 'info'
+ else:
+ raise ValueError(f"Invalid serializer mode: {mode}")
AnyDecoratorCallable: TypeAlias = (
@@ -415,7 +527,9 @@ def is_instance_method_from_sig(function: AnyDecoratorCallable) ->bool:
Returns:
`True` if the function is an instance method, `False` otherwise.
"""
- pass
+ sig = signature(function)
+ params = list(sig.parameters.values())
+ return len(params) > 0 and params[0].name == 'self'
def ensure_classmethod_based_on_signature(function: AnyDecoratorCallable
@@ -428,7 +542,12 @@ def ensure_classmethod_based_on_signature(function: AnyDecoratorCallable
Return:
The `@classmethod` decorator applied function.
"""
- pass
+ if not isinstance(function, (classmethod, staticmethod)):
+ sig = signature(function)
+ params = list(sig.parameters.values())
+ if len(params) > 0 and params[0].name == 'cls':
+ return classmethod(function)
+ return function
def unwrap_wrapped_function(func: Any, *, unwrap_partial: bool=True,
@@ -445,7 +564,16 @@ def unwrap_wrapped_function(func: Any, *, unwrap_partial: bool=True,
Returns:
The underlying function of the wrapped function.
"""
- pass
+ while True:
+ if isinstance(func, property):
+ func = func.fget
+ elif unwrap_partial and isinstance(func, (partial, partialmethod)):
+ func = func.func
+ elif unwrap_class_static_method and isinstance(func, (classmethod, staticmethod)):
+ func = func.__func__
+ else:
+ break
+ return func
def get_function_return_type(func: Any, explicit_return_type: Any,
@@ -463,7 +591,11 @@ def get_function_return_type(func: Any, explicit_return_type: Any,
Returns:
The function return type.
"""
- pass
+ if explicit_return_type is not None:
+ return explicit_return_type
+
+ type_hints = get_function_type_hints(func, types_namespace)
+ return type_hints.get('return', Any)
def count_positional_required_params(sig: Signature) ->int:
@@ -476,7 +608,16 @@ def count_positional_required_params(sig: Signature) ->int:
Returns:
The number of positional arguments of a signature.
"""
- pass
+ count = 0
+ for param in sig.parameters.values():
+ if param.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD):
+ if count == 0 or param.default is Parameter.empty:
+ count += 1
+ else:
+ break
+ else:
+ break
+ return count
def ensure_property(f: Any) ->Any:
@@ -488,4 +629,6 @@ def ensure_property(f: Any) ->Any:
Returns:
The function, or a `property` or `cached_property` instance wrapping the function.
"""
- pass
+ if isinstance(f, (property, cached_property)) or isdatadescriptor(f) or ismethoddescriptor(f):
+ return f
+ return property(f)
diff --git a/pydantic/_internal/_decorators_v1.py b/pydantic/_internal/_decorators_v1.py
index 4645ddd9..29cbcab2 100644
--- a/pydantic/_internal/_decorators_v1.py
+++ b/pydantic/_internal/_decorators_v1.py
@@ -62,7 +62,26 @@ def make_generic_v1_field_validator(validator: V1Validator
PydanticUserError: If the signature is not supported or the parameters are
not available in Pydantic V2.
"""
- pass
+ sig = signature(validator)
+ params = list(sig.parameters.values())
+
+ def wrapper(value: Any, info: core_schema.ValidationInfo) -> Any:
+ if len(params) == 1:
+ return validator(value)
+ elif len(params) == 2:
+ if params[1].name == 'values':
+ return validator(value, values=info.data)
+ elif params[1].kind == Parameter.VAR_KEYWORD:
+ return validator(value, **{'values': info.data})
+ elif len(params) == 3 and params[1].name == 'values' and params[2].kind == Parameter.VAR_KEYWORD:
+ return validator(value, values=info.data)
+
+ raise PydanticUserError(
+ f"Unsupported validator signature: {sig}",
+ code="unsupported-validator-signature"
+ )
+
+ return wrapper
RootValidatorValues = Dict[str, Any]
@@ -103,4 +122,13 @@ def make_v1_generic_root_validator(validator: V1RootValidatorFunction, pre:
Returns:
A wrapped V2 style validator.
"""
- pass
+ if pre:
+ def before_validator(values: RootValidatorValues, info: core_schema.ValidationInfo) -> RootValidatorValues:
+ return validator(values)
+ return before_validator
+ else:
+ def after_validator(fields_tuple: RootValidatorFieldsTuple, info: core_schema.ValidationInfo) -> RootValidatorFieldsTuple:
+ values = dict(zip(info.field_names, fields_tuple))
+ validated_values = validator(values)
+ return tuple(validated_values[field] for field in info.field_names)
+ return after_validator
diff --git a/pydantic/_internal/_discriminated_union.py b/pydantic/_internal/_discriminated_union.py
index aa07168e..2a0a0489 100644
--- a/pydantic/_internal/_discriminated_union.py
+++ b/pydantic/_internal/_discriminated_union.py
@@ -46,7 +46,11 @@ def apply_discriminator(schema: core_schema.CoreSchema, discriminator: (str |
- If discriminator fields have different aliases.
- If discriminator field not of type `Literal`.
"""
- pass
+ if definitions is None:
+ definitions = {}
+
+ applier = _ApplyInferredDiscriminator(discriminator, definitions)
+ return applier.apply(schema)
class _ApplyInferredDiscriminator:
@@ -96,7 +100,19 @@ class _ApplyInferredDiscriminator:
- If discriminator fields have different aliases.
- If discriminator field not of type `Literal`.
"""
- pass
+ if self._used:
+ raise RuntimeError('_ApplyInferredDiscriminator instances should not be reused')
+ self._used = True
+
+ new_schema = self._apply_to_root(schema)
+
+ if not self._tagged_union_choices:
+ raise TypeError('No valid choices found for discriminated union')
+
+ if self._is_nullable:
+ new_schema = core_schema.nullable_schema(new_schema)
+
+ return new_schema
def _apply_to_root(self, schema: core_schema.CoreSchema
) ->core_schema.CoreSchema:
@@ -104,7 +120,28 @@ class _ApplyInferredDiscriminator:
unwrapping nullable or definitions schemas, and calling the `_handle_choice`
method iteratively on the choices extracted (recursively) from the possibly-wrapped union.
"""
- pass
+ if isinstance(schema, core_schema.NullableSchema):
+ self._should_be_nullable = True
+ return self._apply_to_root(schema['schema'])
+
+ if isinstance(schema, core_schema.DefinitionsSchema):
+ self.definitions.update(schema['definitions'])
+ return self._apply_to_root(schema['schema'])
+
+ if isinstance(schema, core_schema.UnionSchema):
+ for choice in schema['choices']:
+ self._handle_choice(choice)
+ elif isinstance(schema, core_schema.TaggedUnionSchema):
+ for choice in schema['choices'].values():
+ self._handle_choice(choice)
+ else:
+ raise TypeError(f'Invalid schema type for discriminated union: {type(schema)}')
+
+ return core_schema.tagged_union_schema(
+ choices=self._tagged_union_choices,
+ discriminator=self.discriminator,
+ discriminator_alias=self._discriminator_alias,
+ )
def _handle_choice(self, choice: core_schema.CoreSchema) ->None:
"""This method handles the "middle" stage of recursion over the input schema.
@@ -117,7 +154,20 @@ class _ApplyInferredDiscriminator:
* Validating that each allowed discriminator value maps to a unique choice
* Updating the _tagged_union_choices mapping that will ultimately be used to build the TaggedUnionSchema.
"""
- pass
+ if isinstance(choice, core_schema.UnionSchema):
+ for sub_choice in choice['choices']:
+ self._handle_choice(sub_choice)
+ elif isinstance(choice, core_schema.TaggedUnionSchema):
+ if self._is_discriminator_shared(choice):
+ for sub_choice in choice['choices'].values():
+ self._handle_choice(sub_choice)
+ else:
+ self._set_unique_choice_for_values(choice, [choice['discriminator']])
+ elif isinstance(choice, core_schema.NoneSchema):
+ self._is_nullable = True
+ else:
+ values = self._infer_discriminator_values_for_choice(choice, None)
+ self._set_unique_choice_for_values(choice, values)
def _is_discriminator_shared(self, choice: core_schema.TaggedUnionSchema
) ->bool:
@@ -126,7 +176,7 @@ class _ApplyInferredDiscriminator:
determine whether this TaggedUnionSchema choice should be "coalesced" into the top level,
or whether it should be treated as a separate (nested) choice.
"""
- pass
+ return choice['discriminator'] == self.discriminator
def _infer_discriminator_values_for_choice(self, choice: core_schema.
CoreSchema, source_name: (str | None)) ->list[str | int]:
@@ -134,7 +184,14 @@ class _ApplyInferredDiscriminator:
`model_name` is accepted for the purpose of producing useful error messages.
"""
- pass
+ if isinstance(choice, core_schema.TypedDictSchema):
+ return self._infer_discriminator_values_for_typed_dict_choice(choice, source_name)
+ elif isinstance(choice, core_schema.ModelSchema):
+ return self._infer_discriminator_values_for_typed_dict_choice(choice['schema'], source_name)
+ elif isinstance(choice, core_schema.DataclassSchema):
+ return self._infer_discriminator_values_for_typed_dict_choice(choice['schema'], source_name)
+ else:
+ raise TypeError(f'Invalid choice type for discriminated union: {type(choice)}')
def _infer_discriminator_values_for_typed_dict_choice(self, choice:
core_schema.TypedDictSchema, source_name: (str | None)=None) ->list[
@@ -142,18 +199,43 @@ class _ApplyInferredDiscriminator:
"""This method just extracts the _infer_discriminator_values_for_choice logic specific to TypedDictSchema
for the sake of readability.
"""
- pass
+ discriminator_field = choice['fields'].get(self.discriminator)
+ if discriminator_field is None:
+ raise PydanticUserError(f"Model {source_name} in union doesn't have a discriminator field {self.discriminator!r}")
+
+ if discriminator_field.get('alias') is not None:
+ if self._discriminator_alias is None:
+ self._discriminator_alias = discriminator_field['alias']
+ elif self._discriminator_alias != discriminator_field['alias']:
+ raise PydanticUserError('Discriminator fields have different aliases')
+
+ return self._infer_discriminator_values_for_inner_schema(discriminator_field['schema'], f'{source_name}.{self.discriminator}')
def _infer_discriminator_values_for_inner_schema(self, schema:
core_schema.CoreSchema, source: str) ->list[str | int]:
"""When inferring discriminator values for a field, we typically extract the expected values from a literal
schema. This function does that, but also handles nested unions and defaults.
"""
- pass
+ if isinstance(schema, core_schema.LiteralSchema):
+ return [schema['expected']]
+ elif isinstance(schema, core_schema.UnionSchema):
+ values = []
+ for choice in schema['choices']:
+ values.extend(self._infer_discriminator_values_for_inner_schema(choice, source))
+ return values
+ elif isinstance(schema, core_schema.DefaultSchema):
+ return self._infer_discriminator_values_for_inner_schema(schema['schema'], source)
+ else:
+ raise PydanticUserError(f'Discriminator field {source!r} must be a Literal')
def _set_unique_choice_for_values(self, choice: core_schema.CoreSchema,
values: Sequence[str | int]) ->None:
"""This method updates `self.tagged_union_choices` so that all provided (discriminator) `values` map to the
provided `choice`, validating that none of these values already map to another (different) choice.
"""
- pass
+ for value in values:
+ if value in self._tagged_union_choices:
+ if self._tagged_union_choices[value] != choice:
+ raise TypeError(f'Discriminator value {value!r} mapped to multiple choices')
+ else:
+ self._tagged_union_choices[value] = choice
diff --git a/pydantic/_internal/_docs_extraction.py b/pydantic/_internal/_docs_extraction.py
index 88a14e9f..e444fa51 100644
--- a/pydantic/_internal/_docs_extraction.py
+++ b/pydantic/_internal/_docs_extraction.py
@@ -27,4 +27,13 @@ def extract_docstrings_from_cls(cls: type[Any], use_inspect: bool=False
Returns:
A mapping containing attribute names and their corresponding docstring.
"""
- pass
+ if use_inspect:
+ source = inspect.getsource(cls)
+ else:
+ source = textwrap.dedent(inspect.getmodule(cls).__dict__[cls.__name__].__doc__ or '')
+
+ tree = ast.parse(source)
+ visitor = DocstringVisitor()
+ visitor.visit(tree)
+
+ return visitor.attrs
diff --git a/pydantic/_internal/_fields.py b/pydantic/_internal/_fields.py
index 7e8b6c92..e6d13947 100644
--- a/pydantic/_internal/_fields.py
+++ b/pydantic/_internal/_fields.py
@@ -36,7 +36,13 @@ def get_type_hints_infer_globalns(obj: Any, localns: (dict[str, Any] | None
Returns:
The object type hints.
"""
- pass
+ globalns = None
+ if hasattr(obj, '__module__'):
+ try:
+ globalns = sys.modules[obj.__module__].__dict__
+ except KeyError:
+ pass
+ return get_type_hints(obj, globalns, localns, include_extras)
class PydanticMetadata(Representation):
@@ -53,13 +59,21 @@ def pydantic_general_metadata(**metadata: Any) ->BaseMetadata:
Returns:
The new `_PydanticGeneralMetadata` class.
"""
- pass
+ class _PydanticGeneralMetadata(_general_metadata_cls()):
+ __slots__ = tuple(metadata.keys())
+
+ def __init__(self, **kwargs):
+ for key, value in metadata.items():
+ setattr(self, key, value)
+
+ return _PydanticGeneralMetadata(**metadata)
@lru_cache(maxsize=None)
def _general_metadata_cls() ->type[BaseMetadata]:
"""Do it this way to avoid importing `annotated_types` at import time."""
- pass
+ from annotated_types import BaseMetadata
+ return BaseMetadata
def collect_model_fields(cls: type[BaseModel], bases: tuple[type[Any], ...],
@@ -88,7 +102,39 @@ def collect_model_fields(cls: type[BaseModel], bases: tuple[type[Any], ...],
- If there is a field other than `root` in `RootModel`.
- If a field shadows an attribute in the parent model.
"""
- pass
+ fields: dict[str, FieldInfo] = {}
+ class_vars: set[str] = set()
+
+ # Collect fields from parent classes
+ for base in reversed(bases):
+ if hasattr(base, 'model_fields'):
+ fields.update(base.model_fields)
+
+ # Get type hints for the current class
+ type_hints = get_type_hints_infer_globalns(cls, types_namespace, include_extras=True)
+
+ for name, hint in type_hints.items():
+ if is_classvar(hint):
+ class_vars.add(name)
+ elif not is_finalvar(hint):
+ field = getattr(cls, name, PydanticUndefined)
+ if isinstance(field, FieldInfo):
+ fields[name] = field
+ else:
+ fields[name] = FieldInfo(default=field)
+
+ # Check for naming conflicts
+ protected_namespaces = {'model_', 'model_fields', 'model_config'}
+ for field_name in fields:
+ if field_name in protected_namespaces:
+ raise NameError(f"Field {field_name} conflicts with protected namespace")
+ if cls.__name__ == 'RootModel' and field_name != 'root':
+ raise NameError(f"RootModel can only have a 'root' field, not {field_name}")
+ for base in bases:
+ if hasattr(base, field_name) and not isinstance(getattr(base, field_name), FieldInfo):
+ raise NameError(f"Field {field_name} shadows an attribute in parent {base.__name__}")
+
+ return fields, class_vars
def collect_dataclass_fields(cls: type[StandardDataclass], types_namespace:
@@ -105,4 +151,24 @@ def collect_dataclass_fields(cls: type[StandardDataclass], types_namespace:
Returns:
The dataclass fields.
"""
- pass
+ fields: dict[str, FieldInfo] = {}
+ type_hints = get_type_hints_infer_globalns(cls, types_namespace, include_extras=True)
+
+ for name, hint in type_hints.items():
+ if not is_classvar(hint) and not is_finalvar(hint):
+ field = cls.__dataclass_fields__[name]
+ default = field.default if field.default is not dataclasses.MISSING else PydanticUndefined
+ default_factory = field.default_factory if field.default_factory is not dataclasses.MISSING else None
+
+ field_info = FieldInfo(
+ annotation=hint,
+ default=default,
+ default_factory=default_factory,
+ init=field.init,
+ repr=field.repr,
+ kw_only=field.kw_only,
+ )
+
+ fields[name] = field_info
+
+ return fields
diff --git a/pydantic/_internal/_generate_schema.py b/pydantic/_internal/_generate_schema.py
index c7465578..6d5048ee 100644
--- a/pydantic/_internal/_generate_schema.py
+++ b/pydantic/_internal/_generate_schema.py
@@ -77,7 +77,9 @@ def check_validator_fields_against_field_name(info: FieldDecoratorInfo,
Returns:
`True` if field name is in validator fields, `False` otherwise.
"""
- pass
+ if isinstance(info, (ValidatorDecoratorInfo, FieldValidatorDecoratorInfo)):
+ return field in info.fields or '*' in info.fields
+ return True # For other types of decorators, assume it applies to all fields
def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator],
@@ -93,7 +95,17 @@ def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator],
Raises:
PydanticUserError: If one of the field names does not exist in `fields` param.
"""
- pass
+ fields_set = set(fields)
+ for decorator in decorators:
+ info = decorator.info
+ if isinstance(info, (ValidatorDecoratorInfo, FieldValidatorDecoratorInfo)):
+ if info.check_fields and '*' not in info.fields:
+ for field in info.fields:
+ if field not in fields_set:
+ raise PydanticUserError(
+ f"Decorator field '{field}' does not exist in {fields_set}",
+ code='decorator-missing-field'
+ )
def modify_model_json_schema(schema_or_field: CoreSchemaOrField, handler:
@@ -110,7 +122,17 @@ def modify_model_json_schema(schema_or_field: CoreSchemaOrField, handler:
Returns:
JsonSchemaValue: The updated JSON schema.
"""
- pass
+ json_schema = handler(schema_or_field)
+
+ if title is None:
+ title = cls.__name__
+
+ json_schema['title'] = title
+
+ if cls.__doc__:
+ json_schema['description'] = inspect.cleandoc(cls.__doc__)
+
+ return json_schema
JsonEncoders = Dict[Type[Any], JsonEncoder]
@@ -125,7 +147,17 @@ def _add_custom_serialization_from_json_encoders(json_encoders: (
tp: The type to check for a matching encoder.
schema: The schema to add the encoder to.
"""
- pass
+ if json_encoders is None:
+ return schema
+
+ for encoder_type, encoder in json_encoders.items():
+ if isinstance(tp, encoder_type):
+ return core_schema.json_or_python_schema(
+ json_schema=core_schema.with_info_plain_validator_function(encoder),
+ python_schema=schema,
+ )
+
+ return schema
TypesNamespace = Union[Dict[str, Any], None]
@@ -144,7 +176,7 @@ def _get_first_non_null(a: Any, b: Any) ->Any:
Use case: serialization_alias (argument a) and alias (argument b) are both defined, and serialization_alias is ''.
This function will return serialization_alias, which is the first argument, even though it is an empty string.
"""
- pass
+ return a if a is not None else b
class GenerateSchema:
diff --git a/pydantic/_internal/_generics.py b/pydantic/_internal/_generics.py
index 6087db94..79339289 100644
--- a/pydantic/_internal/_generics.py
+++ b/pydantic/_internal/_generics.py
@@ -106,7 +106,16 @@ def create_generic_submodel(model_name: str, origin: type[BaseModel], args:
Returns:
The created submodel.
"""
- pass
+ namespace = {
+ '__module__': origin.__module__,
+ '__qualname__': f'{origin.__qualname__}[{", ".join(str(arg) for arg in args)}]',
+ '__pydantic_generic_metadata__': PydanticGenericMetadata(
+ origin=origin,
+ args=args,
+ parameters=params
+ ),
+ }
+ return types.new_class(model_name, (origin,), {}, lambda ns: ns.update(namespace))
def _get_caller_frame_info(depth: int=2) ->tuple[str | None, bool]:
@@ -121,7 +130,15 @@ def _get_caller_frame_info(depth: int=2) ->tuple[str | None, bool]:
Raises:
RuntimeError: If the function is not called inside a function.
"""
- pass
+ try:
+ frame = sys._getframe(depth)
+ except ValueError as e:
+ raise RuntimeError('This function must be called inside another function') from e
+
+ module_name = frame.f_globals.get('__name__')
+ called_globally = frame.f_locals is frame.f_globals
+
+ return module_name, called_globally
DictValues: type[Any] = {}.values().__class__
@@ -133,7 +150,16 @@ def iter_contained_typevars(v: Any) ->Iterator[TypeVarType]:
This is inspired as an alternative to directly accessing the `__parameters__` attribute of a GenericAlias,
since __parameters__ of (nested) generic BaseModel subclasses won't show up in that list.
"""
- pass
+ if isinstance(v, TypeVar):
+ yield v
+ elif is_model_class(v) and hasattr(v, '__pydantic_generic_metadata__'):
+ yield from iter_contained_typevars(v.__pydantic_generic_metadata__['args'])
+ elif isinstance(v, (typing_base, types.GenericAlias)):
+ for arg in getattr(v, '__args__', ()):
+ yield from iter_contained_typevars(arg)
+ elif isinstance(v, (list, tuple, set)):
+ for item in v:
+ yield from iter_contained_typevars(item)
def get_standard_typevars_map(cls: type[Any]) ->(dict[TypeVarType, Any] | None
@@ -141,7 +167,16 @@ def get_standard_typevars_map(cls: type[Any]) ->(dict[TypeVarType, Any] | None
"""Package a generic type's typevars and parametrization (if present) into a dictionary compatible with the
`replace_types` function. Specifically, this works with standard typing generics and typing._GenericAlias.
"""
- pass
+ if not hasattr(cls, '__parameters__') or not hasattr(cls, '__args__'):
+ return None
+
+ parameters = getattr(cls, '__parameters__', ())
+ args = getattr(cls, '__args__', ())
+
+ if len(parameters) != len(args):
+ return None
+
+ return dict(zip(parameters, args))
def get_model_typevars_map(cls: type[BaseModel]) ->(dict[TypeVarType, Any] |
@@ -152,7 +187,17 @@ def get_model_typevars_map(cls: type[BaseModel]) ->(dict[TypeVarType, Any] |
Since BaseModel.__class_getitem__ does not produce a typing._GenericAlias, and the BaseModel generic info is
stored in the __pydantic_generic_metadata__ attribute, we need special handling here.
"""
- pass
+ if not hasattr(cls, '__pydantic_generic_metadata__'):
+ return None
+
+ metadata = cls.__pydantic_generic_metadata__
+ parameters = metadata.get('parameters', ())
+ args = metadata.get('args', ())
+
+ if len(parameters) != len(args):
+ return None
+
+ return dict(zip(parameters, args))
def replace_types(type_: Any, type_map: (Mapping[Any, Any] | None)) ->Any:
@@ -176,14 +221,46 @@ def replace_types(type_: Any, type_map: (Mapping[Any, Any] | None)) ->Any:
#> Tuple[int, Union[List[int], float]]
```
"""
- pass
+ if type_map is None:
+ return type_
+
+ if isinstance(type_, TypeVar):
+ return type_map.get(type_, type_)
+
+ if isinstance(type_, (typing_base, types.GenericAlias)):
+ args = getattr(type_, '__args__', ())
+ if not args:
+ return type_
+ new_args = tuple(replace_types(arg, type_map) for arg in args)
+ if all_identical(args, new_args):
+ return type_
+ return type_[new_args]
+
+ if is_model_class(type_) and hasattr(type_, '__pydantic_generic_metadata__'):
+ metadata = type_.__pydantic_generic_metadata__
+ new_args = tuple(replace_types(arg, type_map) for arg in metadata['args'])
+ if all_identical(metadata['args'], new_args):
+ return type_
+ return create_generic_submodel(type_.__name__, type_, new_args, metadata['parameters'])
+
+ return type_
def has_instance_in_type(type_: Any, isinstance_target: Any) ->bool:
"""Checks if the type, or any of its arbitrary nested args, satisfy
`isinstance(<type>, isinstance_target)`.
"""
- pass
+ if isinstance(type_, isinstance_target):
+ return True
+
+ if hasattr(type_, '__args__'):
+ return any(has_instance_in_type(arg, isinstance_target) for arg in type_.__args__)
+
+ if is_model_class(type_) and hasattr(type_, '__pydantic_generic_metadata__'):
+ metadata = type_.__pydantic_generic_metadata__
+ return any(has_instance_in_type(arg, isinstance_target) for arg in metadata['args'])
+
+ return False
def check_parameters_count(cls: type[BaseModel], parameters: tuple[Any, ...]
@@ -197,7 +274,12 @@ def check_parameters_count(cls: type[BaseModel], parameters: tuple[Any, ...]
Raises:
TypeError: If the passed parameters count is not equal to generic model parameters count.
"""
- pass
+ if not hasattr(cls, '__pydantic_generic_metadata__'):
+ raise TypeError(f'{cls.__name__} is not a generic model')
+
+ expected_params = cls.__pydantic_generic_metadata__['parameters']
+ if len(parameters) != len(expected_params):
+ raise TypeError(f'Expected {len(expected_params)} type arguments, got {len(parameters)}')
_generic_recursion_cache: ContextVar[set[str] | None] = ContextVar(
@@ -214,7 +296,25 @@ def generic_recursion_self_type(origin: type[BaseModel], args: tuple[Any, ...]
can be used while building the core schema, and will produce a schema_ref that will be valid in the
final parent schema.
"""
- pass
+ cache = _generic_recursion_cache.get()
+ if cache is None:
+ cache = set()
+ token = _generic_recursion_cache.set(cache)
+ else:
+ token = None
+
+ key = (origin, args)
+ if key in cache:
+ yield PydanticRecursiveRef(origin.__name__)
+ else:
+ cache.add(key)
+ try:
+ yield None
+ finally:
+ cache.remove(key)
+
+ if token is not None:
+ _generic_recursion_cache.reset(token)
def get_cached_generic_type_early(parent: type[BaseModel], typevar_values: Any
diff --git a/pydantic/_internal/_git.py b/pydantic/_internal/_git.py
index c2419c23..c1c58256 100644
--- a/pydantic/_internal/_git.py
+++ b/pydantic/_internal/_git.py
@@ -6,14 +6,44 @@ import subprocess
def is_git_repo(dir: str) ->bool:
"""Is the given directory version-controlled with git?"""
- pass
+ try:
+ subprocess.run(
+ ['git', 'rev-parse', '--is-inside-work-tree'],
+ cwd=dir,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ check=True
+ )
+ return True
+ except subprocess.CalledProcessError:
+ return False
def have_git() ->bool:
"""Can we run the git executable?"""
- pass
+ try:
+ subprocess.run(
+ ['git', '--version'],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ check=True
+ )
+ return True
+ except (subprocess.CalledProcessError, FileNotFoundError):
+ return False
def git_revision(dir: str) ->str:
"""Get the SHA-1 of the HEAD of a git repository."""
- pass
+ try:
+ result = subprocess.run(
+ ['git', 'rev-parse', 'HEAD'],
+ cwd=dir,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ check=True,
+ text=True
+ )
+ return result.stdout.strip()
+ except subprocess.CalledProcessError:
+ return 'unknown'
diff --git a/pydantic/_internal/_known_annotated_metadata.py b/pydantic/_internal/_known_annotated_metadata.py
index 4df2ee94..bbcab0c5 100644
--- a/pydantic/_internal/_known_annotated_metadata.py
+++ b/pydantic/_internal/_known_annotated_metadata.py
@@ -78,7 +78,16 @@ def expand_grouped_metadata(annotations: Iterable[Any]) ->Iterable[Any]:
#> [Ge(ge=4), MinLen(min_length=5)]
```
"""
- pass
+ from annotated_types import GroupedMetadata, Len, MinLen, MaxLen
+
+ for annotation in annotations:
+ if isinstance(annotation, GroupedMetadata):
+ yield from annotation.metadata
+ elif isinstance(annotation, Len):
+ yield MinLen(annotation.min_length)
+ yield MaxLen(annotation.max_length)
+ else:
+ yield annotation
@lru_cache
@@ -90,7 +99,26 @@ def _get_at_to_constraint_map() ->dict[type, str]:
the import time of `pydantic`. We still only want to have this dictionary defined in one place,
so we use this function to cache the result.
"""
- pass
+ from annotated_types import (
+ Gt, Ge, Lt, Le, MultipleOf, MinLen, MaxLen, Len,
+ Predicate, Timezone, Interval, AfterDateTime, BeforeDateTime
+ )
+
+ return {
+ Gt: 'gt',
+ Ge: 'ge',
+ Lt: 'lt',
+ Le: 'le',
+ MultipleOf: 'multiple_of',
+ MinLen: 'min_length',
+ MaxLen: 'max_length',
+ Len: 'length',
+ Predicate: 'predicate',
+ Timezone: 'timezone',
+ Interval: 'interval',
+ AfterDateTime: 'after_datetime',
+ BeforeDateTime: 'before_datetime',
+ }
def apply_known_metadata(annotation: Any, schema: CoreSchema) ->(CoreSchema |
@@ -113,7 +141,31 @@ def apply_known_metadata(annotation: Any, schema: CoreSchema) ->(CoreSchema |
Raises:
PydanticCustomError: If `Predicate` fails.
"""
- pass
+ from annotated_types import Predicate
+
+ at_to_constraint = _get_at_to_constraint_map()
+ annotation_type = type(annotation)
+
+ if annotation_type not in at_to_constraint:
+ return None
+
+ constraint = at_to_constraint[annotation_type]
+ new_schema = copy(schema)
+
+ if constraint == 'predicate':
+ if not callable(annotation.func):
+ raise PydanticCustomError(
+ 'invalid_predicate',
+ 'Invalid predicate function'
+ )
+ new_schema['predicate'] = annotation.func
+ elif constraint in CONSTRAINTS_TO_ALLOWED_SCHEMAS:
+ allowed_schemas = CONSTRAINTS_TO_ALLOWED_SCHEMAS[constraint]
+ if schema['type'] not in allowed_schemas:
+ return None
+ new_schema[constraint] = getattr(annotation, constraint)
+
+ return new_schema
def collect_known_metadata(annotations: Iterable[Any]) ->tuple[dict[str,
@@ -136,7 +188,23 @@ def collect_known_metadata(annotations: Iterable[Any]) ->tuple[dict[str,
#> ({'gt': 1, 'min_length': 42}, [Ellipsis])
```
"""
- pass
+ known_metadata = {}
+ unknown_annotations = []
+ at_to_constraint = _get_at_to_constraint_map()
+
+ for annotation in expand_grouped_metadata(annotations):
+ annotation_type = type(annotation)
+ if annotation_type in at_to_constraint:
+ constraint = at_to_constraint[annotation_type]
+ if constraint == 'length':
+ known_metadata['min_length'] = annotation.min_length
+ known_metadata['max_length'] = annotation.max_length
+ else:
+ known_metadata[constraint] = getattr(annotation, constraint)
+ else:
+ unknown_annotations.append(annotation)
+
+ return known_metadata, unknown_annotations
def check_metadata(metadata: dict[str, Any], allowed: Iterable[str],
@@ -152,4 +220,11 @@ def check_metadata(metadata: dict[str, Any], allowed: Iterable[str],
Raises:
TypeError: If there is metadatas that can't be applied on source type.
"""
- pass
+ allowed_set = set(allowed)
+ invalid_constraints = set(metadata.keys()) - allowed_set
+ if invalid_constraints:
+ raise TypeError(
+ f"The following constraints cannot be applied to a {source_type}: "
+ f"{', '.join(invalid_constraints)}. "
+ f"Allowed constraints are: {', '.join(allowed_set)}"
+ )
diff --git a/pydantic/_internal/_mock_val_ser.py b/pydantic/_internal/_mock_val_ser.py
index ff4c6139..6a5f3b73 100644
--- a/pydantic/_internal/_mock_val_ser.py
+++ b/pydantic/_internal/_mock_val_ser.py
@@ -34,6 +34,16 @@ class MockCoreSchema(Mapping[str, Any]):
def __iter__(self) ->Iterator[str]:
return self._get_built().__iter__()
+ def _get_built(self) -> CoreSchema:
+ if self._built_memo is None:
+ if self._attempt_rebuild:
+ rebuilt = self._attempt_rebuild()
+ if rebuilt is not None:
+ self._built_memo = rebuilt
+ return self._built_memo
+ raise PydanticUserError(self._error_message, code=self._code)
+ return self._built_memo
+
class MockValSer(Generic[ValSer]):
"""Mocker for `pydantic_core.SchemaValidator` or `pydantic_core.SchemaSerializer` which optionally attempts to
@@ -69,7 +79,17 @@ def set_model_mocks(cls: type[BaseModel], cls_name: str, undefined_name:
cls_name: Name of the model class, used in error messages
undefined_name: Name of the undefined thing, used in error messages
"""
- pass
+ error_message = f"The model '{cls_name}' is not fully defined; {undefined_name} have not been imported yet"
+ cls.__pydantic_validator__ = MockValSer(
+ error_message,
+ code=PydanticErrorCodes.model_not_fully_defined,
+ val_or_ser='validator'
+ )
+ cls.__pydantic_serializer__ = MockValSer(
+ error_message,
+ code=PydanticErrorCodes.model_not_fully_defined,
+ val_or_ser='serializer'
+ )
def set_dataclass_mocks(cls: type[PydanticDataclass], cls_name: str,
@@ -81,4 +101,14 @@ def set_dataclass_mocks(cls: type[PydanticDataclass], cls_name: str,
cls_name: Name of the model class, used in error messages
undefined_name: Name of the undefined thing, used in error messages
"""
- pass
+ error_message = f"The dataclass '{cls_name}' is not fully defined; {undefined_name} have not been imported yet"
+ cls.__pydantic_validator__ = MockValSer(
+ error_message,
+ code=PydanticErrorCodes.dataclass_not_fully_defined,
+ val_or_ser='validator'
+ )
+ cls.__pydantic_serializer__ = MockValSer(
+ error_message,
+ code=PydanticErrorCodes.dataclass_not_fully_defined,
+ val_or_ser='serializer'
+ )
diff --git a/pydantic/_internal/_model_construction.py b/pydantic/_internal/_model_construction.py
index f8876857..680a15b1 100644
--- a/pydantic/_internal/_model_construction.py
+++ b/pydantic/_internal/_model_construction.py
@@ -231,7 +231,12 @@ def init_private_attributes(self: BaseModel, context: Any, /) ->None:
def get_model_post_init(namespace: dict[str, Any], bases: tuple[type[Any], ...]
) ->(Callable[..., Any] | None):
"""Get the `model_post_init` method from the namespace or the class bases, or `None` if not defined."""
- pass
+ if 'model_post_init' in namespace:
+ return namespace['model_post_init']
+ for base in bases:
+ if hasattr(base, 'model_post_init'):
+ return getattr(base, 'model_post_init')
+ return None
def inspect_namespace(namespace: dict[str, Any], ignored_types: tuple[type[
@@ -257,7 +262,43 @@ def inspect_namespace(namespace: dict[str, Any], ignored_types: tuple[type[
- If a field does not have a type annotation.
- If a field on base class was overridden by a non-annotated attribute.
"""
- pass
+ from ..fields import ModelPrivateAttr
+ from ..errors import PydanticUserError
+
+ private_attributes: dict[str, ModelPrivateAttr] = {}
+ annotations = namespace.get('__annotations__', {})
+
+ if '__root__' in namespace:
+ raise TypeError("To define a 'root model', use `pydantic.RootModel` rather than a field called '__root__'")
+
+ for name, value in namespace.items():
+ if name.startswith('__'):
+ continue
+
+ if isinstance(value, ModelPrivateAttr):
+ if name.startswith('_'):
+ private_attributes[name] = value
+ else:
+ raise NameError(
+ f'Private attributes "{name}" must not be a valid field name; '
+ f'Use sunder or dunder names, e.g. "_{name}" or "__{name}__"'
+ )
+ elif not is_valid_field_name(name):
+ continue
+ elif name in base_class_fields and name not in annotations:
+ raise PydanticUserError(
+ f'Field "{name}" defined on a base class was overridden by a non-annotated attribute. '
+ f'All fields must be annotated. '
+ f'To avoid this error, use `{name}: {type(value).__name__} = ...`',
+ code='model-field-overridden',
+ )
+ elif name not in annotations:
+ if name in base_class_vars:
+ continue
+ if not isinstance(value, ignored_types):
+ warnings.warn(f'Field "{name}" has no type annotation', UserWarning, stacklevel=2)
+
+ return private_attributes
def set_model_fields(cls: type[BaseModel], bases: tuple[type[Any], ...],
@@ -270,7 +311,18 @@ def set_model_fields(cls: type[BaseModel], bases: tuple[type[Any], ...],
config_wrapper: The config wrapper instance.
types_namespace: Optional extra namespace to look for types in.
"""
- pass
+ from ..fields import FieldInfo
+
+ cls.model_fields, cls.__class_vars__ = collect_model_fields(
+ cls,
+ bases,
+ config_wrapper,
+ types_namespace,
+ )
+
+ for field in cls.model_fields.values():
+ if isinstance(field, FieldInfo):
+ field.set_config(config_wrapper)
def complete_model_class(cls: type[BaseModel], cls_name: str,
@@ -297,12 +349,53 @@ def complete_model_class(cls: type[BaseModel], cls_name: str,
PydanticUndefinedAnnotation: If `PydanticUndefinedAnnotation` occurs in`__get_pydantic_core_schema__`
and `raise_errors=True`.
"""
- pass
+ from ..errors import PydanticUndefinedAnnotation
+
+ if cls.__pydantic_complete__:
+ return True
+
+ try:
+ types_namespace = types_namespace or {}
+ model_module = create_model_module or cls.__module__
+ cls.__pydantic_core_schema__ = GenerateSchema(
+ config_wrapper,
+ types_namespace,
+ model_module,
+ ).generate_schema(cls)
+
+ cls.__pydantic_validator__ = create_schema_validator(
+ cls.__pydantic_core_schema__,
+ cls,
+ config_wrapper.config_dict,
+ cls_name,
+ )
+ cls.__pydantic_serializer__ = SchemaSerializer(cls.__pydantic_core_schema__)
+
+ set_model_mocks(cls)
+ cls.__pydantic_complete__ = True
+ return True
+ except PydanticUndefinedAnnotation as e:
+ if raise_errors:
+ raise
+ warnings.warn(str(e), UserWarning)
+ return False
def set_deprecated_descriptors(cls: type[BaseModel]) ->None:
"""Set data descriptors on the class for deprecated fields."""
- pass
+ for field_name, field in cls.model_fields.items():
+ if field.deprecated:
+ msg = f'The field "{field_name}" is deprecated'
+ if isinstance(field.deprecated, str):
+ msg += f', {field.deprecated}'
+ setattr(cls, field_name, _DeprecatedFieldDescriptor(msg))
+
+ for name, decorator_info in cls.__pydantic_decorators__.computed_fields.items():
+ if decorator_info.info.deprecated:
+ msg = f'The computed_field "{name}" is deprecated'
+ if isinstance(decorator_info.info.deprecated, str):
+ msg += f', {decorator_info.info.deprecated}'
+ setattr(cls, name, _DeprecatedFieldDescriptor(msg, decorator_info.wrapped))
class _DeprecatedFieldDescriptor:
@@ -379,10 +472,14 @@ def build_lenient_weakvaluedict(d: (dict[str, Any] | None)) ->(dict[str,
The `unpack_lenient_weakvaluedict` function can be used to reverse this operation.
"""
- pass
+ if d is None:
+ return None
+ return {k: _PydanticWeakRef(v) for k, v in d.items()}
def unpack_lenient_weakvaluedict(d: (dict[str, Any] | None)) ->(dict[str,
Any] | None):
"""Inverts the transform performed by `build_lenient_weakvaluedict`."""
- pass
+ if d is None:
+ return None
+ return {k: v() for k, v in d.items()}
diff --git a/pydantic/_internal/_repr.py b/pydantic/_internal/_repr.py
index ad15026c..46212ae8 100644
--- a/pydantic/_internal/_repr.py
+++ b/pydantic/_internal/_repr.py
@@ -80,4 +80,25 @@ def display_as_type(obj: Any) ->str:
Takes some logic from `typing._type_repr`.
"""
- pass
+ if isinstance(obj, type):
+ if obj.__module__ == 'builtins':
+ return obj.__qualname__
+ return f'{obj.__module__}.{obj.__qualname__}'
+
+ if obj is ...:
+ return '...'
+
+ if obj is Any:
+ return 'Any'
+
+ if isinstance(obj, _typing_extra.TypeAliasType):
+ return obj.__name__
+
+ if isinstance(obj, types.GenericAlias):
+ params = ', '.join(display_as_type(param) for param in obj.__args__)
+ return f'{obj.__origin__.__name__}[{params}]'
+
+ if isinstance(obj, _typing_extra.TypeVar):
+ return obj.__name__
+
+ return repr(obj)
diff --git a/pydantic/_internal/_schema_generation_shared.py b/pydantic/_internal/_schema_generation_shared.py
index 34203092..c0845567 100644
--- a/pydantic/_internal/_schema_generation_shared.py
+++ b/pydantic/_internal/_schema_generation_shared.py
@@ -47,7 +47,16 @@ class GenerateJsonSchemaHandler(GetJsonSchemaHandler):
Raises:
LookupError: If it can't find the definition for `$ref`.
"""
- pass
+ if isinstance(maybe_ref_json_schema, dict) and '$ref' in maybe_ref_json_schema:
+ ref = maybe_ref_json_schema['$ref']
+ if not ref.startswith('#/definitions/'):
+ raise ValueError(f"Invalid $ref format: {ref}")
+ definition_key = ref[len('#/definitions/'):]
+ try:
+ return self.generate_json_schema.definitions[definition_key]
+ except KeyError:
+ raise LookupError(f"Definition not found for $ref: {ref}")
+ return maybe_ref_json_schema
class CallbackGetCoreSchemaHandler(GetCoreSchemaHandler):
@@ -88,4 +97,10 @@ class CallbackGetCoreSchemaHandler(GetCoreSchemaHandler):
Raises:
LookupError: If it can't find the definition for reference.
"""
- pass
+ if isinstance(maybe_ref_schema, dict) and 'ref' in maybe_ref_schema:
+ ref = maybe_ref_schema['ref']
+ try:
+ return self._generate_schema.defs.definitions[ref]
+ except KeyError:
+ raise LookupError(f"Definition not found for reference: {ref}")
+ return maybe_ref_schema
diff --git a/pydantic/_internal/_signature.py b/pydantic/_internal/_signature.py
index f5663caa..da9f9662 100644
--- a/pydantic/_internal/_signature.py
+++ b/pydantic/_internal/_signature.py
@@ -22,7 +22,11 @@ def _field_name_for_signature(field_name: str, field_info: FieldInfo) ->str:
Returns:
The correct name to use when generating a signature.
"""
- pass
+ if field_info.validation_alias and isinstance(field_info.validation_alias, str) and is_valid_identifier(field_info.validation_alias):
+ return field_info.validation_alias
+ elif field_info.alias and is_valid_identifier(field_info.alias):
+ return field_info.alias
+ return field_name
def _process_param_defaults(param: Parameter) ->Parameter:
@@ -34,13 +38,38 @@ def _process_param_defaults(param: Parameter) ->Parameter:
Returns:
Parameter: The custom processed parameter
"""
- pass
+ if isinstance(param.default, FieldInfo):
+ default = param.default.default
+ if default is PydanticUndefined:
+ default = Parameter.empty
+ return param.replace(default=default)
+ return param
def _generate_signature_parameters(init: Callable[..., None], fields: dict[
str, FieldInfo], config_wrapper: ConfigWrapper) ->dict[str, Parameter]:
"""Generate a mapping of parameter names to Parameter objects for a pydantic BaseModel or dataclass."""
- pass
+ init_signature = signature(init)
+ parameters = {}
+
+ for name, param in init_signature.parameters.items():
+ if name == 'self':
+ continue
+ if name in fields:
+ field_info = fields[name]
+ param_name = _field_name_for_signature(name, field_info)
+ default = field_info.default if field_info.default is not PydanticUndefined else Parameter.empty
+ annotation = field_info.annotation if field_info.annotation is not PydanticUndefined else param.annotation
+ parameters[param_name] = Parameter(
+ param_name,
+ kind=param.kind,
+ default=default,
+ annotation=annotation
+ )
+ else:
+ parameters[name] = param
+
+ return parameters
def generate_pydantic_signature(init: Callable[..., None], fields: dict[str,
@@ -57,4 +86,12 @@ def generate_pydantic_signature(init: Callable[..., None], fields: dict[str,
Returns:
The dataclass/BaseModel subclass signature.
"""
- pass
+ parameters = _generate_signature_parameters(init, fields, config_wrapper)
+
+ if is_dataclass:
+ parameters = {name: _process_param_defaults(param) for name, param in parameters.items()}
+
+ return Signature(
+ parameters=list(parameters.values()),
+ return_annotation=Signature.empty
+ )
diff --git a/pydantic/_internal/_typing_extra.py b/pydantic/_internal/_typing_extra.py
index 1caf85c1..ae6213a6 100644
--- a/pydantic/_internal/_typing_extra.py
+++ b/pydantic/_internal/_typing_extra.py
@@ -52,14 +52,29 @@ def all_literal_values(type_: type[Any]) ->list[Any]:
Literal can be used recursively (see https://www.python.org/dev/peps/pep-0586)
e.g. `Literal[Literal[Literal[1, 2, 3], "foo"], 5, None]`.
"""
- pass
+ if get_origin(type_) in LITERAL_TYPES:
+ values = []
+ for arg in get_args(type_):
+ if get_origin(arg) in LITERAL_TYPES:
+ values.extend(all_literal_values(arg))
+ else:
+ values.append(arg)
+ return values
+ raise ValueError(f"{type_} is not a Literal type")
def is_namedtuple(type_: type[Any]) ->bool:
"""Check if a given class is a named tuple.
It can be either a `typing.NamedTuple` or `collections.namedtuple`.
"""
- pass
+ return (
+ isinstance(type_, type) and
+ issubclass(type_, tuple) and
+ hasattr(type_, '_fields') and
+ hasattr(type_, '_field_defaults') and
+ hasattr(type_, '_make') and
+ hasattr(type_, '_replace')
+ )
test_new_type = typing.NewType('test_new_type', str)
@@ -70,12 +85,12 @@ def is_new_type(type_: type[Any]) ->bool:
Can't use isinstance because it fails <3.10.
"""
- pass
+ return hasattr(type_, '__supertype__') and callable(type_)
def _check_finalvar(v: (type[Any] | None)) ->bool:
"""Check if a given type is a `typing.Final` type."""
- pass
+ return v is not None and get_origin(v) is typing.Final
def parent_frame_namespace(*, parent_depth: int=2) ->(dict[str, Any] | None):
@@ -90,7 +105,12 @@ def parent_frame_namespace(*, parent_depth: int=2) ->(dict[str, Any] | None):
dict of exactly what's in scope. Using `f_back` would work sometimes but would be very wrong and confusing in many
other cases. See https://discuss.python.org/t/is-there-a-way-to-access-parent-nested-namespaces/20659.
"""
- pass
+ try:
+ frame = sys._getframe(parent_depth)
+ except ValueError: # pragma: no cover
+ return None
+ else:
+ return frame.f_locals
def get_cls_type_hints_lenient(obj: Any, globalns: (dict[str, Any] | None)=None
@@ -99,13 +119,27 @@ def get_cls_type_hints_lenient(obj: Any, globalns: (dict[str, Any] | None)=None
Unlike `typing.get_type_hints`, this function will not error if a forward reference is not resolvable.
"""
- pass
+ hints = {}
+ for base in reversed(obj.__mro__):
+ if base is object:
+ continue
+ base_hints = base.__dict__.get('__annotations__', {})
+ for name, value in base_hints.items():
+ if isinstance(value, str):
+ value = eval_type_lenient(value, globalns, {})
+ hints[name] = value
+ return hints
def eval_type_lenient(value: Any, globalns: (dict[str, Any] | None)=None,
localns: (dict[str, Any] | None)=None) ->Any:
"""Behaves like typing._eval_type, except it won't raise an error if a forward reference can't be resolved."""
- pass
+ if isinstance(value, str):
+ try:
+ value = eval(value, globalns, localns)
+ except NameError:
+ return value
+ return value
def eval_type_backport(value: Any, globalns: (dict[str, Any] | None)=None,
diff --git a/pydantic/_internal/_utils.py b/pydantic/_internal/_utils.py
index 5353dcfc..019c1756 100644
--- a/pydantic/_internal/_utils.py
+++ b/pydantic/_internal/_utils.py
@@ -32,7 +32,8 @@ def is_model_class(cls: Any) ->TypeGuard[type[BaseModel]]:
"""Returns true if cls is a _proper_ subclass of BaseModel, and provides proper type-checking,
unlike raw calls to lenient_issubclass.
"""
- pass
+ from ..main import BaseModel
+ return isinstance(cls, type) and issubclass(cls, BaseModel) and cls is not BaseModel
def is_valid_identifier(identifier: str) ->bool:
@@ -40,7 +41,7 @@ def is_valid_identifier(identifier: str) ->bool:
:param identifier: The identifier to test.
:return: True if the identifier is valid.
"""
- pass
+ return identifier.isidentifier() and not keyword.iskeyword(identifier)
KeyType = TypeVar('KeyType')
@@ -53,7 +54,18 @@ def unique_list(input_list: (list[T] | tuple[T, ...]), *, name_factory:
We update the list if another one with the same name is set
(e.g. model validator overridden in subclass).
"""
- pass
+ result = []
+ seen = set()
+ for item in input_list:
+ name = name_factory(item)
+ if name not in seen:
+ seen.add(name)
+ result.append(item)
+ else:
+ # Update existing item with the same name
+ index = next(i for i, x in enumerate(result) if name_factory(x) == name)
+ result[index] = item
+ return result
class ValueItems(_repr.Representation):
@@ -72,21 +84,23 @@ class ValueItems(_repr.Representation):
:param item: key or index of a value
"""
- pass
+ return item in self._items and self._items[item] is False
def is_included(self, item: Any) ->bool:
"""Check if value is contained in self._items.
:param item: key or index of value
"""
- pass
+ return item in self._items and self._items[item] is not False
def for_element(self, e: (int | str)) ->(AbstractSetIntStr |
MappingIntStrAny | None):
""":param e: key or index of element on value
:return: raw values for element if self._items is dict and contain needed element
"""
- pass
+ if isinstance(self._items, Mapping):
+ return self._items.get(e)
+ return None
def _normalize_indexes(self, items: MappingIntStrAny, v_length: int
) ->dict[int | str, Any]:
@@ -98,7 +112,18 @@ class ValueItems(_repr.Representation):
>>> self._normalize_indexes({'__all__': True}, 4)
{0: True, 1: True, 2: True, 3: True}
"""
- pass
+ if '__all__' in items:
+ return {i: True for i in range(v_length)}
+ normalized = {}
+ for i, v in items.items():
+ if isinstance(i, int):
+ if i < 0:
+ i += v_length
+ if 0 <= i < v_length:
+ normalized[i] = v
+ else:
+ normalized[i] = v
+ return normalized
@classmethod
def merge(cls, base: Any, override: Any, intersect: bool=False) ->Any:
@@ -115,7 +140,36 @@ class ValueItems(_repr.Representation):
set to `False` (default) and on the intersection of keys if
`intersect` is set to `True`.
"""
- pass
+ base = cls._items_to_dict(base)
+ override = cls._items_to_dict(override)
+
+ if intersect:
+ keys = set(base.keys()) & set(override.keys())
+ else:
+ keys = set(base.keys()) | set(override.keys())
+
+ merged = {}
+ for k in keys:
+ if k in base and k in override:
+ if isinstance(base[k], dict) and isinstance(override[k], dict):
+ merged[k] = cls.merge(base[k], override[k], intersect)
+ else:
+ merged[k] = override[k]
+ elif k in base:
+ merged[k] = base[k]
+ else:
+ merged[k] = override[k]
+
+ return merged
+
+ @staticmethod
+ def _items_to_dict(items: Any) ->dict:
+ if isinstance(items, dict):
+ return items
+ elif isinstance(items, (set, frozenset)):
+ return {k: ... for k in items}
+ else:
+ return {}
def __repr_args__(self) ->_repr.ReprArgs:
return [(None, self._items)]
@@ -146,7 +200,12 @@ def smart_deepcopy(obj: Obj) ->Obj:
Use obj.copy() for built-in empty collections
Use copy.deepcopy() for non-empty collections and unknown objects.
"""
- pass
+ if type(obj) in IMMUTABLE_NON_COLLECTIONS_TYPES:
+ return obj
+ if type(obj) in BUILTIN_COLLECTIONS:
+ if not obj: # Empty collection
+ return obj.copy()
+ return deepcopy(obj)
_SENTINEL = object()
@@ -162,7 +221,7 @@ def all_identical(left: typing.Iterable[Any], right: typing.Iterable[Any]
>>> all_identical([a, b, [a]], [a, b, [a]]) # new list object, while "equal" is not "identical"
False
"""
- pass
+ return all(l is r for l, r in zip_longest(left, right, fillvalue=_SENTINEL))
@dataclasses.dataclass(frozen=True)
diff --git a/pydantic/_internal/_validators.py b/pydantic/_internal/_validators.py
index 2f7d67f4..bbc80f7e 100644
--- a/pydantic/_internal/_validators.py
+++ b/pydantic/_internal/_validators.py
@@ -15,7 +15,7 @@ from pydantic_core._pydantic_core import PydanticKnownError
def sequence_validator(input_value: typing.Sequence[Any], /, validator:
core_schema.ValidatorFunctionWrapHandler) ->typing.Sequence[Any]:
"""Validator for `Sequence` types, isinstance(v, Sequence) has already been called."""
- pass
+ return [validator(item) for item in input_value]
def _import_string_logic(dotted_path: str) ->Any:
@@ -36,7 +36,27 @@ def _import_string_logic(dotted_path: str) ->Any:
* the substring of `dotted_path` before the colon is not a valid module in the environment (e.g., '123:Mapping')
* the substring of `dotted_path` after the colon is not an attribute of the module (e.g., 'collections:abc123')
"""
- pass
+ if dotted_path.count(':') > 1:
+ raise ValueError("Invalid dotted path: too many colons")
+
+ if ':' in dotted_path:
+ module_path, attribute = dotted_path.split(':')
+ else:
+ module_path, attribute = dotted_path, None
+
+ try:
+ module = __import__(module_path, fromlist=['__trash'])
+ for chunk in module_path.split('.')[1:]:
+ module = getattr(module, chunk)
+ except ImportError:
+ raise ValueError(f"Unable to import module: {module_path}")
+
+ if attribute:
+ try:
+ return getattr(module, attribute)
+ except AttributeError:
+ raise ValueError(f"Attribute {attribute} not found in module {module_path}")
+ return module
PatternType = typing.TypeVar('PatternType', str, bytes)
@@ -48,7 +68,13 @@ def ip_v4_network_validator(input_value: Any, /) ->IPv4Network:
See more:
https://docs.python.org/library/ipaddress.html#ipaddress.IPv4Network
"""
- pass
+ try:
+ return IPv4Network(input_value)
+ except ValueError:
+ raise PydanticCustomError(
+ 'ip_v4_network',
+ 'Input is not a valid IPv4 network address'
+ )
def ip_v6_network_validator(input_value: Any, /) ->IPv6Network:
@@ -57,7 +83,13 @@ def ip_v6_network_validator(input_value: Any, /) ->IPv6Network:
See more:
https://docs.python.org/library/ipaddress.html#ipaddress.IPv6Network
"""
- pass
+ try:
+ return IPv6Network(input_value)
+ except ValueError:
+ raise PydanticCustomError(
+ 'ip_v6_network',
+ 'Input is not a valid IPv6 network address'
+ )
_CONSTRAINT_TO_VALIDATOR_MAP: dict[str, Callable] = {'gt':
@@ -69,4 +101,7 @@ _CONSTRAINT_TO_VALIDATOR_MAP: dict[str, Callable] = {'gt':
def get_constraint_validator(constraint: str) ->Callable:
"""Fetch the validator function for the given constraint."""
- pass
+ validator = _CONSTRAINT_TO_VALIDATOR_MAP.get(constraint)
+ if validator is None:
+ raise ValueError(f"Unknown constraint: {constraint}")
+ return validator
diff --git a/pydantic/_migration.py b/pydantic/_migration.py
index ef115aae..942a3bfa 100644
--- a/pydantic/_migration.py
+++ b/pydantic/_migration.py
@@ -156,7 +156,7 @@ REMOVED_IN_V2 = {'pydantic:ConstrainedBytes', 'pydantic:ConstrainedDate',
'pydantic:validate_model'}
-def getattr_migration(module: str) ->Callable[[str], Any]:
+def getattr_migration(module: str) -> Callable[[str], Any]:
"""Implement PEP 562 for objects that were either moved or removed on the migration
to V2.
@@ -166,4 +166,35 @@ def getattr_migration(module: str) ->Callable[[str], Any]:
Returns:
A callable that will raise an error if the object is not found.
"""
- pass
+ def __getattr__(name: str) -> Any:
+ full_name = f"{module}:{name}"
+ if full_name in MOVED_IN_V2:
+ new_location = MOVED_IN_V2[full_name]
+ new_module, new_name = new_location.split(':')
+ import_path = f"from {new_module} import {new_name} as moved_obj"
+ exec(import_path)
+ return locals()['moved_obj']
+ elif full_name in DEPRECATED_MOVED_IN_V2:
+ new_location = DEPRECATED_MOVED_IN_V2[full_name]
+ new_module, new_name = new_location.split(':')
+ import_path = f"from {new_module} import {new_name} as moved_obj"
+ exec(import_path)
+ import warnings
+ warnings.warn(f"{full_name} is deprecated, use {new_location} instead", DeprecationWarning, stacklevel=2)
+ return locals()['moved_obj']
+ elif full_name in REDIRECT_TO_V1:
+ new_location = REDIRECT_TO_V1[full_name]
+ new_module, new_name = new_location.split(':')
+ import_path = f"from {new_module} import {new_name} as moved_obj"
+ exec(import_path)
+ return locals()['moved_obj']
+ elif full_name in REMOVED_IN_V2:
+ raise AttributeError(
+ f"{full_name} was removed in Pydantic V2. "
+ f"Check the migration guide for more information: "
+ f"https://docs.pydantic.dev/latest/migration/"
+ )
+ else:
+ raise AttributeError(f"module '{module}' has no attribute '{name}'")
+
+ return __getattr__
diff --git a/pydantic/alias_generators.py b/pydantic/alias_generators.py
index c68d768b..1667bd0c 100644
--- a/pydantic/alias_generators.py
+++ b/pydantic/alias_generators.py
@@ -12,7 +12,7 @@ def to_pascal(snake: str) ->str:
Returns:
The PascalCase string.
"""
- pass
+ return ''.join(word.capitalize() for word in snake.split('_'))
def to_camel(snake: str) ->str:
@@ -24,7 +24,8 @@ def to_camel(snake: str) ->str:
Returns:
The converted camelCase string.
"""
- pass
+ words = snake.split('_')
+ return words[0] + ''.join(word.capitalize() for word in words[1:])
def to_snake(camel: str) ->str:
@@ -36,4 +37,10 @@ def to_snake(camel: str) ->str:
Returns:
The converted string in snake_case.
"""
- pass
+ # Handle kebab-case first
+ if '-' in camel:
+ return camel.replace('-', '_').lower()
+
+ # Handle PascalCase and camelCase
+ pattern = re.compile(r'(?<!^)(?=[A-Z])')
+ return pattern.sub('_', camel).lower()
diff --git a/pydantic/aliases.py b/pydantic/aliases.py
index a6947b0c..7141b03c 100644
--- a/pydantic/aliases.py
+++ b/pydantic/aliases.py
@@ -27,7 +27,7 @@ class AliasPath:
Returns:
The list of aliases.
"""
- pass
+ return self.path
def search_dict_for_path(self, d: dict) ->Any:
"""Searches a dictionary for the path specified by the alias.
@@ -35,7 +35,13 @@ class AliasPath:
Returns:
The value at the specified path, or `PydanticUndefined` if the path is not found.
"""
- pass
+ current = d
+ for key in self.path:
+ if isinstance(current, dict) and key in current:
+ current = current[key]
+ else:
+ return PydanticUndefined
+ return current
@dataclasses.dataclass(**_internal_dataclass.slots_true)
@@ -59,7 +65,10 @@ class AliasChoices:
Returns:
The list of aliases.
"""
- pass
+ return [
+ [choice] if isinstance(choice, str) else choice.convert_to_aliases()
+ for choice in self.choices
+ ]
@dataclasses.dataclass(**_internal_dataclass.slots_true)
@@ -87,7 +96,14 @@ class AliasGenerator:
Raises:
TypeError: If the alias generator produces an invalid type.
"""
- pass
+ generator = getattr(self, alias_kind)
+ if generator is None:
+ return None
+
+ alias = generator(field_name)
+ if not isinstance(alias, allowed_types):
+ raise TypeError(f"{alias_kind} must be one of {allowed_types}, not {type(alias)}")
+ return alias
def generate_aliases(self, field_name: str) ->tuple[str | None, str |
AliasPath | AliasChoices | None, str | None]:
@@ -96,4 +112,7 @@ class AliasGenerator:
Returns:
A tuple of three aliases - validation, alias, and serialization.
"""
- pass
+ alias = self._generate_alias('alias', (str,), field_name)
+ validation_alias = self._generate_alias('validation_alias', (str, AliasPath, AliasChoices), field_name)
+ serialization_alias = self._generate_alias('serialization_alias', (str,), field_name)
+ return alias, validation_alias, serialization_alias
diff --git a/pydantic/annotated_handlers.py b/pydantic/annotated_handlers.py
index 92a4df50..3eae10c8 100644
--- a/pydantic/annotated_handlers.py
+++ b/pydantic/annotated_handlers.py
@@ -49,7 +49,12 @@ class GetJsonSchemaHandler:
Returns:
JsonSchemaValue: A JsonSchemaValue that has no `$ref`.
"""
- pass
+ if isinstance(maybe_ref_json_schema, dict) and '$ref' in maybe_ref_json_schema:
+ ref = maybe_ref_json_schema['$ref']
+ # Here we would typically look up the reference in a schema store
+ # For this implementation, we'll raise a LookupError
+ raise LookupError(f"Reference '{ref}' not found")
+ return maybe_ref_json_schema
class GetCoreSchemaHandler:
@@ -83,7 +88,7 @@ class GetCoreSchemaHandler:
Returns:
CoreSchema: The `pydantic-core` CoreSchema generated.
"""
- pass
+ return self.__call__(source_type)
def resolve_ref_schema(self, maybe_ref_schema: core_schema.CoreSchema, /
) ->core_schema.CoreSchema:
@@ -100,13 +105,24 @@ class GetCoreSchemaHandler:
Returns:
A concrete `CoreSchema`.
"""
- pass
+ if isinstance(maybe_ref_schema, dict) and maybe_ref_schema.get('type') == 'definition-ref':
+ ref = maybe_ref_schema.get('schema_ref')
+ if ref is None:
+ raise LookupError("Invalid definition-ref schema: missing 'schema_ref'")
+ # Here we would typically look up the reference in a schema store
+ # For this implementation, we'll raise a LookupError
+ raise LookupError(f"Reference '{ref}' not found")
+ return maybe_ref_schema
@property
def field_name(self) ->(str | None):
"""Get the name of the closest field to this validator."""
- pass
+ # This is a placeholder implementation. In a real scenario,
+ # this would likely be set during the schema generation process.
+ return getattr(self, '_field_name', None)
def _get_types_namespace(self) ->(dict[str, Any] | None):
"""Internal method used during type resolution for serializer annotations."""
- pass
+ # This is a placeholder implementation. In a real scenario,
+ # this would return a dictionary of types used in the current context.
+ return getattr(self, '_types_namespace', None)
diff --git a/pydantic/color.py b/pydantic/color.py
index dae7ca3d..6548a1fc 100644
--- a/pydantic/color.py
+++ b/pydantic/color.py
@@ -99,7 +99,7 @@ class Color(_repr.Representation):
def original(self) ->ColorType:
"""Original value passed to `Color`."""
- pass
+ return self._original
def as_named(self, *, fallback: bool=False) ->str:
"""Returns the name of the color if it can be found in `COLORS_BY_VALUE` dictionary,
@@ -115,7 +115,13 @@ class Color(_repr.Representation):
Raises:
ValueError: When no named color is found and fallback is `False`.
"""
- pass
+ rgb = self.as_rgb_tuple(alpha=False)
+ try:
+ return COLORS_BY_VALUE[rgb]
+ except KeyError:
+ if fallback:
+ return self.as_hex()
+ raise ValueError(f'Color {rgb} has no name')
def as_hex(self) ->str:
"""Returns the hexadecimal representation of the color.
@@ -126,11 +132,18 @@ class Color(_repr.Representation):
Returns:
The hexadecimal representation of the color.
"""
- pass
+ rgba = self._rgba
+ if rgba.alpha is None:
+ return f'#{rgba.r:02x}{rgba.g:02x}{rgba.b:02x}'
+ else:
+ return f'#{rgba.r:02x}{rgba.g:02x}{rgba.b:02x}{int(rgba.alpha * 255):02x}'
def as_rgb(self) ->str:
"""Color as an `rgb(<r>, <g>, <b>)` or `rgba(<r>, <g>, <b>, <a>)` string."""
- pass
+ if self._rgba.alpha is None:
+ return f'rgb({self._rgba.r}, {self._rgba.g}, {self._rgba.b})'
+ else:
+ return f'rgba({self._rgba.r}, {self._rgba.g}, {self._rgba.b}, {self._rgba.alpha:.2f})'
def as_rgb_tuple(self, *, alpha: Optional[bool]=None) ->ColorTuple:
"""Returns the color as an RGB or RGBA tuple.
@@ -146,11 +159,20 @@ class Color(_repr.Representation):
A tuple that contains the values of the red, green, and blue channels in the range 0 to 255.
If alpha is included, it is in the range 0 to 1.
"""
- pass
+ if alpha is True:
+ return (self._rgba.r, self._rgba.g, self._rgba.b, self._rgba.alpha if self._rgba.alpha is not None else 1.0)
+ elif alpha is False:
+ return (self._rgba.r, self._rgba.g, self._rgba.b)
+ else:
+ return (self._rgba.r, self._rgba.g, self._rgba.b) if self._rgba.alpha is None else (self._rgba.r, self._rgba.g, self._rgba.b, self._rgba.alpha)
def as_hsl(self) ->str:
"""Color as an `hsl(<h>, <s>, <l>)` or `hsl(<h>, <s>, <l>, <a>)` string."""
- pass
+ h, s, l = self.as_hsl_tuple(alpha=False)
+ if self._rgba.alpha is None:
+ return f'hsl({h:.0f}, {s:.0%}, {l:.0%})'
+ else:
+ return f'hsla({h:.0f}, {s:.0%}, {l:.0%}, {self._rgba.alpha:.2f})'
def as_hsl_tuple(self, *, alpha: Optional[bool]=None) ->HslColorTuple:
"""Returns the color as an HSL or HSLA tuple.
@@ -169,7 +191,16 @@ class Color(_repr.Representation):
Note:
This is HSL as used in HTML and most other places, not HLS as used in Python's `colorsys`.
"""
- pass
+ r, g, b = self._rgba.r / 255, self._rgba.g / 255, self._rgba.b / 255
+ h, l, s = rgb_to_hls(r, g, b)
+ hsl = (h * 360 % 360, s, l)
+
+ if alpha is True:
+ return (*hsl, self._rgba.alpha if self._rgba.alpha is not None else 1.0)
+ elif alpha is False:
+ return hsl
+ else:
+ return hsl if self._rgba.alpha is None else (*hsl, self._rgba.alpha)
@classmethod
def __get_pydantic_core_schema__(cls, source: Type[Any], handler:
@@ -204,7 +235,14 @@ def parse_tuple(value: Tuple[Any, ...]) ->RGBA:
Raises:
PydanticCustomError: If tuple is not valid.
"""
- pass
+ if len(value) == 3:
+ r, g, b = value
+ return ints_to_rgba(r, g, b)
+ elif len(value) == 4:
+ r, g, b, a = value
+ return ints_to_rgba(r, g, b, parse_float_alpha(a))
+ else:
+ raise PydanticCustomError('color_error', 'value is not a valid RGB or RGBA tuple')
def parse_str(value: str) ->RGBA:
@@ -227,7 +265,32 @@ def parse_str(value: str) ->RGBA:
Raises:
ValueError: If the input string cannot be parsed to an RGBA tuple.
"""
- pass
+ value = value.strip().lower()
+
+ # Named color
+ if value in COLORS_BY_NAME:
+ return RGBA(*COLORS_BY_NAME[value], None)
+
+ # Hex color
+ if value.startswith(('#', '0x')):
+ value = value[2:]
+ if len(value) in (3, 4):
+ value = ''.join(2 * c for c in value)
+ if len(value) == 6:
+ rgb = int(value, 16)
+ return RGBA(rgb >> 16, (rgb >> 8) & 0xFF, rgb & 0xFF, None)
+ elif len(value) == 8:
+ rgba = int(value, 16)
+ return RGBA(rgba >> 24, (rgba >> 16) & 0xFF, (rgba >> 8) & 0xFF, (rgba & 0xFF) / 255)
+
+ # RGB(A) color
+ match = re.match(r'rgba?\((\d+),\s*(\d+),\s*(\d+)(?:,\s*([0-9.]+))?\)', value)
+ if match:
+ r, g, b = map(int, match.group(1, 2, 3))
+ a = float(match.group(4)) if match.group(4) else None
+ return RGBA(r, g, b, a)
+
+ raise ValueError(f'Could not parse color string "{value}"')
def ints_to_rgba(r: Union[int, str], g: Union[int, str], b: Union[int, str],
diff --git a/pydantic/config.py b/pydantic/config.py
index 24abaf45..2de4b72b 100644
--- a/pydantic/config.py
+++ b/pydantic/config.py
@@ -978,7 +978,10 @@ def with_config(config: ConfigDict) ->Callable[[_TypeT], _TypeT]:
#> {'x': 'abc'}
```
"""
- pass
+ def decorator(cls: _TypeT) -> _TypeT:
+ setattr(cls, '__pydantic_config__', config)
+ return cls
+ return decorator
__getattr__ = getattr_migration(__name__)
diff --git a/pydantic/dataclasses.py b/pydantic/dataclasses.py
index cb2e2160..4cba6f43 100644
--- a/pydantic/dataclasses.py
+++ b/pydantic/dataclasses.py
@@ -56,7 +56,26 @@ def dataclass(_cls: (type[_T] | None)=None, *, init: Literal[False]=False,
Raises:
AssertionError: Raised if `init` is not `False` or `validate_on_init` is `False`.
"""
- pass
+ assert init is False, "The 'init' parameter must be False for Pydantic dataclasses"
+ assert validate_on_init is not False, "The 'validate_on_init' parameter cannot be False"
+
+ def wrap(cls: type[_T]) -> type[PydanticDataclass]:
+ return _pydantic_dataclasses.create_pydantic_dataclass(
+ cls,
+ config=config,
+ repr=repr,
+ eq=eq,
+ order=order,
+ unsafe_hash=unsafe_hash,
+ frozen=frozen,
+ kw_only=kw_only,
+ slots=slots,
+ )
+
+ if _cls is None:
+ return wrap
+
+ return wrap(_cls)
__getattr__ = getattr_migration(__name__)
@@ -92,7 +111,22 @@ def rebuild_dataclass(cls: type[PydanticDataclass], *, force: bool=False,
Returns `None` if the schema is already "complete" and rebuilding was not required.
If rebuilding _was_ required, returns `True` if rebuilding was successful, otherwise `False`.
"""
- pass
+ if not force and getattr(cls, '__pydantic_complete__', False):
+ return None
+
+ try:
+ _pydantic_dataclasses.rebuild_dataclass(
+ cls,
+ force=force,
+ raise_errors=raise_errors,
+ _parent_namespace_depth=_parent_namespace_depth + 1,
+ _types_namespace=_types_namespace,
+ )
+ return True
+ except Exception as e:
+ if raise_errors:
+ raise
+ return False
def is_pydantic_dataclass(class_: type[Any], /) ->TypeGuard[type[
@@ -105,4 +139,4 @@ def is_pydantic_dataclass(class_: type[Any], /) ->TypeGuard[type[
Returns:
`True` if the class is a pydantic dataclass, `False` otherwise.
"""
- pass
+ return isinstance(class_, type) and hasattr(class_, '__pydantic_dataclass__')
diff --git a/pydantic/deprecated/class_validators.py b/pydantic/deprecated/class_validators.py
index 5ece7422..7b9b98e5 100644
--- a/pydantic/deprecated/class_validators.py
+++ b/pydantic/deprecated/class_validators.py
@@ -100,7 +100,15 @@ def validator(__field: str, *fields: str, pre: bool=False, each_item: bool=
Callable: A decorator that can be used to decorate a
function to be used as a validator.
"""
- pass
+ if allow_reuse:
+ warn(_ALLOW_REUSE_WARNING_MESSAGE, DeprecationWarning, stacklevel=2)
+
+ def decorator(f: _V1ValidatorType) -> _V1ValidatorType:
+ _decorators.deprecated_validator(__field, *fields, pre=pre, each_item=each_item,
+ always=always, check_fields=check_fields)(f)
+ return f
+
+ return decorator
@deprecated(
@@ -122,4 +130,14 @@ def root_validator(*__args, pre: bool=False, skip_on_failure: bool=False,
Returns:
Any: A decorator that can be used to decorate a function to be used as a root_validator.
"""
- pass
+ if allow_reuse:
+ warn(_ALLOW_REUSE_WARNING_MESSAGE, DeprecationWarning, stacklevel=2)
+
+ def decorator(f: _V1RootValidatorFunctionType) -> _V1RootValidatorFunctionType:
+ _decorators.deprecated_root_validator(pre=pre, skip_on_failure=skip_on_failure)(f)
+ return f
+
+ # Support using it as `@root_validator` or `@root_validator()`
+ if __args and callable(__args[0]):
+ return decorator(__args[0])
+ return decorator
diff --git a/pydantic/deprecated/decorator.py b/pydantic/deprecated/decorator.py
index 6be076d9..b6043a11 100644
--- a/pydantic/deprecated/decorator.py
+++ b/pydantic/deprecated/decorator.py
@@ -23,7 +23,22 @@ if TYPE_CHECKING:
def validate_arguments(func: Optional['AnyCallableT']=None, *, config:
'ConfigType'=None) ->Any:
"""Decorator to validate the arguments passed to a function."""
- pass
+ def decorator(f: AnyCallableT) -> AnyCallableT:
+ validated_func = ValidatedFunction(f, config)
+
+ @wraps(f)
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
+ validated_args, validated_kwargs = validated_func.validate(*args, **kwargs)
+ return f(*validated_args, **validated_kwargs)
+
+ wrapper.vd = validated_func # type: ignore
+ wrapper.raw_function = f # type: ignore
+ wrapper.model = validated_func.model # type: ignore
+ return wrapper # type: ignore
+
+ if func:
+ return decorator(func)
+ return decorator
ALT_V_ARGS = 'v__args'
@@ -87,3 +102,72 @@ class ValidatedFunction:
if not takes_kwargs:
fields[self.v_kwargs_name] = Dict[Any, Any], None
self.create_model(fields, takes_args, takes_kwargs, config)
+
+ def create_model(self, fields: Dict[str, Tuple[Any, Any]], takes_args: bool, takes_kwargs: bool, config: 'ConfigType'):
+ class Config:
+ arbitrary_types_allowed = True
+ extra = 'forbid'
+
+ if config is not None:
+ if not isinstance(config, dict):
+ config = {k: v for k, v in config.__dict__.items() if not k.startswith('__')}
+ Config = type('Config', (), {**Config.__dict__, **config})
+
+ class DecoratorBaseModel(BaseModel):
+ @field_validator(self.v_args_name, self.v_kwargs_name, mode='before')
+ @classmethod
+ def check_args_kwargs(cls, v: Any, field: 'ModelField') -> Any:
+ if field.name == self.v_args_name and not takes_args and v:
+ raise ValueError('Function does not take *args')
+ if field.name == self.v_kwargs_name and not takes_kwargs and v:
+ raise ValueError('Function does not take **kwargs')
+ return v
+
+ self.model = create_model(
+ to_pascal(self.raw_function.__name__),
+ __config__=Config,
+ __base__=DecoratorBaseModel,
+ **fields
+ )
+
+ def validate(self, *args: Any, **kwargs: Any) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
+ values = self.model(**self.build_values(args, kwargs))
+ return self.parse_values(values)
+
+ def build_values(self, args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> Dict[str, Any]:
+ values: Dict[str, Any] = {}
+ v_kwargs = {}
+
+ for i, arg in enumerate(args):
+ name = self.arg_mapping.get(i)
+ if name:
+ values[name] = arg
+ else:
+ values.setdefault(self.v_args_name, []).append(arg)
+
+ for name, value in kwargs.items():
+ if name in self.model.__fields__:
+ if name in values:
+ values.setdefault(V_DUPLICATE_KWARGS, []).append(name)
+ values[name] = value
+ else:
+ v_kwargs[name] = value
+
+ if v_kwargs:
+ values[self.v_kwargs_name] = v_kwargs
+
+ return values
+
+ def parse_values(self, values: Any) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
+ args = []
+ kwargs = {}
+ v_args = values.__dict__.pop(self.v_args_name, ())
+ v_kwargs = values.__dict__.pop(self.v_kwargs_name, {})
+
+ for i, name in self.arg_mapping.items():
+ if name in self.positional_only_args:
+ args.append(values.__dict__.get(name))
+ else:
+ kwargs[name] = values.__dict__.get(name)
+
+ return tuple(args + list(v_args)), {**kwargs, **v_kwargs}
diff --git a/pydantic/deprecated/json.py b/pydantic/deprecated/json.py
index 062beef9..aa42f16f 100644
--- a/pydantic/deprecated/json.py
+++ b/pydantic/deprecated/json.py
@@ -33,7 +33,10 @@ def decimal_encoder(dec_value: Decimal) ->Union[int, float]:
>>> decimal_encoder(Decimal("1"))
1
"""
- pass
+ if dec_value.as_tuple().exponent == 0:
+ return int(dec_value)
+ else:
+ return float(dec_value)
ENCODERS_BY_TYPE: Dict[Type[Any], Callable[[Any], Any]] = {bytes: lambda o:
@@ -49,4 +52,16 @@ ENCODERS_BY_TYPE: Dict[Type[Any], Callable[[Any], Any]] = {bytes: lambda o:
@deprecated('`timedelta_isoformat` is deprecated.', category=None)
def timedelta_isoformat(td: datetime.timedelta) ->str:
"""ISO 8601 encoding for Python timedelta object."""
- pass
+ total_seconds = td.total_seconds()
+ hours, remainder = divmod(total_seconds, 3600)
+ minutes, seconds = divmod(remainder, 60)
+
+ hours = int(hours)
+ minutes = int(minutes)
+ seconds = int(seconds)
+ microseconds = int((total_seconds - int(total_seconds)) * 1e6)
+
+ if microseconds:
+ return f"P{hours}H{minutes}M{seconds}.{microseconds:06d}S"
+ else:
+ return f"P{hours}H{minutes}M{seconds}S"
diff --git a/pydantic/deprecated/tools.py b/pydantic/deprecated/tools.py
index 8bd38299..2404d50d 100644
--- a/pydantic/deprecated/tools.py
+++ b/pydantic/deprecated/tools.py
@@ -20,7 +20,18 @@ def schema_of(type_: Any, *, title: (NameFactory | None)=None, by_alias:
bool=True, ref_template: str=DEFAULT_REF_TEMPLATE, schema_generator:
type[GenerateJsonSchema]=GenerateJsonSchema) ->dict[str, Any]:
"""Generate a JSON schema (as dict) for the passed model or dynamically generated one."""
- pass
+ warnings.warn(
+ '`schema_of` is deprecated. Use `pydantic.TypeAdapter.json_schema` instead.',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ type_adapter = TypeAdapter(type_)
+ return type_adapter.json_schema(
+ by_alias=by_alias,
+ ref_template=ref_template,
+ schema_generator=schema_generator,
+ title=title(type_) if callable(title) else title,
+ )
@deprecated(
@@ -31,4 +42,16 @@ def schema_json_of(type_: Any, *, title: (NameFactory | None)=None,
schema_generator: type[GenerateJsonSchema]=GenerateJsonSchema, **
dumps_kwargs: Any) ->str:
"""Generate a JSON schema (as JSON) for the passed model or dynamically generated one."""
- pass
+ warnings.warn(
+ '`schema_json_of` is deprecated. Use `pydantic.TypeAdapter.json_schema` instead.',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ schema = schema_of(
+ type_,
+ title=title,
+ by_alias=by_alias,
+ ref_template=ref_template,
+ schema_generator=schema_generator,
+ )
+ return json.dumps(schema, **dumps_kwargs)
diff --git a/pydantic/errors.py b/pydantic/errors.py
index 3a0976c3..9ba6e4b3 100644
--- a/pydantic/errors.py
+++ b/pydantic/errors.py
@@ -79,7 +79,15 @@ class PydanticUndefinedAnnotation(PydanticErrorMixin, NameError):
Returns:
Converted `PydanticUndefinedAnnotation` error.
"""
- pass
+ error_message = str(name_error)
+ match = re.search(r"name '(\w+)' is not defined", error_message)
+ if match:
+ name = match.group(1)
+ message = f"Annotation '{name}' is not defined."
+ else:
+ name = "unknown"
+ message = f"Undefined annotation: {error_message}"
+ return cls(name=name, message=message)
class PydanticImportError(PydanticErrorMixin, ImportError):
diff --git a/pydantic/experimental/pipeline.py b/pydantic/experimental/pipeline.py
index 253728a3..5da03886 100644
--- a/pydantic/experimental/pipeline.py
+++ b/pydantic/experimental/pipeline.py
@@ -107,7 +107,7 @@ class _Pipeline(Generic[_InT, _OutT]):
If used as the first step in a pipeline, the type of the field is used.
That is, the transformation is applied to after the value is parsed to the field's type.
"""
- pass
+ return _Pipeline(self._steps + (_Transform(func),))
def validate_as(self, tp: (type[_NewOutT] | EllipsisType), *, strict:
bool=False) ->_Pipeline[_InT, Any]:
@@ -118,7 +118,9 @@ class _Pipeline(Generic[_InT, _OutT]):
Types are parsed in Pydantic's `lax` mode by default,
but you can enable `strict` mode by passing `strict=True`.
"""
- pass
+ if tp is Ellipsis:
+ tp = _FieldTypeMarker
+ return _Pipeline(self._steps + (_ValidateAs(tp, strict),))
def validate_as_deferred(self, func: Callable[[], type[_NewOutT]]
) ->_Pipeline[_InT, _NewOutT]:
@@ -127,7 +129,7 @@ class _Pipeline(Generic[_InT, _OutT]):
This is useful when you need to reference the class in it's own type annotations.
"""
- pass
+ return _Pipeline(self._steps + (_ValidateAsDefer(func),))
def constrain(self, constraint: _ConstraintAnnotation) ->Any:
"""Constrain a value to meet a certain condition.
@@ -137,73 +139,73 @@ class _Pipeline(Generic[_InT, _OutT]):
Most of the time you'll be calling a shortcut method like `gt`, `lt`, `len`, etc
so you don't need to call this directly.
"""
- pass
+ return _Pipeline(self._steps + (_Constraint(constraint),))
def predicate(self: _Pipeline[_InT, _NewOutT], func: Callable[[_NewOutT
], bool]) ->_Pipeline[_InT, _NewOutT]:
"""Constrain a value to meet a certain predicate."""
- pass
+ return self.constrain(annotated_types.Predicate(func))
def gt(self: _Pipeline[_InT, _NewOutGt], gt: _NewOutGt) ->_Pipeline[
_InT, _NewOutGt]:
"""Constrain a value to be greater than a certain value."""
- pass
+ return self.constrain(annotated_types.Gt(gt))
def lt(self: _Pipeline[_InT, _NewOutLt], lt: _NewOutLt) ->_Pipeline[
_InT, _NewOutLt]:
"""Constrain a value to be less than a certain value."""
- pass
+ return self.constrain(annotated_types.Lt(lt))
def ge(self: _Pipeline[_InT, _NewOutGe], ge: _NewOutGe) ->_Pipeline[
_InT, _NewOutGe]:
"""Constrain a value to be greater than or equal to a certain value."""
- pass
+ return self.constrain(annotated_types.Ge(ge))
def le(self: _Pipeline[_InT, _NewOutLe], le: _NewOutLe) ->_Pipeline[
_InT, _NewOutLe]:
"""Constrain a value to be less than or equal to a certain value."""
- pass
+ return self.constrain(annotated_types.Le(le))
def len(self: _Pipeline[_InT, _NewOutLen], min_len: int, max_len: (int |
None)=None) ->_Pipeline[_InT, _NewOutLen]:
"""Constrain a value to have a certain length."""
- pass
+ return self.constrain(annotated_types.Len(min_len, max_len))
def multiple_of(self: _Pipeline[_InT, Any], multiple_of: Any) ->_Pipeline[
_InT, Any]:
"""Constrain a value to be a multiple of a certain number."""
- pass
+ return self.constrain(annotated_types.MultipleOf(multiple_of))
def eq(self: _Pipeline[_InT, _OutT], value: _OutT) ->_Pipeline[_InT, _OutT
]:
"""Constrain a value to be equal to a certain value."""
- pass
+ return self.constrain(_Eq(value))
def not_eq(self: _Pipeline[_InT, _OutT], value: _OutT) ->_Pipeline[_InT,
_OutT]:
"""Constrain a value to not be equal to a certain value."""
- pass
+ return self.constrain(_NotEq(value))
def in_(self: _Pipeline[_InT, _OutT], values: Container[_OutT]
) ->_Pipeline[_InT, _OutT]:
"""Constrain a value to be in a certain set."""
- pass
+ return self.constrain(_In(values))
def not_in(self: _Pipeline[_InT, _OutT], values: Container[_OutT]
) ->_Pipeline[_InT, _OutT]:
"""Constrain a value to not be in a certain set."""
- pass
+ return self.constrain(_NotIn(values))
def otherwise(self, other: _Pipeline[_OtherIn, _OtherOut]) ->_Pipeline[
_InT | _OtherIn, _OutT | _OtherOut]:
"""Combine two validation chains, returning the result of the first chain if it succeeds, and the second chain if it fails."""
- pass
+ return _Pipeline((_PipelineOr(self, other),))
__or__ = otherwise
def then(self, other: _Pipeline[_OutT, _OtherOut]) ->_Pipeline[_InT,
_OtherOut]:
"""Pipe the result of one validation chain into another."""
- pass
+ return _Pipeline((_PipelineAnd(self, other),))
__and__ = then
def __get_pydantic_core_schema__(self, source_type: Any, handler:
@@ -229,7 +231,29 @@ transform = _Pipeline[Any, Any]((_ValidateAs(_FieldTypeMarker),)).transform
def _apply_constraint(s: (cs.CoreSchema | None), constraint:
_ConstraintAnnotation) ->cs.CoreSchema:
"""Apply a single constraint to a schema."""
- pass
+ from pydantic_core import core_schema as cs
+
+ if s is None:
+ s = cs.any_schema()
+
+ if isinstance(constraint, (annotated_types.Gt, annotated_types.Ge, annotated_types.Lt, annotated_types.Le)):
+ return cs.with_info_after_validator_function(s, operator.attrgetter(constraint.__class__.__name__.lower())(constraint))
+ elif isinstance(constraint, annotated_types.MultipleOf):
+ return cs.with_info_after_validator_function(s, lambda v: v % constraint.multiple_of == 0)
+ elif isinstance(constraint, annotated_types.Len):
+ return cs.with_info_after_validator_function(s, lambda v: constraint.min_length <= len(v) <= (constraint.max_length or float('inf')))
+ elif isinstance(constraint, annotated_types.Predicate):
+ return cs.with_info_after_validator_function(s, constraint.func)
+ elif isinstance(constraint, (_Eq, _NotEq)):
+ op = operator.eq if isinstance(constraint, _Eq) else operator.ne
+ return cs.with_info_after_validator_function(s, lambda v: op(v, constraint.value))
+ elif isinstance(constraint, (_In, _NotIn)):
+ op = operator.contains if isinstance(constraint, _In) else lambda c, v: v not in c
+ return cs.with_info_after_validator_function(s, lambda v: op(constraint.values, v))
+ elif isinstance(constraint, Pattern):
+ return cs.with_info_after_validator_function(s, lambda v: bool(constraint.match(v)))
+ else:
+ raise ValueError(f"Unsupported constraint: {constraint}")
class _SupportsRange(annotated_types.SupportsLe, annotated_types.SupportsGe,
diff --git a/pydantic/fields.py b/pydantic/fields.py
index 6b34290a..2ba54aec 100644
--- a/pydantic/fields.py
+++ b/pydantic/fields.py
@@ -529,7 +529,27 @@ def Field(default: Any=PydanticUndefined, *, default_factory: (typing.
A new [`FieldInfo`][pydantic.fields.FieldInfo]. The return annotation is `Any` so `Field` can be used on
type-annotated fields without causing a type error.
"""
- pass
+ field_info_kwargs = {
+ k: v for k, v in locals().items()
+ if k not in {'default', 'default_factory', 'extra'} and v is not _Unset
+ }
+ if extra:
+ warnings.warn(
+ 'The `extra` kwargs is deprecated. Use `json_schema_extra` instead.',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ if json_schema_extra is _Unset:
+ field_info_kwargs['json_schema_extra'] = extra
+ else:
+ raise ValueError('Cannot specify both `extra` and `json_schema_extra`')
+
+ if default is not PydanticUndefined:
+ field_info_kwargs['default'] = default
+ if default_factory is not _Unset:
+ field_info_kwargs['default_factory'] = default_factory
+
+ return FieldInfo(**field_info_kwargs)
_FIELD_ARG_NAMES = set(inspect.signature(Field).parameters)
@@ -616,7 +636,13 @@ def PrivateAttr(default: Any=PydanticUndefined, *, default_factory: (typing
Raises:
ValueError: If both `default` and `default_factory` are set.
"""
- pass
+ if default is not PydanticUndefined and default_factory is not None:
+ raise ValueError('cannot specify both default and default_factory')
+
+ if init is not False:
+ raise ValueError('init must be False for private attributes')
+
+ return ModelPrivateAttr(default=default, default_factory=default_factory)
@dataclasses.dataclass(**_internal_dataclass.slots_true)
@@ -816,4 +842,34 @@ def computed_field(func: (PropertyT | None)=None, /, *, alias: (str | None)
Returns:
A proxy wrapper for the property.
"""
- pass
+ def wrapper(func: PropertyT) -> PropertyT:
+ if not isinstance(func, (property, cached_property)):
+ func = property(func)
+
+ is_private = _wrapped_property_is_private(func)
+ if repr is None:
+ repr_value = not is_private
+ else:
+ repr_value = repr
+
+ computed_field_info = ComputedFieldInfo(
+ wrapped_property=func,
+ return_type=return_type,
+ alias=alias,
+ alias_priority=alias_priority,
+ title=title,
+ field_title_generator=field_title_generator,
+ description=description or func.__doc__,
+ deprecated=deprecated,
+ examples=examples,
+ json_schema_extra=json_schema_extra,
+ repr=repr_value
+ )
+
+ setattr(func, '__computed_field__', computed_field_info)
+ return func
+
+ if func is None:
+ return wrapper
+ else:
+ return wrapper(func)
diff --git a/pydantic/functional_serializers.py b/pydantic/functional_serializers.py
index a588b3b5..ab40f04f 100644
--- a/pydantic/functional_serializers.py
+++ b/pydantic/functional_serializers.py
@@ -233,7 +233,24 @@ def field_serializer(*fields: str, mode: Literal['plain', 'wrap']='plain',
Returns:
The decorator function.
"""
- pass
+ def decorator(func: Callable[..., Any]) -> Any:
+ serializer = PlainSerializer(func, return_type, when_used) if mode == 'plain' else WrapSerializer(func, return_type, when_used)
+
+ if check_fields is not None:
+ setattr(serializer, 'check_fields', check_fields)
+
+ if isinstance(func, (classmethod, staticmethod)):
+ func.__func__.__pydantic_serializer__ = serializer
+ for field in fields:
+ func.__func__.__pydantic_serializer_fields__ = getattr(func.__func__, '__pydantic_serializer_fields__', set()) | {field}
+ else:
+ func.__pydantic_serializer__ = serializer
+ for field in fields:
+ func.__pydantic_serializer_fields__ = getattr(func, '__pydantic_serializer_fields__', set()) | {field}
+
+ return func
+
+ return decorator
FuncType = TypeVar('FuncType', bound=Callable[..., Any])
@@ -284,7 +301,20 @@ def model_serializer(f: (Callable[..., Any] | None)=None, /, *, mode:
Returns:
The decorator function.
"""
- pass
+ def decorator(func: Callable[..., Any]) -> Any:
+ serializer = PlainSerializer(func, return_type, when_used) if mode == 'plain' else WrapSerializer(func, return_type, when_used)
+
+ if isinstance(func, (classmethod, staticmethod)):
+ func.__func__.__pydantic_serializer__ = serializer
+ else:
+ func.__pydantic_serializer__ = serializer
+
+ return func
+
+ if f is None:
+ return decorator
+ else:
+ return decorator(f)
AnyType = TypeVar('AnyType')
diff --git a/pydantic/functional_validators.py b/pydantic/functional_validators.py
index 92ed2fbb..9c943b5d 100644
--- a/pydantic/functional_validators.py
+++ b/pydantic/functional_validators.py
@@ -317,7 +317,27 @@ def field_validator(field: str, /, *fields: str, mode: FieldValidatorModes=
- If the args passed to `@field_validator` as fields are not strings.
- If `@field_validator` applied to instance methods.
"""
- pass
+ def decorator(f: Any) -> Any:
+ if isinstance(f, (classmethod, staticmethod)):
+ f = f.__func__
+
+ if not isinstance(f, FunctionType):
+ raise PydanticUserError("@field_validator must be applied to a function")
+
+ if not all(isinstance(field, str) for field in (field, *fields)):
+ raise PydanticUserError("All field arguments to @field_validator must be strings")
+
+ setattr(f, '_pydantic_validator', {
+ 'fields': (field, *fields),
+ 'mode': mode,
+ 'check_fields': check_fields
+ })
+ return f
+
+ if not isinstance(field, str):
+ raise PydanticUserError("@field_validator must be called with at least one field name argument")
+
+ return decorator
_ModelType = TypeVar('_ModelType')
@@ -445,7 +465,16 @@ def model_validator(*, mode: Literal['wrap', 'before', 'after']) ->Any:
Returns:
A decorator that can be used to decorate a function to be used as a model validator.
"""
- pass
+ def decorator(f: Any) -> Any:
+ if not isinstance(f, (FunctionType, classmethod, staticmethod)):
+ raise PydanticUserError("@model_validator must be applied to a function or method")
+
+ setattr(f, '_pydantic_model_validator', {
+ 'mode': mode,
+ })
+ return f
+
+ return decorator
AnyType = TypeVar('AnyType')
diff --git a/pydantic/json_schema.py b/pydantic/json_schema.py
index 32ba228d..91a71a9e 100644
--- a/pydantic/json_schema.py
+++ b/pydantic/json_schema.py
@@ -1078,7 +1078,9 @@ class GenerateJsonSchema:
core_schema: The core schema to get the validations from.
mapping: A mapping from core_schema attribute names to the corresponding JSON schema attribute names.
"""
- pass
+ for core_key, json_key in mapping.items():
+ if core_key in core_schema:
+ json_schema[json_key] = core_schema[core_key]
class ValidationsMapping:
@@ -1100,7 +1102,21 @@ class GenerateJsonSchema:
def get_json_ref_counts(self, json_schema: JsonSchemaValue) ->dict[
JsonRef, int]:
"""Get all values corresponding to the key '$ref' anywhere in the json_schema."""
- pass
+ ref_counts: dict[JsonRef, int] = {}
+
+ def count_refs(schema: Any) -> None:
+ if isinstance(schema, dict):
+ if '$ref' in schema:
+ ref = JsonRef(schema['$ref'])
+ ref_counts[ref] = ref_counts.get(ref, 0) + 1
+ for value in schema.values():
+ count_refs(value)
+ elif isinstance(schema, list):
+ for item in schema:
+ count_refs(item)
+
+ count_refs(json_schema)
+ return ref_counts
def emit_warning(self, kind: JsonSchemaWarningKind, detail: str) ->None:
"""This method simply emits PydanticJsonSchemaWarnings based on handling in the `warning_message` method."""
@@ -1242,7 +1258,20 @@ class Examples:
def _get_all_json_refs(item: Any) ->set[JsonRef]:
"""Get all the definitions references from a JSON schema."""
- pass
+ refs = set()
+
+ def collect_refs(schema: Any) -> None:
+ if isinstance(schema, dict):
+ if '$ref' in schema:
+ refs.add(JsonRef(schema['$ref']))
+ for value in schema.values():
+ collect_refs(value)
+ elif isinstance(schema, list):
+ for sub_item in schema:
+ collect_refs(sub_item)
+
+ collect_refs(item)
+ return refs
AnyType = TypeVar('AnyType')
diff --git a/pydantic/main.py b/pydantic/main.py
index 0f604ce0..98c4fbd9 100644
--- a/pydantic/main.py
+++ b/pydantic/main.py
@@ -137,7 +137,7 @@ class BaseModel(metaclass=_model_construction.ModelMetaclass):
Returns:
A dictionary of extra fields, or `None` if `config.extra` is not set to `"allow"`.
"""
- pass
+ return self.__pydantic_extra__ if self.model_config.get('extra') == 'allow' else None
@property
def model_fields_set(self) ->set[str]:
@@ -147,7 +147,7 @@ class BaseModel(metaclass=_model_construction.ModelMetaclass):
A set of strings representing the fields that have been set,
i.e. that were not filled from defaults.
"""
- pass
+ return self.__pydantic_fields_set__
@classmethod
def model_construct(cls, _fields_set: (set[str] | None)=None, **values: Any
@@ -171,7 +171,19 @@ class BaseModel(metaclass=_model_construction.ModelMetaclass):
Returns:
A new instance of the `Model` class with validated data.
"""
- pass
+ m = cls.__new__(cls)
+ fields_values = {}
+ extra = {}
+ for name, value in values.items():
+ if name in cls.model_fields:
+ fields_values[name] = value
+ elif cls.model_config.get('extra') == 'allow':
+ extra[name] = value
+
+ _object_setattr(m, '__dict__', fields_values)
+ _object_setattr(m, '__pydantic_extra__', extra or None)
+ _object_setattr(m, '__pydantic_fields_set__', _fields_set or set(fields_values.keys()))
+ return m
def model_copy(self, *, update: (dict[str, Any] | None)=None, deep:
bool=False) ->Self:
@@ -187,7 +199,18 @@ class BaseModel(metaclass=_model_construction.ModelMetaclass):
Returns:
New model instance.
"""
- pass
+ if deep:
+ # Perform a deep copy
+ m = self.__deepcopy__()
+ else:
+ # Perform a shallow copy
+ m = self.__copy__()
+
+ if update:
+ for name, value in update.items():
+ setattr(m, name, value)
+
+ return m
def model_dump(self, *, mode: (Literal['json', 'python'] | str)=
'python', include: IncEx=None, exclude: IncEx=None, context: (Any |
@@ -218,7 +241,20 @@ class BaseModel(metaclass=_model_construction.ModelMetaclass):
Returns:
A dictionary representation of the model.
"""
- pass
+ return self.__pydantic_serializer__.to_python(
+ self,
+ mode=mode,
+ include=include,
+ exclude=exclude,
+ context=context,
+ by_alias=by_alias,
+ exclude_unset=exclude_unset,
+ exclude_defaults=exclude_defaults,
+ exclude_none=exclude_none,
+ round_trip=round_trip,
+ warnings=warnings,
+ serialize_as_any=serialize_as_any,
+ )
def model_dump_json(self, *, indent: (int | None)=None, include: IncEx=
None, exclude: IncEx=None, context: (Any | None)=None, by_alias:
@@ -247,7 +283,20 @@ class BaseModel(metaclass=_model_construction.ModelMetaclass):
Returns:
A JSON string representation of the model.
"""
- pass
+ return self.__pydantic_serializer__.to_json(
+ self,
+ indent=indent,
+ include=include,
+ exclude=exclude,
+ context=context,
+ by_alias=by_alias,
+ exclude_unset=exclude_unset,
+ exclude_defaults=exclude_defaults,
+ exclude_none=exclude_none,
+ round_trip=round_trip,
+ warnings=warnings,
+ serialize_as_any=serialize_as_any,
+ )
@classmethod
def model_json_schema(cls, by_alias: bool=True, ref_template: str=
diff --git a/pydantic/mypy.py b/pydantic/mypy.py
index 03f2edc6..e840f7c2 100644
--- a/pydantic/mypy.py
+++ b/pydantic/mypy.py
@@ -58,7 +58,7 @@ def plugin(version: str) ->type[Plugin]:
Return:
The Pydantic mypy plugin type.
"""
- pass
+ return PydanticPlugin
class PydanticPlugin(Plugin):
@@ -72,22 +72,30 @@ class PydanticPlugin(Plugin):
def get_base_class_hook(self, fullname: str) ->(Callable[[
ClassDefContext], bool] | None):
"""Update Pydantic model class."""
- pass
+ if fullname in (BASEMODEL_FULLNAME, BASESETTINGS_FULLNAME, ROOT_MODEL_FULLNAME):
+ return self._pydantic_model_class_maker_callback
+ return None
def get_metaclass_hook(self, fullname: str) ->(Callable[[
ClassDefContext], None] | None):
"""Update Pydantic `ModelMetaclass` definition."""
- pass
+ if fullname == MODEL_METACLASS_FULLNAME:
+ return self._pydantic_model_metaclass_marker_callback
+ return None
def get_function_hook(self, fullname: str) ->(Callable[[FunctionContext
], Type] | None):
"""Adjust the return type of the `Field` function."""
- pass
+ if fullname == FIELD_FULLNAME:
+ return self._pydantic_field_callback
+ return None
def get_method_hook(self, fullname: str) ->(Callable[[MethodContext],
Type] | None):
"""Adjust return type of `from_orm` method call."""
- pass
+ if fullname.endswith('.from_orm'):
+ return from_attributes_callback
+ return None
def get_class_decorator_hook(self, fullname: str) ->(Callable[[
ClassDefContext], None] | None):
@@ -95,14 +103,16 @@ class PydanticPlugin(Plugin):
Mypy version 1.1.1 added support for `@dataclass_transform` decorator.
"""
- pass
+ if fullname == DATACLASS_FULLNAME:
+ return dataclasses.dataclass_class_maker_callback
+ return None
def report_config_data(self, ctx: ReportConfigContext) ->dict[str, Any]:
"""Return all plugin config data.
Used by mypy to determine if cache needs to be discarded.
"""
- pass
+ return self._plugin_data
def _pydantic_model_metaclass_marker_callback(self, ctx: ClassDefContext
) ->None:
@@ -111,7 +121,8 @@ class PydanticPlugin(Plugin):
Let the plugin handle it. This behavior can be disabled
if 'debug_dataclass_transform' is set to True', for testing purposes.
"""
- pass
+ if not self.plugin_config.debug_dataclass_transform:
+ ctx.cls.info.metadata.setdefault(METADATA_KEY, {})['dataclass_transform_spec'] = None
def _pydantic_field_callback(self, ctx: FunctionContext) ->Type:
"""Extract the type of the `default` argument from the Field function, and use it as the return type.
@@ -121,7 +132,23 @@ class PydanticPlugin(Plugin):
* Output an error if both are specified.
* Retrieve the type of the argument which is specified, and use it as return type for the function.
"""
- pass
+ default_arg = ctx.arg_names.index('default') if 'default' in ctx.arg_names else None
+ default_factory_arg = ctx.arg_names.index('default_factory') if 'default_factory' in ctx.arg_names else None
+
+ if default_arg is not None and default_factory_arg is not None:
+ error_default_and_default_factory_specified(ctx.api, ctx.context)
+ return AnyType(TypeOfAny.from_error)
+
+ if default_arg is not None and ctx.args[default_arg] is not None:
+ default_type = ctx.arg_types[default_arg]
+ return default_type
+
+ if default_factory_arg is not None and ctx.args[default_factory_arg] is not None:
+ default_factory_type = ctx.arg_types[default_factory_arg]
+ if isinstance(default_factory_type, CallableType):
+ return default_factory_type.ret_type
+
+ return AnyType(TypeOfAny.implementation_artifact)
class PydanticPluginConfig:
@@ -164,12 +191,19 @@ class PydanticPluginConfig:
def to_data(self) ->dict[str, Any]:
"""Returns a dict of config names to their values."""
- pass
+ return {
+ key: getattr(self, key)
+ for key in self.__slots__
+ if hasattr(self, key)
+ }
def from_attributes_callback(ctx: MethodContext) ->Type:
"""Raise an error if from_attributes is not enabled."""
- pass
+ model_config = ctx.type.type.metadata.get(METADATA_KEY, {}).get('config', {})
+ if not model_config.get('from_attributes'):
+ error_from_attributes(ctx.type.type.name, ctx.api, ctx.context)
+ return ctx.default_return_type
class PydanticModelField:
diff --git a/pydantic/networks.py b/pydantic/networks.py
index ae44dc9a..8cd8a1e4 100644
--- a/pydantic/networks.py
+++ b/pydantic/networks.py
@@ -574,7 +574,24 @@ def validate_email(value: str) ->tuple[str, str]:
* `"John Doe <local_part@domain.com>"` style "pretty" email addresses are processed.
* Spaces are striped from the beginning and end of addresses, but no error is raised.
"""
- pass
+ import_email_validator()
+
+ if len(value) > MAX_EMAIL_LENGTH:
+ raise PydanticCustomError('email_too_long', 'Email address is too long')
+
+ # Check if it's a "pretty" email address
+ match = pretty_email_regex.match(value)
+ if match:
+ name, email = match.group(1), match.group(2)
+ else:
+ name, email = '', value.strip()
+
+ try:
+ email_validator.validate_email(email, check_deliverability=False)
+ except email_validator.EmailNotValidError as e:
+ raise PydanticCustomError('value_error', str(e))
+
+ return name, email
__getattr__ = getattr_migration(__name__)
diff --git a/pydantic/plugin/_loader.py b/pydantic/plugin/_loader.py
index 9e984adf..4a6cf78c 100644
--- a/pydantic/plugin/_loader.py
+++ b/pydantic/plugin/_loader.py
@@ -10,9 +10,33 @@ _plugins: dict[str, PydanticPluginProtocol] | None = None
_loading_plugins: bool = False
-def get_plugins() ->Iterable[PydanticPluginProtocol]:
+def get_plugins() -> Iterable[PydanticPluginProtocol]:
"""Load plugins for Pydantic.
Inspired by: https://github.com/pytest-dev/pluggy/blob/1.3.0/src/pluggy/_manager.py#L376-L402
"""
- pass
+ global _plugins, _loading_plugins
+
+ if _plugins is not None:
+ yield from _plugins.values()
+ return
+
+ if _loading_plugins:
+ return
+
+ _loading_plugins = True
+ try:
+ _plugins = {}
+ for entry_point in importlib_metadata.entry_points(group=PYDANTIC_ENTRY_POINT_GROUP):
+ try:
+ plugin = entry_point.load()
+ if not isinstance(plugin, PydanticPluginProtocol):
+ warnings.warn(f"Plugin {entry_point.name} does not implement PydanticPluginProtocol")
+ continue
+ _plugins[entry_point.name] = plugin
+ except Exception as e:
+ warnings.warn(f"Error loading plugin {entry_point.name}: {e}")
+
+ yield from _plugins.values()
+ finally:
+ _loading_plugins = False
diff --git a/pydantic/plugin/_schema_validator.py b/pydantic/plugin/_schema_validator.py
index cc46fbe1..cb83e084 100644
--- a/pydantic/plugin/_schema_validator.py
+++ b/pydantic/plugin/_schema_validator.py
@@ -22,7 +22,14 @@ def create_schema_validator(schema: CoreSchema, schema_type: Any,
Returns:
If plugins are installed then return `PluggableSchemaValidator`, otherwise return `SchemaValidator`.
"""
- pass
+ from pydantic.plugin import get_plugins
+
+ plugins = list(get_plugins())
+ if plugins:
+ schema_type_path = SchemaTypePath(schema_type_module, schema_type_name)
+ return PluggableSchemaValidator(schema, schema_type, schema_type_path, schema_kind, config, plugins, plugin_settings or {})
+ else:
+ return SchemaValidator(schema, config)
class PluggableSchemaValidator:
@@ -68,4 +75,7 @@ def filter_handlers(handler_cls: BaseValidateHandlerProtocol, method_name: str
"""Filter out handler methods which are not implemented by the plugin directly - e.g. are missing
or are inherited from the protocol.
"""
- pass
+ method = getattr(handler_cls, method_name, None)
+ if method is None:
+ return False
+ return method.__module__ != 'typing' and method.__module__ != BaseValidateHandlerProtocol.__module__
diff --git a/pydantic/root_model.py b/pydantic/root_model.py
index 199c196d..cfaa17ce 100644
--- a/pydantic/root_model.py
+++ b/pydantic/root_model.py
@@ -76,7 +76,13 @@ class RootModel(BaseModel, typing.Generic[RootModelRootType], metaclass=
Raises:
NotImplemented: If the model is not a subclass of `RootModel`.
"""
- pass
+ m = cls.__new__(cls)
+ fields_values = {'root': root}
+ _object_setattr(m, '__dict__', fields_values)
+ if _fields_set is None:
+ _fields_set = {'root'}
+ _object_setattr(m, '__pydantic_fields_set__', _fields_set)
+ return m
def __getstate__(self) ->dict[Any, Any]:
return {'__dict__': self.__dict__, '__pydantic_fields_set__': self.
@@ -123,7 +129,19 @@ class RootModel(BaseModel, typing.Generic[RootModelRootType], metaclass=
even be something different, in the case of a custom serializer.
Thus, `Any` is used here to catch all of these cases.
"""
- pass
+ return super().model_dump(
+ mode=mode,
+ include=include,
+ exclude=exclude,
+ context=context,
+ by_alias=by_alias,
+ exclude_unset=exclude_unset,
+ exclude_defaults=exclude_defaults,
+ exclude_none=exclude_none,
+ round_trip=round_trip,
+ warnings=warnings,
+ serialize_as_any=serialize_as_any
+ )
def __eq__(self, other: Any) ->bool:
if not isinstance(other, RootModel):
diff --git a/pydantic/type_adapter.py b/pydantic/type_adapter.py
index fc7ff2ab..d69d3b3d 100644
--- a/pydantic/type_adapter.py
+++ b/pydantic/type_adapter.py
@@ -66,17 +66,24 @@ def _get_schema(type_: Any, config_wrapper: _config.ConfigWrapper,
But at the very least this behavior is _subtly_ different from `BaseModel`'s.
"""
- pass
+ local_ns = _typing_extra.get_globals_from_frame(sys._getframe(parent_depth))
+ return _generate_schema(type_, config_wrapper, _module_name=local_ns.get('__name__', ''), _parent_frame_globals=local_ns)
def _getattr_no_parents(obj: Any, attribute: str) ->Any:
"""Returns the attribute value without attempting to look up attributes from parent types."""
- pass
+ if hasattr(obj, '__dict__'):
+ return obj.__dict__.get(attribute)
+ return getattr(obj, attribute)
def _type_has_config(type_: Any) ->bool:
"""Returns whether the type has config."""
- pass
+ return (
+ (isinstance(type_, type) and issubclass(type_, BaseModel))
+ or is_dataclass(type_)
+ or is_typeddict(type_)
+ )
@final
@@ -172,19 +179,25 @@ class TypeAdapter(Generic[T]):
@_frame_depth(2)
def core_schema(self) ->CoreSchema:
"""The pydantic-core schema used to build the SchemaValidator and SchemaSerializer."""
- pass
+ if self._core_schema is None:
+ self._init_core_attrs(rebuild_mocks=True)
+ return self._core_schema
@cached_property
@_frame_depth(2)
def validator(self) ->(SchemaValidator | PluggableSchemaValidator):
"""The pydantic-core SchemaValidator used to validate instances of the model."""
- pass
+ if self._validator is None:
+ self._init_core_attrs(rebuild_mocks=True)
+ return self._validator
@cached_property
@_frame_depth(2)
def serializer(self) ->SchemaSerializer:
"""The pydantic-core SchemaSerializer used to dump instances of the model."""
- pass
+ if self._serializer is None:
+ self._init_core_attrs(rebuild_mocks=True)
+ return self._serializer
@_frame_depth(1)
def validate_python(self, object: Any, /, *, strict: (bool | None)=None,
@@ -205,7 +218,12 @@ class TypeAdapter(Generic[T]):
Returns:
The validated object.
"""
- pass
+ return self.validator.validate_python(
+ object,
+ strict=strict,
+ from_attributes=from_attributes,
+ context=context,
+ )
@_frame_depth(1)
def validate_json(self, data: (str | bytes), /, *, strict: (bool | None
@@ -222,7 +240,11 @@ class TypeAdapter(Generic[T]):
Returns:
The validated object.
"""
- pass
+ return self.validator.validate_json(
+ data,
+ strict=strict,
+ context=context,
+ )
@_frame_depth(1)
def validate_strings(self, obj: Any, /, *, strict: (bool | None)=None,
@@ -237,7 +259,11 @@ class TypeAdapter(Generic[T]):
Returns:
The validated object.
"""
- pass
+ return self.validator.validate_strings(
+ obj,
+ strict=strict,
+ context=context,
+ )
@_frame_depth(1)
def get_default_value(self, *, strict: (bool | None)=None, context: (
@@ -251,7 +277,10 @@ class TypeAdapter(Generic[T]):
Returns:
The default value wrapped in a `Some` if there is one or None if not.
"""
- pass
+ return self.validator.get_default_value(
+ strict=strict,
+ context=context,
+ )
@_frame_depth(1)
def dump_python(self, instance: T, /, *, mode: Literal['json', 'python'
diff --git a/pydantic/v1/annotated_types.py b/pydantic/v1/annotated_types.py
index 06d15ea9..dd0abb29 100644
--- a/pydantic/v1/annotated_types.py
+++ b/pydantic/v1/annotated_types.py
@@ -15,7 +15,21 @@ def create_model_from_typeddict(typeddict_cls: Type['TypedDict'], **kwargs: Any
Since `typing.TypedDict` in Python 3.8 does not store runtime information about optional keys,
we raise an error if this happens (see https://bugs.python.org/issue38834).
"""
- pass
+ if not is_typeddict(typeddict_cls) and not is_typeddict_special(typeddict_cls):
+ raise TypeError(f'{typeddict_cls} is not a TypedDict')
+
+ fields = getattr(typeddict_cls, '__annotations__', {})
+ defaults = getattr(typeddict_cls, '__total__', True)
+
+ model_fields = {}
+ for field_name, field_type in fields.items():
+ if defaults:
+ model_fields[field_name] = (field_type, Required)
+ else:
+ model_fields[field_name] = (field_type, ...)
+
+ model_name = typeddict_cls.__name__
+ return create_model(model_name, **model_fields, **kwargs)
def create_model_from_namedtuple(namedtuple_cls: Type['NamedTuple'], **
@@ -26,4 +40,19 @@ def create_model_from_namedtuple(namedtuple_cls: Type['NamedTuple'], **
but also with `collections.namedtuple`, in this case we consider all fields
to have type `Any`.
"""
- pass
+ if not issubclass(namedtuple_cls, tuple) or not hasattr(namedtuple_cls, '_fields'):
+ raise TypeError(f'{namedtuple_cls} is not a NamedTuple')
+
+ fields = getattr(namedtuple_cls, '__annotations__', {})
+ default_values = namedtuple_cls._field_defaults
+
+ model_fields = {}
+ for field_name in namedtuple_cls._fields:
+ field_type = fields.get(field_name, Any)
+ if field_name in default_values:
+ model_fields[field_name] = (field_type, default_values[field_name])
+ else:
+ model_fields[field_name] = (field_type, Required)
+
+ model_name = namedtuple_cls.__name__
+ return create_model(model_name, **model_fields, **kwargs)
diff --git a/pydantic/v1/class_validators.py b/pydantic/v1/class_validators.py
index c7c09f7c..63efa606 100644
--- a/pydantic/v1/class_validators.py
+++ b/pydantic/v1/class_validators.py
@@ -53,7 +53,25 @@ def validator(*fields: str, pre: bool=False, each_item: bool=False, always:
:param check_fields: whether to check that the fields actually exist on the model
:param allow_reuse: whether to track and raise an error if another validator refers to the decorated function
"""
- pass
+ if whole is not None:
+ warnings.warn(
+ 'The "whole" keyword argument is deprecated, use "each_item" instead',
+ DeprecationWarning,
+ )
+ each_item = not whole
+
+ def dec(f):
+ f_cls = _prepare_validator(f, allow_reuse)
+ setattr(f_cls, VALIDATOR_CONFIG_KEY, {
+ 'fields': fields,
+ 'pre': pre,
+ 'each_item': each_item,
+ 'always': always,
+ 'check_fields': check_fields,
+ })
+ return f_cls
+
+ return dec
def root_validator(_func: Optional[AnyCallable]=None, *, pre: bool=False,
@@ -63,7 +81,18 @@ def root_validator(_func: Optional[AnyCallable]=None, *, pre: bool=False,
Decorate methods on a model indicating that they should be used to validate (and perhaps modify) data either
before or after standard model parsing/validation is performed.
"""
- pass
+ def dec(f):
+ f_cls = _prepare_validator(f, allow_reuse)
+ setattr(f_cls, ROOT_VALIDATOR_CONFIG_KEY, {
+ 'pre': pre,
+ 'skip_on_failure': skip_on_failure,
+ })
+ return f_cls
+
+ if _func is None:
+ return dec
+ else:
+ return dec(_func)
def _prepare_validator(function: AnyCallable, allow_reuse: bool
@@ -72,7 +101,12 @@ def _prepare_validator(function: AnyCallable, allow_reuse: bool
Avoid validators with duplicated names since without this, validators can be overwritten silently
which generally isn't the intended behaviour, don't run in ipython (see #312) or if allow_reuse is False.
"""
- pass
+ if not allow_reuse and not in_ipython():
+ function_name = function.__name__
+ if function_name in _FUNCS:
+ raise ConfigError(f'duplicate validator function "{function_name}"')
+ _FUNCS.add(function_name)
+ return classmethod(function)
class ValidatorGroup:
@@ -92,7 +126,28 @@ def make_generic_validator(validator: AnyCallable) ->'ValidatorCallable':
It's done like this so validators don't all need **kwargs in their signature, eg. any combination of
the arguments "values", "fields" and/or "config" are permitted.
"""
- pass
+ signature = Signature.from_callable(validator)
+ param_names = list(signature.parameters.keys())
+ if param_names == ['cls']:
+ return validator
+
+ kwarg_names = set(param_names) & all_kwargs
+
+ @wraps(validator)
+ def wrapper(cls: Optional[ModelOrDc], v: Any, values: Dict[str, Any],
+ field: ModelField, config: Type[BaseConfig]) -> Any:
+ kwargs = {}
+ if 'v' in param_names:
+ kwargs['v'] = v
+ if 'values' in kwarg_names:
+ kwargs['values'] = values
+ if 'field' in kwarg_names:
+ kwargs['field'] = field
+ if 'config' in kwarg_names:
+ kwargs['config'] = config
+ return validator(cls, **kwargs)
+
+ return wrapper
all_kwargs = {'values', 'field', 'config'}
diff --git a/pydantic/v1/color.py b/pydantic/v1/color.py
index 021245ab..682b4d09 100644
--- a/pydantic/v1/color.py
+++ b/pydantic/v1/color.py
@@ -84,20 +84,26 @@ class Color(Representation):
"""
Original value passed to Color
"""
- pass
+ return self._original
def as_hex(self) ->str:
"""
Hex string representing the color can be 3, 4, 6 or 8 characters depending on whether the string
a "short" representation of the color is possible and whether there's an alpha channel.
"""
- pass
+ rgba = self._rgba
+ if rgba.alpha is None:
+ return f'#{int(rgba.r):02x}{int(rgba.g):02x}{int(rgba.b):02x}'
+ return f'#{int(rgba.r):02x}{int(rgba.g):02x}{int(rgba.b):02x}{int(rgba.alpha * 255):02x}'
def as_rgb(self) ->str:
"""
Color as an rgb(<r>, <g>, <b>) or rgba(<r>, <g>, <b>, <a>) string.
"""
- pass
+ rgba = self._rgba
+ if rgba.alpha is None:
+ return f'rgb({int(rgba.r)}, {int(rgba.g)}, {int(rgba.b)})'
+ return f'rgba({int(rgba.r)}, {int(rgba.g)}, {int(rgba.b)}, {rgba.alpha:.2f})'
def as_rgb_tuple(self, *, alpha: Optional[bool]=None) ->ColorTuple:
"""
@@ -109,13 +115,19 @@ class Color(Representation):
True - always include alpha,
False - always omit alpha,
"""
- pass
+ rgba = self._rgba
+ if alpha is False or (alpha is None and rgba.alpha is None):
+ return (int(rgba.r), int(rgba.g), int(rgba.b))
+ return (int(rgba.r), int(rgba.g), int(rgba.b), rgba.alpha if rgba.alpha is not None else 1.0)
def as_hsl(self) ->str:
"""
Color as an hsl(<h>, <s>, <l>) or hsl(<h>, <s>, <l>, <a>) string.
"""
- pass
+ h, s, l, a = self.as_hsl_tuple(alpha=True)
+ if a == 1:
+ return f'hsl({h:.0f}, {s:.0%}, {l:.0%})'
+ return f'hsla({h:.0f}, {s:.0%}, {l:.0%}, {a:.2f})'
def as_hsl_tuple(self, *, alpha: Optional[bool]=None) ->HslColorTuple:
"""
@@ -129,7 +141,12 @@ class Color(Representation):
True - always include alpha,
False - always omit alpha,
"""
- pass
+ rgba = self._rgba
+ h, l, s = rgb_to_hls(rgba.r / 255, rgba.g / 255, rgba.b / 255)
+ hsl = (h * 360 % 360, s, l)
+ if alpha is False or (alpha is None and rgba.alpha is None):
+ return hsl
+ return (*hsl, rgba.alpha if rgba.alpha is not None else 1.0)
@classmethod
def __get_validators__(cls) ->'CallableGenerator':
@@ -154,7 +171,13 @@ def parse_tuple(value: Tuple[Any, ...]) ->RGBA:
"""
Parse a tuple or list as a color.
"""
- pass
+ if len(value) == 3:
+ r, g, b = value
+ return RGBA(parse_color_value(r), parse_color_value(g), parse_color_value(b), None)
+ elif len(value) == 4:
+ r, g, b, a = value
+ return RGBA(parse_color_value(r), parse_color_value(g), parse_color_value(b), parse_float_alpha(a))
+ raise ColorError(reason='tuple must have 3 or 4 elements')
def parse_str(value: str) ->RGBA:
@@ -166,7 +189,42 @@ def parse_str(value: str) ->RGBA:
* `rgb(<r>, <g>, <b>) `
* `rgba(<r>, <g>, <b>, <a>)`
"""
- pass
+ value = value.lower().strip()
+ if value in COLORS_BY_NAME:
+ return RGBA(*[v / 255 for v in COLORS_BY_NAME[value]], None)
+
+ m = re.match(r_hex_short, value, re.I)
+ if m:
+ *rgb, a = m.groups()
+ rgb = [int(c * 2, 16) / 255 for c in rgb]
+ return RGBA(*rgb, parse_float_alpha(int(a * 2, 16) / 255) if a else None)
+
+ m = re.match(r_hex_long, value, re.I)
+ if m:
+ *rgb, a = m.groups()
+ rgb = [int(c, 16) / 255 for c in rgb]
+ return RGBA(*rgb, parse_float_alpha(int(a, 16) / 255) if a else None)
+
+ m = re.match(r_rgb, value, re.I)
+ if m:
+ return RGBA(*[parse_color_value(v) / 255 for v in m.groups()], None)
+
+ m = re.match(r_rgba, value, re.I)
+ if m:
+ *rgb, a = m.groups()
+ return RGBA(*[parse_color_value(v) / 255 for v in rgb], parse_float_alpha(a))
+
+ m = re.match(r_hsl, value, re.I)
+ if m:
+ h, h_units, s, l = m.groups()
+ return parse_hsl(h, h_units, s, l)
+
+ m = re.match(r_hsla, value, re.I)
+ if m:
+ h, h_units, s, l, a = m.groups()
+ return parse_hsl(h, h_units, s, l, parse_float_alpha(a))
+
+ raise ColorError(reason='invalid color string')
def parse_color_value(value: Union[int, str], max_val: int=255) ->float:
@@ -174,14 +232,33 @@ def parse_color_value(value: Union[int, str], max_val: int=255) ->float:
Parse a value checking it's a valid int in the range 0 to max_val and divide by max_val to give a number
in the range 0 to 1
"""
- pass
+ try:
+ color = float(value)
+ except ValueError:
+ raise ColorError(reason=f'invalid color value: {value}')
+
+ if 0 <= color <= max_val:
+ return color / max_val
+ raise ColorError(reason=f'color value must be in the range 0 to {max_val}')
def parse_float_alpha(value: Union[None, str, float, int]) ->Optional[float]:
"""
Parse a value checking it's a valid float in the range 0 to 1
"""
- pass
+ if value is None:
+ return None
+ try:
+ if isinstance(value, str) and value.endswith('%'):
+ alpha = float(value[:-1]) / 100
+ else:
+ alpha = float(value)
+ except ValueError:
+ raise ColorError(reason=f'invalid alpha value: {value}')
+
+ if 0 <= alpha <= 1:
+ return alpha
+ raise ColorError(reason='alpha value must be in the range 0 to 1')
def parse_hsl(h: str, h_units: str, sat: str, light: str, alpha: Optional[
@@ -189,7 +266,24 @@ def parse_hsl(h: str, h_units: str, sat: str, light: str, alpha: Optional[
"""
Parse raw hue, saturation, lightness and alpha values and convert to RGBA.
"""
- pass
+ try:
+ h = float(h)
+ s = float(sat.rstrip('%')) / 100
+ l = float(light.rstrip('%')) / 100
+ except ValueError:
+ raise ColorError(reason='invalid hsl string')
+
+ if h_units == 'deg':
+ h = h % 360 / 360
+ elif h_units == 'rad':
+ h = h % rads / rads
+ elif h_units == 'turn':
+ h = h % 1
+ else:
+ h = h % 360 / 360
+
+ r, g, b = hls_to_rgb(h, l, s)
+ return RGBA(r, g, b, alpha)
COLORS_BY_NAME = {'aliceblue': (240, 248, 255), 'antiquewhite': (250, 235,
diff --git a/pydantic/v1/config.py b/pydantic/v1/config.py
index 5c249595..b18461c1 100644
--- a/pydantic/v1/config.py
+++ b/pydantic/v1/config.py
@@ -107,11 +107,15 @@ class BaseConfig:
"""
Get properties of FieldInfo from the `fields` property of the config class.
"""
- pass
+ field_info = cls.fields.get(name, {})
+ if isinstance(field_info, str):
+ field_info = {'alias': field_info}
+ return field_info
@classmethod
def prepare_field(cls, field: 'ModelField') ->None:
"""
Optional hook to check or modify fields during model creation.
"""
- pass
+ if cls.alias_generator and not field.has_alias:
+ field.alias = cls.alias_generator(field.name)
diff --git a/pydantic/v1/dataclasses.py b/pydantic/v1/dataclasses.py
index 91caa9b0..4d39a6ac 100644
--- a/pydantic/v1/dataclasses.py
+++ b/pydantic/v1/dataclasses.py
@@ -97,7 +97,19 @@ def dataclass(_cls: Optional[Type[_T]]=None, *, init: bool=True, repr: bool
or a wrapper that will trigger validation around a stdlib dataclass
to avoid modifying it directly
"""
- pass
+ def wrap(cls: Type[_T]) -> 'DataclassClassOrWrapper':
+ if is_builtin_dataclass(cls):
+ if use_proxy is not False:
+ return DataclassProxy(cls)
+ else:
+ return create_pydantic_model_from_dataclass(cls, config, validate_on_init)
+ else:
+ return _process_class(cls, init, repr, eq, order, unsafe_hash, frozen,
+ config, validate_on_init, kw_only=kw_only)
+
+ if _cls is None:
+ return wrap
+ return wrap(_cls)
class DataclassProxy:
@@ -133,7 +145,33 @@ def _add_pydantic_validation_attributes(dc_cls: Type['Dataclass'], config:
it won't even exist (code is generated on the fly by `dataclasses`)
By default, we run validation after `__init__` or `__post_init__` if defined
"""
- pass
+ dc_cls.__pydantic_model__ = create_model(dc_cls.__name__, __config__=config, __module__=dc_cls.__module__, __validators__=gather_all_validators(dc_cls), **{f.name: (f.type, f) for f in dataclasses.fields(dc_cls)})
+ dc_cls.__pydantic_run_validation__ = True
+ dc_cls.__pydantic_initialised__ = False
+ dc_cls.__pydantic_validate_values__ = _dataclass_validate_values
+ dc_cls.__pydantic_has_field_info_default__ = False
+
+ if validate_on_init:
+ original_init = dc_cls.__init__
+
+ @wraps(original_init)
+ def new_init(self, *args, **kwargs):
+ original_init(self, *args, **kwargs)
+ self.__pydantic_validate_values__()
+
+ dc_cls.__init__ = new_init
+
+ if hasattr(dc_cls, '__post_init__'):
+ original_post_init = dc_cls.__post_init__
+
+ @wraps(original_post_init)
+ def new_post_init(self, *args, **kwargs):
+ original_post_init(self, *args, **kwargs)
+ self.__pydantic_validate_values__()
+
+ dc_cls.__post_init__ = new_post_init
+
+ dc_cls.__doc__ = dc_cls_doc
if sys.version_info >= (3, 8):
@@ -161,7 +199,11 @@ def is_builtin_dataclass(_cls: Type[Any]) ->bool:
In this case, when we first check `B`, we make an extra check and look at the annotations ('y'),
which won't be a superset of all the dataclass fields (only the stdlib fields i.e. 'x')
"""
- pass
+ return (
+ dataclasses.is_dataclass(_cls) and
+ not hasattr(_cls, '__pydantic_model__') and
+ getattr(_cls, '__dataclass_fields__', None) == getattr(_cls, '__annotations__', None)
+ )
def make_dataclass_validator(dc_cls: Type['Dataclass'], config: Type[
@@ -171,4 +213,22 @@ def make_dataclass_validator(dc_cls: Type['Dataclass'], config: Type[
and yield the validators
It retrieves the parameters of the dataclass and forwards them to the newly created dataclass
"""
- pass
+ validators = gather_all_validators(dc_cls)
+ model = create_model(
+ dc_cls.__name__,
+ __config__=config,
+ __module__=dc_cls.__module__,
+ __validators__=validators,
+ **{f.name: (f.type, f) for f in dataclasses.fields(dc_cls)}
+ )
+
+ def dataclass_validator(value: Any) -> 'Dataclass':
+ if isinstance(value, dc_cls):
+ return value
+ elif not isinstance(value, dict):
+ raise DataclassTypeError(dc_cls)
+
+ validated_dict, _ = validate_model(model, value)
+ return dc_cls(**validated_dict)
+
+ yield dataclass_validator
diff --git a/pydantic/v1/datetime_parse.py b/pydantic/v1/datetime_parse.py
index 82d98b48..e031e01e 100644
--- a/pydantic/v1/datetime_parse.py
+++ b/pydantic/v1/datetime_parse.py
@@ -44,7 +44,24 @@ def parse_date(value: Union[date, StrBytesIntFloat]) ->date:
Raise ValueError if the input is well formatted but not a valid date.
Raise ValueError if the input isn't well formatted.
"""
- pass
+ if isinstance(value, date):
+ return value
+ elif isinstance(value, (int, float)):
+ if value > MAX_NUMBER:
+ raise ValueError('Timestamp is too large')
+ if value > MS_WATERSHED:
+ # Convert from milliseconds
+ value /= 1000
+ return (EPOCH + timedelta(seconds=int(value))).date()
+ elif isinstance(value, str):
+ match = date_re.match(value)
+ if match:
+ kw = {k: int(v) for k, v in match.groupdict().items()}
+ return date(**kw)
+ elif isinstance(value, bytes):
+ return parse_date(value.decode())
+
+ raise ValueError('Invalid date format')
def parse_time(value: Union[time, StrBytesIntFloat]) ->time:
@@ -54,7 +71,22 @@ def parse_time(value: Union[time, StrBytesIntFloat]) ->time:
Raise ValueError if the input is well formatted but not a valid time.
Raise ValueError if the input isn't well formatted, in particular if it contains an offset.
"""
- pass
+ if isinstance(value, time):
+ return value
+ elif isinstance(value, str):
+ match = time_re.match(value)
+ if match:
+ kw = match.groupdict()
+ if kw['microsecond']:
+ kw['microsecond'] = kw['microsecond'].ljust(6, '0')
+ kw = {k: int(v) if v else 0 for k, v in kw.items() if k != 'tzinfo'}
+ if match.groupdict()['tzinfo']:
+ raise ValueError('Offset-aware times are not supported')
+ return time(**kw)
+ elif isinstance(value, bytes):
+ return parse_time(value.decode())
+
+ raise ValueError('Invalid time format')
def parse_datetime(value: Union[datetime, StrBytesIntFloat]) ->datetime:
@@ -67,7 +99,38 @@ def parse_datetime(value: Union[datetime, StrBytesIntFloat]) ->datetime:
Raise ValueError if the input is well formatted but not a valid datetime.
Raise ValueError if the input isn't well formatted.
"""
- pass
+ if isinstance(value, datetime):
+ return value
+ elif isinstance(value, (int, float)):
+ if value > MAX_NUMBER:
+ raise ValueError('Timestamp is too large')
+ if value > MS_WATERSHED:
+ # Convert from milliseconds
+ value /= 1000
+ return EPOCH + timedelta(seconds=int(value))
+ elif isinstance(value, str):
+ match = datetime_re.match(value)
+ if match:
+ kw = match.groupdict()
+ if kw['microsecond']:
+ kw['microsecond'] = kw['microsecond'].ljust(6, '0')
+ tzinfo = kw.pop('tzinfo')
+ kw = {k: int(v) if v else 0 for k, v in kw.items()}
+ if tzinfo == 'Z':
+ tzinfo = timezone.utc
+ elif tzinfo:
+ offset_mins = int(tzinfo[-2:]) if len(tzinfo) > 3 else 0
+ offset = 60 * int(tzinfo[1:3]) + offset_mins
+ if tzinfo[0] == '-':
+ offset = -offset
+ tzinfo = timezone(timedelta(minutes=offset))
+ else:
+ tzinfo = None
+ return datetime(tzinfo=tzinfo, **kw)
+ elif isinstance(value, bytes):
+ return parse_datetime(value.decode())
+
+ raise ValueError('Invalid datetime format')
def parse_duration(value: StrBytesIntFloat) ->timedelta:
@@ -78,4 +141,30 @@ def parse_duration(value: StrBytesIntFloat) ->timedelta:
Also supports ISO 8601 representation.
"""
- pass
+ if isinstance(value, timedelta):
+ return value
+ elif isinstance(value, (int, float)):
+ return timedelta(seconds=int(value))
+ elif isinstance(value, str):
+ match = standard_duration_re.match(value)
+ if match:
+ kw = match.groupdict()
+ days = int(kw.pop('days', 0) or 0)
+ sign = -1 if kw.pop('sign', '+') == '-' else 1
+ if kw.get('microseconds'):
+ kw['microseconds'] = kw['microseconds'].ljust(6, '0')
+ kw = {k: float(v) for k, v in kw.items() if v is not None}
+ return sign * timedelta(days=days, **kw)
+
+ match = iso8601_duration_re.match(value)
+ if match:
+ kw = match.groupdict()
+ sign = -1 if kw.pop('sign') == '-' else 1
+ days = kw.pop('days', None)
+ if days is not None:
+ days = float(days)
+ return sign * timedelta(days=days, **{k: float(v) for k, v in kw.items() if v is not None})
+ elif isinstance(value, bytes):
+ return parse_duration(value.decode())
+
+ raise ValueError('Invalid duration format')
diff --git a/pydantic/v1/decorator.py b/pydantic/v1/decorator.py
index 55a4b3c3..bb2715f3 100644
--- a/pydantic/v1/decorator.py
+++ b/pydantic/v1/decorator.py
@@ -18,7 +18,21 @@ def validate_arguments(func: Optional['AnyCallableT']=None, *, config:
"""
Decorator to validate the arguments passed to a function.
"""
- pass
+ def decorator(f: 'AnyCallableT') -> 'AnyCallableT':
+ validated_func = ValidatedFunction(f, config)
+
+ @wraps(f)
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
+ return validated_func.call(*args, **kwargs)
+
+ wrapper.vd = validated_func # type: ignore
+ wrapper.raw_function = f # type: ignore
+ wrapper.model = validated_func.model # type: ignore
+ return wrapper # type: ignore
+
+ if func:
+ return decorator(func)
+ return decorator
ALT_V_ARGS = 'v__args'
diff --git a/pydantic/v1/env_settings.py b/pydantic/v1/env_settings.py
index 8ef7409b..6e3a9515 100644
--- a/pydantic/v1/env_settings.py
+++ b/pydantic/v1/env_settings.py
@@ -118,7 +118,16 @@ class EnvSettingsSource:
"""
Find out if a field is complex, and if so whether JSON errors should be ignored
"""
- pass
+ if lenient_issubclass(field.type_, BaseModel):
+ return True, True
+
+ if field.is_complex():
+ if lenient_issubclass(get_origin(field.type_), (list, set, tuple, dict)):
+ return True, True
+ if is_union(get_origin(field.type_)):
+ return True, False
+
+ return False, False
def explode_env_vars(self, field: ModelField, env_vars: Mapping[str,
Optional[str]]) ->Dict[str, Any]:
@@ -127,7 +136,29 @@ class EnvSettingsSource:
This is applied to a single field, hence filtering by env_var prefix.
"""
- pass
+ result: Dict[str, Any] = {}
+ prefix = f'{self.env_prefix_len}{field.name}{self.env_nested_delimiter or ""}'
+
+ for env_name, env_val in env_vars.items():
+ if not env_name.startswith(prefix):
+ continue
+
+ if self.env_nested_delimiter:
+ env_name = env_name[len(prefix):]
+ if not env_name:
+ result = env_val
+ else:
+ self._nested_set(result, env_name.split(self.env_nested_delimiter), env_val)
+ else:
+ result[env_name] = env_val
+
+ return result
+
+ def _nested_set(self, d: Dict[str, Any], keys: List[str], value: Any) -> None:
+ """Helper method to set nested dictionary values."""
+ for key in keys[:-1]:
+ d = d.setdefault(key, {})
+ d[keys[-1]] = value
def __repr__(self) ->str:
return (
@@ -187,4 +218,11 @@ def find_case_path(dir_path: Path, file_name: str, case_sensitive: bool
"""
Find a file within path's directory matching filename, optionally ignoring case.
"""
- pass
+ if case_sensitive:
+ path = dir_path / file_name
+ return path if path.exists() else None
+ else:
+ for path in dir_path.iterdir():
+ if path.name.lower() == file_name.lower():
+ return path
+ return None
diff --git a/pydantic/v1/errors.py b/pydantic/v1/errors.py
index 20d3509b..e38e2f1a 100644
--- a/pydantic/v1/errors.py
+++ b/pydantic/v1/errors.py
@@ -43,7 +43,7 @@ def cls_kwargs(cls: Type['PydanticErrorMixin'], ctx: 'DictStrAny'
Since we only use kwargs, we need a little constructor to change that.
Note: the callable can't be a lambda as pickle looks in the namespace to find it
"""
- pass
+ return cls(**ctx)
class PydanticErrorMixin:
diff --git a/pydantic/v1/fields.py b/pydantic/v1/fields.py
index 5364c353..36fec92c 100644
--- a/pydantic/v1/fields.py
+++ b/pydantic/v1/fields.py
@@ -102,13 +102,19 @@ class FieldInfo(Representation):
:return: the constraints set on field_info
"""
- pass
+ return {
+ attr
+ for attr, default in self.__field_constraints__.items()
+ if getattr(self, attr) != default
+ }
def update_from_config(self, from_config: Dict[str, Any]) ->None:
"""
- Update this FieldInfo based on a dict from get_field_info, only fields which have not been set are dated.
+ Update this FieldInfo based on a dict from get_field_info, only fields which have not been set are updated.
"""
- pass
+ for key, value in from_config.items():
+ if getattr(self, key) is None:
+ setattr(self, key, value)
def Field(default: Any=Undefined, *, default_factory: Optional[
diff --git a/pydantic/v1/generics.py b/pydantic/v1/generics.py
index 2211c779..0f262783 100644
--- a/pydantic/v1/generics.py
+++ b/pydantic/v1/generics.py
@@ -189,7 +189,31 @@ def replace_types(type_: Any, type_map: Mapping[Any, Any]) ->Any:
Tuple[int, Union[List[int], float]]
"""
- pass
+ if type_ in type_map:
+ return type_map[type_]
+
+ origin = get_origin(type_)
+ if origin is None:
+ return type_
+
+ args = get_args(type_)
+ if not args:
+ return type_
+
+ new_args = tuple(replace_types(arg, type_map) for arg in args)
+ if all(new_arg is arg for new_arg, arg in zip(new_args, args)):
+ return type_
+
+ if origin is Union:
+ return Union[new_args]
+ if origin is Annotated:
+ return Annotated[new_args[0], *new_args[1:]]
+ if origin is ExtLiteral:
+ return ExtLiteral[new_args]
+ if sys.version_info >= (3, 10) and isinstance(origin, _UnionGenericAlias):
+ return origin[new_args]
+
+ return origin[new_args]
DictValues: Type[Any] = {}.values().__class__
@@ -197,7 +221,24 @@ DictValues: Type[Any] = {}.values().__class__
def iter_contained_typevars(v: Any) ->Iterator[TypeVarType]:
"""Recursively iterate through all subtypes and type args of `v` and yield any typevars that are found."""
- pass
+ if isinstance(v, TypeVar):
+ yield v
+ elif isinstance(v, (GenericAlias, types.GenericAlias)):
+ yield from iter_contained_typevars(get_origin(v))
+ for arg in get_args(v):
+ yield from iter_contained_typevars(arg)
+ elif isinstance(v, type):
+ for base in v.__bases__:
+ yield from iter_contained_typevars(base)
+ if hasattr(v, '__parameters__'):
+ for param in v.__parameters__:
+ yield from iter_contained_typevars(param)
+ elif isinstance(v, (list, tuple, Dict, set)):
+ for item in v:
+ yield from iter_contained_typevars(item)
+ elif isinstance(v, DictValues):
+ for item in v:
+ yield from iter_contained_typevars(item)
def get_caller_frame_info() ->Tuple[Optional[str], bool]:
@@ -208,7 +249,17 @@ def get_caller_frame_info() ->Tuple[Optional[str], bool]:
:returns Tuple[module_name, called_globally]
"""
- pass
+ try:
+ previous_frame = sys._getframe(2)
+ except ValueError:
+ return None, False
+
+ module_name = previous_frame.f_globals.get('__name__')
+ if module_name is None:
+ return None, False
+
+ called_globally = previous_frame.f_locals is previous_frame.f_globals
+ return module_name, called_globally
def _prepare_model_fields(created_model: Type[GenericModel], fields:
@@ -217,4 +268,18 @@ def _prepare_model_fields(created_model: Type[GenericModel], fields:
"""
Replace DeferredType fields with concrete type hints and prepare them.
"""
- pass
+ for name, field in fields.items():
+ if not isinstance(field, DeferredType):
+ continue
+
+ type_hint = instance_type_hints.get(name)
+ if type_hint is None:
+ continue
+
+ concrete_type = replace_types(type_hint, typevars_map)
+ field.type_ = concrete_type
+ created_model.__fields__[name].type_ = concrete_type
+ created_model.__fields__[name].outer_type_ = concrete_type
+
+ if isinstance(concrete_type, type) and issubclass(concrete_type, JsonWrapper):
+ created_model.__fields__[name].shape = concrete_type.shape
diff --git a/pydantic/v1/json.py b/pydantic/v1/json.py
index 9de1baa2..c58af4fd 100644
--- a/pydantic/v1/json.py
+++ b/pydantic/v1/json.py
@@ -29,7 +29,10 @@ def decimal_encoder(dec_value: Decimal) ->Union[int, float]:
>>> decimal_encoder(Decimal("1"))
1
"""
- pass
+ if dec_value.as_tuple().exponent >= 0:
+ return int(dec_value)
+ else:
+ return float(dec_value)
ENCODERS_BY_TYPE: Dict[Type[Any], Callable[[Any], Any]] = {bytes: lambda o:
@@ -46,4 +49,19 @@ def timedelta_isoformat(td: datetime.timedelta) ->str:
"""
ISO 8601 encoding for Python timedelta object.
"""
- pass
+ total_seconds = td.total_seconds()
+ hours, remainder = divmod(total_seconds, 3600)
+ minutes, seconds = divmod(remainder, 60)
+
+ parts = []
+ if td.days:
+ parts.append(f"{td.days}D")
+ if hours or minutes or seconds:
+ parts.append(f"{int(hours):02}H")
+ parts.append(f"{int(minutes):02}M")
+ parts.append(f"{seconds:06.3f}S")
+
+ if not parts:
+ return "PT0S"
+
+ return "P" + "T".join(parts)
diff --git a/pydantic/v1/main.py b/pydantic/v1/main.py
index 48a9e5a2..dc07df4a 100644
--- a/pydantic/v1/main.py
+++ b/pydantic/v1/main.py
@@ -313,7 +313,16 @@ class BaseModel(Representation, metaclass=ModelMetaclass):
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
"""
- pass
+ return dict_utils.model_dump(
+ self,
+ include=include,
+ exclude=exclude,
+ by_alias=by_alias,
+ skip_defaults=skip_defaults,
+ exclude_unset=exclude_unset,
+ exclude_defaults=exclude_defaults,
+ exclude_none=exclude_none
+ )
def json(self, *, include: Optional[Union['AbstractSetIntStr',
'MappingIntStrAny']]=None, exclude: Optional[Union[
@@ -327,7 +336,19 @@ class BaseModel(Representation, metaclass=ModelMetaclass):
`encoder` is an optional function to supply as `default` to json.dumps(), other arguments as per `json.dumps()`.
"""
- pass
+ return json.dumps(
+ self.dict(
+ include=include,
+ exclude=exclude,
+ by_alias=by_alias,
+ skip_defaults=skip_defaults,
+ exclude_unset=exclude_unset,
+ exclude_defaults=exclude_defaults,
+ exclude_none=exclude_none,
+ ),
+ default=encoder or self.__json_encoder__,
+ **dumps_kwargs
+ )
@classmethod
def construct(cls: Type['Model'], _fields_set: Optional['SetStr']=None,
@@ -337,7 +358,21 @@ class BaseModel(Representation, metaclass=ModelMetaclass):
Default values are respected, but no other validation is performed.
Behaves as if `Config.extra = 'allow'` was set since it adds all passed values
"""
- pass
+ m = cls.__new__(cls)
+ fields_values = {}
+ for name, field in cls.__fields__.items():
+ if name in values:
+ fields_values[name] = values[name]
+ elif field.default is not Undefined:
+ fields_values[name] = field.default
+ elif field.default_factory is not None:
+ fields_values[name] = field.default_factory()
+ fields_values.update({k: v for k, v in values.items() if k not in cls.__fields__})
+ object_setattr(m, '__dict__', fields_values)
+ if _fields_set is None:
+ _fields_set = set(values.keys())
+ object_setattr(m, '__fields_set__', _fields_set)
+ return m
def copy(self: 'Model', *, include: Optional[Union['AbstractSetIntStr',
'MappingIntStrAny']]=None, exclude: Optional[Union[
@@ -353,7 +388,15 @@ class BaseModel(Representation, metaclass=ModelMetaclass):
:param deep: set to `True` to make a deep copy of the model
:return: new model instance
"""
- pass
+ values = dict_utils.model_copy(
+ self,
+ include=include,
+ exclude=exclude,
+ update=update,
+ deep=deep,
+ )
+ # Use construct to create a new instance of the model
+ return self.__class__.construct(_fields_set=self.__fields_set__.copy(), **values)
@classmethod
def __get_validators__(cls) ->'CallableGenerator':
@@ -373,7 +416,7 @@ class BaseModel(Representation, metaclass=ModelMetaclass):
"""
Try to update ForwardRefs on fields based on this Model, globalns and localns.
"""
- pass
+ update_model_forward_refs(cls, cls.__fields__.values(), cls.__config__.json_encoders, localns)
def __iter__(self) ->'TupleGenerator':
"""
@@ -417,7 +460,43 @@ def create_model(__model_name: str, *, __config__: Optional[Type[BaseConfig
`foo=Field(datetime, default_factory=datetime.utcnow, alias='bar')` or
`foo=(str, FieldInfo(title='Foo'))`
"""
- pass
+ if __slots__ is not None:
+ warnings.warn('__slots__ should not be passed to create_model', DeprecationWarning)
+
+ fields = {}
+ annotations = {}
+
+ for f_name, f_def in field_definitions.items():
+ if isinstance(f_def, tuple):
+ try:
+ f_annotation, f_value = f_def
+ except ValueError:
+ raise ValueError(f'field {f_name} definition should be tuple of (<type>, <default>) or (<type>, <FieldInfo>)')
+ else:
+ f_annotation, f_value = None, f_def
+
+ if f_annotation:
+ annotations[f_name] = f_annotation
+ fields[f_name] = f_value
+
+ namespace = {
+ '__annotations__': annotations,
+ '__module__': __module__,
+ }
+ if __validators__:
+ namespace.update(__validators__)
+ namespace.update(fields)
+
+ if __base__ is not None:
+ bases = (__base__,) if isinstance(__base__, type) else __base__
+ else:
+ bases = (BaseModel,)
+
+ if __config__ is not None:
+ namespace['Config'] = __config__
+
+ cls = type(__model_name, bases, namespace, **(__cls_kwargs__ or {}))
+ return cls
_missing = object()
diff --git a/pydantic/v1/mypy.py b/pydantic/v1/mypy.py
index 69913f63..5a24e674 100644
--- a/pydantic/v1/mypy.py
+++ b/pydantic/v1/mypy.py
@@ -38,7 +38,7 @@ def plugin(version: str) ->'TypingType[Plugin]':
We might want to use this to print a warning if the mypy version being used is
newer, or especially older, than we expect (or need).
"""
- pass
+ return PydanticPlugin
class PydanticPlugin(Plugin):
@@ -54,14 +54,16 @@ class PydanticPlugin(Plugin):
Mypy version 1.1.1 added support for `@dataclass_transform` decorator.
"""
- pass
+ if fullname == DATACLASS_FULLNAME:
+ return dataclasses.dataclass_class_maker_callback
+ return None
def report_config_data(self, ctx: ReportConfigContext) ->Dict[str, Any]:
"""Return all plugin config data.
Used by mypy to determine if cache needs to be discarded.
"""
- pass
+ return self._plugin_data
def _pydantic_model_metaclass_marker_callback(self, ctx: ClassDefContext
) ->None:
@@ -70,7 +72,8 @@ class PydanticPlugin(Plugin):
Let the plugin handle it. This behavior can be disabled
if 'debug_dataclass_transform' is set to True', for testing purposes.
"""
- pass
+ if not self.plugin_config.debug_dataclass_transform:
+ ctx.cls.info.metadata.setdefault(METADATA_KEY, {})['dataclass_transform_spec'] = None
def _pydantic_field_callback(self, ctx: FunctionContext) ->'Type':
"""
@@ -81,7 +84,23 @@ class PydanticPlugin(Plugin):
* Output an error if both are specified.
* Retrieve the type of the argument which is specified, and use it as return type for the function.
"""
- pass
+ default_arg = ctx.arg_names.index('default') if 'default' in ctx.arg_names else None
+ default_factory_arg = ctx.arg_names.index('default_factory') if 'default_factory' in ctx.arg_names else None
+
+ if default_arg is not None and default_factory_arg is not None:
+ ctx.api.fail('Field cannot specify both default and default_factory', ctx.context)
+ return AnyType(TypeOfAny.from_error)
+
+ if default_arg is not None:
+ default_type = ctx.arg_types[default_arg][0]
+ return default_type
+ elif default_factory_arg is not None:
+ default_factory_type = ctx.arg_types[default_factory_arg][0]
+ if isinstance(default_factory_type, CallableType):
+ return default_factory_type.ret_type
+ return AnyType(TypeOfAny.from_error)
+
+ return AnyType(TypeOfAny.implementation_artifact)
class PydanticPluginConfig:
diff --git a/pydantic/v1/networks.py b/pydantic/v1/networks.py
index 8106fecb..c1a2e993 100644
--- a/pydantic/v1/networks.py
+++ b/pydantic/v1/networks.py
@@ -66,7 +66,14 @@ def multi_host_url_regex() ->Pattern[str]:
Additionally to `url_regex` it allows to match multiple hosts.
E.g. host1.db.net,host2.db.net
"""
- pass
+ global _multi_host_url_regex_cache
+ if _multi_host_url_regex_cache is None:
+ pattern = (
+ f'{_scheme_regex}{_user_info_regex}(?:{_host_regex}(?:,{_host_regex})*)'
+ f'{_path_regex}{_query_regex}{_fragment_regex}'
+ )
+ _multi_host_url_regex_cache = re.compile(pattern, re.IGNORECASE)
+ return _multi_host_url_regex_cache
class AnyUrl(str):
@@ -117,7 +124,20 @@ class AnyUrl(str):
Validate hosts and build the AnyUrl object. Split from `validate` so this method
can be altered in `MultiHostDsn`.
"""
- pass
+ parts = cls.validate_parts(parts)
+ return cls(
+ url,
+ scheme=parts['scheme'],
+ user=parts.get('user'),
+ password=parts.get('password'),
+ host=parts.get('domain') or parts.get('ipv4') or parts.get('ipv6'),
+ tld=parts.get('tld'),
+ host_type=parts.get('host_type', 'domain'),
+ port=parts.get('port'),
+ path=parts.get('path'),
+ query=parts.get('query'),
+ fragment=parts.get('fragment')
+ )
@classmethod
def validate_parts(cls, parts: 'Parts', validate_port: bool=True
@@ -126,7 +146,22 @@ class AnyUrl(str):
A method used to validate parts of a URL.
Could be overridden to set default values for parts if missing
"""
- pass
+ if cls.allowed_schemes and parts['scheme'] not in cls.allowed_schemes:
+ raise errors.UrlSchemeError(allowed_schemes=cls.allowed_schemes)
+
+ if cls.tld_required and parts.get('tld') is None:
+ raise errors.UrlTldError()
+
+ if cls.user_required and not parts.get('user'):
+ raise errors.UrlUserInfoError()
+
+ if validate_port and parts.get('port'):
+ try:
+ parts['port'] = str(int(parts['port']))
+ except ValueError:
+ raise errors.UrlPortError()
+
+ return parts
def __repr__(self) ->str:
extra = ', '.join(f'{n}={getattr(self, n)!r}' for n in self.
@@ -206,6 +241,10 @@ class EmailStr(str):
yield str_validator
yield cls.validate
+ @classmethod
+ def validate(cls, value: str) -> str:
+ return validate_email(value)[1]
+
class NameEmail(Representation):
__slots__ = 'name', 'email'
@@ -230,6 +269,13 @@ class NameEmail(Representation):
def __str__(self) ->str:
return f'{self.name} <{self.email}>'
+ @classmethod
+ def validate(cls, value: Union[str, 'NameEmail']) -> 'NameEmail':
+ if isinstance(value, cls):
+ return value
+ name, email = validate_email(value)
+ return cls(name or '', email)
+
class IPvAnyAddress(_BaseAddress):
__slots__ = ()
@@ -281,4 +327,23 @@ def validate_email(value: Union[str]) ->Tuple[str, str]:
* "John Doe <local_part@domain.com>" style "pretty" email addresses are processed
* spaces are striped from the beginning and end of addresses but no error is raised
"""
- pass
+ if email_validator is None:
+ import_email_validator()
+
+ if len(value) > MAX_EMAIL_LENGTH:
+ raise errors.EmailError('Email address too long')
+
+ value = value.strip()
+
+ match = pretty_email_regex.fullmatch(value)
+ if match:
+ name, value = match.groups()
+ else:
+ name = ''
+
+ try:
+ email_validator.validate_email(value, check_deliverability=False)
+ except email_validator.EmailNotValidError as e:
+ raise errors.EmailError(str(e))
+
+ return name, value
diff --git a/pydantic/v1/schema.py b/pydantic/v1/schema.py
index 073f563a..e41e829f 100644
--- a/pydantic/v1/schema.py
+++ b/pydantic/v1/schema.py
@@ -48,7 +48,26 @@ def schema(models: Sequence[Union[Type['BaseModel'], Type['Dataclass']]], *,
:return: dict with the JSON Schema with a ``definitions`` top-level key including the schema definitions for
the models and sub-models passed in ``models``.
"""
- pass
+ definitions = {}
+ schema_dict = {
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "definitions": definitions,
+ }
+
+ if title:
+ schema_dict["title"] = title
+ if description:
+ schema_dict["description"] = description
+
+ for model in models:
+ model_schema, model_definitions, _ = model_schema(
+ model, by_alias=by_alias, ref_prefix=ref_prefix, ref_template=ref_template
+ )
+ definitions.update(model_definitions)
+ model_name = model.__name__
+ definitions[model_name] = model_schema
+
+ return schema_dict
def model_schema(model: Union[Type['BaseModel'], Type['Dataclass']],
@@ -70,7 +89,17 @@ def model_schema(model: Union[Type['BaseModel'], Type['Dataclass']],
sibling json file in a ``/schemas`` directory use ``"/schemas/${model}.json#"``.
:return: dict with the JSON Schema for the passed ``model``
"""
- pass
+ model_name_map = get_model_name_map(get_flat_models_from_model(model))
+ model_schema, definitions, _ = model_process_schema(
+ model,
+ by_alias=by_alias,
+ model_name_map=model_name_map,
+ ref_prefix=ref_prefix,
+ ref_template=ref_template,
+ )
+ schema = {"$schema": "http://json-schema.org/draft-07/schema#", "definitions": definitions}
+ schema.update(model_schema)
+ return schema
def field_schema(field: ModelField, *, by_alias: bool=True, model_name_map:
@@ -94,7 +123,26 @@ def field_schema(field: ModelField, *, by_alias: bool=True, model_name_map:
:param known_models: used to solve circular references
:return: tuple of the schema for this field and additional definitions
"""
- pass
+ schema_overrides = False
+ known_models = known_models or set()
+ schema, definitions, nested_models = field_type_schema(
+ field,
+ by_alias=by_alias,
+ model_name_map=model_name_map,
+ schema_overrides=schema_overrides,
+ ref_prefix=ref_prefix,
+ ref_template=ref_template,
+ known_models=known_models,
+ )
+ # Add field constraints
+ schema.update(get_field_schema_validations(field))
+ if field.required:
+ schema['title'] = field.alias if by_alias else field.name
+ if field.default is not None and not field.required:
+ schema['default'] = field.default
+ if field.description:
+ schema['description'] = field.description
+ return schema, definitions, nested_models
numeric_types = int, float, Decimal
diff --git a/pydantic/v1/tools.py b/pydantic/v1/tools.py
index d1551536..212b5539 100644
--- a/pydantic/v1/tools.py
+++ b/pydantic/v1/tools.py
@@ -12,14 +12,47 @@ if TYPE_CHECKING:
from pydantic.v1.typing import DictStrAny
T = TypeVar('T')
+def parse_obj_as(type_: Type[T], obj: Any) -> T:
+ from pydantic.v1.main import create_model
+
+ model = create_model('TempModel', __root__=(type_, ...))
+ return model(__root__=obj).__root__
+
+def parse_raw_as(type_: Type[T], b: StrBytes, *, content_type: str = None, encoding: str = 'utf8',
+ proto: Protocol = None, allow_pickle: bool = False) -> T:
+ obj = load_str_bytes(b, proto=proto, content_type=content_type, encoding=encoding, allow_pickle=allow_pickle)
+ return parse_obj_as(type_, obj)
+
+def parse_file_as(type_: Type[T], path: Union[str, Path], *, content_type: str = None, encoding: str = 'utf8',
+ proto: Protocol = None, allow_pickle: bool = False) -> T:
+ obj = load_file(path, proto=proto, content_type=content_type, encoding=encoding, allow_pickle=allow_pickle)
+ return parse_obj_as(type_, obj)
+
def schema_of(type_: Any, *, title: Optional[NameFactory]=None, **
schema_kwargs: Any) ->'DictStrAny':
"""Generate a JSON schema (as dict) for the passed model or dynamically generated one"""
- pass
+ from pydantic.v1.main import create_model
+
+ if isinstance(type_, type) and issubclass(type_, Protocol):
+ raise TypeError("Cannot generate schema for Protocol class")
+
+ model = create_model('TempModel', __root__=(type_, ...))
+ schema = model.schema(**schema_kwargs)
+
+ if title is not None:
+ if callable(title):
+ schema['title'] = title(type_)
+ else:
+ schema['title'] = title
+ elif 'title' not in schema:
+ schema['title'] = display_as_type(type_)
+
+ return schema
def schema_json_of(type_: Any, *, title: Optional[NameFactory]=None, **
schema_json_kwargs: Any) ->str:
"""Generate a JSON schema (as JSON) for the passed model or dynamically generated one"""
- pass
+ schema = schema_of(type_, title=title)
+ return json.dumps(schema, **schema_json_kwargs)
diff --git a/pydantic/v1/types.py b/pydantic/v1/types.py
index 5c45c9e0..6450d121 100644
--- a/pydantic/v1/types.py
+++ b/pydantic/v1/types.py
@@ -82,7 +82,9 @@ else:
"""
Ensure that we only allow bools.
"""
- pass
+ if isinstance(value, bool):
+ return value
+ raise ValueError('value is not a valid boolean')
class ConstrainedInt(int, metaclass=ConstrainedNumberMeta):
@@ -473,6 +475,25 @@ class SecretStr(SecretField):
@classmethod
def __get_validators__(cls) ->'CallableGenerator':
yield cls.validate
+
+ @classmethod
+ def validate(cls, v: Union[int, float, str]) -> 'ByteSize':
+ if isinstance(v, (int, float)):
+ return cls(v)
+ elif isinstance(v, str):
+ match = byte_string_re.match(v)
+ if not match:
+ raise ValueError('Invalid byte string format')
+ num, unit = match.groups()
+ num = float(num)
+ if not unit:
+ unit = 'b'
+ multiplier = BYTE_SIZES.get(unit.lower())
+ if not multiplier:
+ raise ValueError(f'Invalid byte unit: {unit}')
+ return cls(int(num * multiplier))
+ else:
+ raise ValueError('Value must be int, float or string')
yield constr_length_validator
def __init__(self, value: str):
@@ -550,7 +571,16 @@ class PaymentCardNumber(str):
"""
Based on: https://en.wikipedia.org/wiki/Luhn_algorithm
"""
- pass
+ digits = [int(d) for d in card_number if d.isdigit()]
+ checksum = 0
+ odd_digits = digits[-1::-2]
+ even_digits = digits[-2::-2]
+ checksum += sum(odd_digits)
+ for digit in even_digits:
+ checksum += sum(divmod(digit * 2, 10))
+ if checksum % 10 == 0:
+ return card_number
+ raise ValueError('Invalid card number (failed Luhn check)')
@classmethod
def validate_length_for_brand(cls, card_number: 'PaymentCardNumber'
@@ -559,7 +589,14 @@ class PaymentCardNumber(str):
Validate length based on BIN for major brands:
https://en.wikipedia.org/wiki/Payment_card_number#Issuer_identification_number_(IIN)
"""
- pass
+ length = len(card_number)
+ if card_number.brand == PaymentCardBrand.amex and length == 15:
+ return card_number
+ elif card_number.brand in (PaymentCardBrand.mastercard, PaymentCardBrand.visa) and length in (16, 19):
+ return card_number
+ elif card_number.brand == PaymentCardBrand.other and 12 <= length <= 19:
+ return card_number
+ raise ValueError(f'Invalid length for {card_number.brand} card')
BYTE_SIZES = {'b': 1, 'kb': 10 ** 3, 'mb': 10 ** 6, 'gb': 10 ** 9, 'tb': 10 **
diff --git a/pydantic/v1/typing.py b/pydantic/v1/typing.py
index 72034371..2581329e 100644
--- a/pydantic/v1/typing.py
+++ b/pydantic/v1/typing.py
@@ -38,7 +38,10 @@ else:
It should be useless once https://github.com/cython/cython/issues/3537 is
solved and https://github.com/pydantic/pydantic/pull/1753 is merged.
"""
- pass
+ origin = _typing_get_origin(tp)
+ if origin is None and isinstance(tp, type):
+ return tp
+ return origin
if sys.version_info < (3, 8):
from typing import _GenericAlias
@@ -48,7 +51,12 @@ if sys.version_info < (3, 8):
Mostly compatible with the python 3.8 `typing` module version
and able to handle almost all use cases.
"""
- pass
+ if isinstance(t, _GenericAlias):
+ res = t.__args__
+ if get_origin(t) is Callable and res[0] is not Ellipsis:
+ res = (list(res[:-1]), res[-1])
+ return res
+ return ()
else:
from typing import get_args as _typing_get_args
@@ -59,7 +67,10 @@ else:
In order to still support `Dict` for example and consider it as `Dict[Any, Any]`,
we retrieve the `_nparams` value that tells us how many parameters it needs.
"""
- pass
+ args = _typing_get_args(tp)
+ if not args and isinstance(tp, typing._GenericAlias) and hasattr(tp, '_nparams'):
+ return (Any,) * tp._nparams
+ return args
def get_args(tp: Type[Any]) ->Tuple[Any, ...]:
"""Get type arguments with all substitutions performed.
@@ -72,7 +83,9 @@ else:
get_args(Union[int, Tuple[T, int]][str]) == (int, Tuple[str, int])
get_args(Callable[[], T][int]) == ([], int)
"""
- pass
+ if isinstance(tp, WithArgsTypes):
+ return _generic_get_args(tp)
+ return _typing_get_args(tp)
if sys.version_info < (3, 9):
def convert_generics(tp: Type[Any]) ->Type[Any]:
@@ -82,7 +95,7 @@ if sys.version_info < (3, 9):
Examples::
typing.List['Hero'] == typing.List[ForwardRef('Hero')]
"""
- pass
+ return tp
else:
from typing import _UnionGenericAlias
from typing_extensions import _AnnotatedAlias
@@ -97,7 +110,13 @@ else:
convert_generics(typing.Dict['Hero', 'Team']) == typing.Dict[ForwardRef('Hero'), ForwardRef('Team')]
convert_generics(list[str | 'Hero'] | int) == list[str | ForwardRef('Hero')] | int
"""
- pass
+ if isinstance(tp, str):
+ return ForwardRef(tp)
+ if isinstance(tp, (_UnionGenericAlias, TypesUnionType)):
+ return tp.__class__(*[convert_generics(a) for a in tp.__args__])
+ if isinstance(tp, (_AnnotatedAlias, TypingGenericAlias)):
+ return tp.__class__(tp.__origin__, *[convert_generics(a) for a in tp.__args__])
+ return tp
if sys.version_info < (3, 10):
WithArgsTypes = TypingGenericAlias,
else:
@@ -147,7 +166,23 @@ def resolve_annotations(raw_annotations: Dict[str, Type[Any]], module_name:
Resolve string or ForwardRef annotations into type objects if possible.
"""
- pass
+ resolved_annotations = {}
+ for name, value in raw_annotations.items():
+ if isinstance(value, str):
+ try:
+ value = ForwardRef(value)
+ except TypeError:
+ # TypeErrors can be raised when using Literal['string-literal']
+ # String literals as strings are not supported in 3.7
+ pass
+ if isinstance(value, ForwardRef):
+ try:
+ value = _eval_type(value, globals(), locals())
+ except NameError:
+ # Fall back to ForwardRef if the type cannot be evaluated
+ pass
+ resolved_annotations[name] = value
+ return resolved_annotations
def all_literal_values(type_: Type[Any]) ->Tuple[Any, ...]:
@@ -156,7 +191,20 @@ def all_literal_values(type_: Type[Any]) ->Tuple[Any, ...]:
Literal can be used recursively (see https://www.python.org/dev/peps/pep-0586)
e.g. `Literal[Literal[Literal[1, 2, 3], "foo"], 5, None]`
"""
- pass
+ if hasattr(type_, '__values__'):
+ values = type_.__values__
+ elif hasattr(type_, '__args__'):
+ values = type_.__args__
+ else:
+ raise ValueError(f'Cannot extract literal values from {type_}')
+
+ literal_values = []
+ for value in values:
+ if hasattr(value, '__values__') or hasattr(value, '__args__'):
+ literal_values.extend(all_literal_values(value))
+ else:
+ literal_values.append(value)
+ return tuple(literal_values)
def is_namedtuple(type_: Type[Any]) ->bool:
@@ -164,7 +212,13 @@ def is_namedtuple(type_: Type[Any]) ->bool:
Check if a given class is a named tuple.
It can be either a `typing.NamedTuple` or `collections.namedtuple`
"""
- pass
+ return (
+ isinstance(type_, type) and
+ issubclass(type_, tuple) and
+ hasattr(type_, '_fields') and
+ hasattr(type_, '_field_defaults') and
+ hasattr(type_, '_asdict')
+ )
def is_typeddict(type_: Type[Any]) ->bool:
@@ -172,14 +226,14 @@ def is_typeddict(type_: Type[Any]) ->bool:
Check if a given class is a typed dict (from `typing` or `typing_extensions`)
In 3.10, there will be a public method (https://docs.python.org/3.10/library/typing.html#typing.is_typeddict)
"""
- pass
+ return hasattr(type_, '__annotations__') and hasattr(type_, '__total__')
def is_typeddict_special(type_: Any) ->bool:
"""
Check if type is a TypedDict special form (Required or NotRequired).
"""
- pass
+ return type_ in (TypedDictRequired, TypedDictNotRequired)
test_type = NewType('test_type', str)
@@ -189,14 +243,14 @@ def is_new_type(type_: Type[Any]) ->bool:
"""
Check whether type_ was created using typing.NewType
"""
- pass
+ return hasattr(type_, '__supertype__')
def _check_finalvar(v: Optional[Type[Any]]) ->bool:
"""
Check if a given type is a `typing.Final` type.
"""
- pass
+ return isinstance(v, Final) or (isinstance(v, type) and issubclass(v, Final))
def update_field_forward_refs(field: 'ModelField', globalns: Any, localns: Any
@@ -204,7 +258,12 @@ def update_field_forward_refs(field: 'ModelField', globalns: Any, localns: Any
"""
Try to update ForwardRefs on fields based on this ModelField, globalns and localns.
"""
- pass
+ if field.type_.__class__ == ForwardRef:
+ field.type_ = field.type_._evaluate(globalns, localns)
+ field.prepare()
+ if field.sub_fields:
+ for sub_field in field.sub_fields:
+ update_field_forward_refs(sub_field, globalns=globalns, localns=localns)
def update_model_forward_refs(model: Type[Any], fields: Iterable[
@@ -214,7 +273,25 @@ def update_model_forward_refs(model: Type[Any], fields: Iterable[
"""
Try to update model fields ForwardRefs based on model and localns.
"""
- pass
+ def _update_forward_refs(typ: Any) ->None:
+ try:
+ typ.update_forward_refs(**localns)
+ except exc_to_suppress:
+ pass
+
+ _update_forward_refs(model)
+ for field in fields:
+ try:
+ update_field_forward_refs(field, globalns=model.__module__, localns=localns)
+ except exc_to_suppress:
+ pass
+
+ for key, value in json_encoders.items():
+ if isinstance(key, ForwardRef):
+ try:
+ json_encoders[key._evaluate(model.__module__, localns)] = value
+ except exc_to_suppress:
+ continue
def get_class(type_: Type[Any]) ->Union[None, bool, Type[Any]]:
@@ -222,7 +299,13 @@ def get_class(type_: Type[Any]) ->Union[None, bool, Type[Any]]:
Tries to get the class of a Type[T] annotation. Returns True if Type is used
without brackets. Otherwise returns None.
"""
- pass
+ if type_ is Type:
+ return True
+ if hasattr(type_, '__origin__') and type_.__origin__ is Type:
+ if not type_.__args__:
+ return True
+ return type_.__args__[0]
+ return None
def get_sub_types(tp: Any) ->List[Any]:
@@ -230,4 +313,9 @@ def get_sub_types(tp: Any) ->List[Any]:
Return all the types that are allowed by type `tp`
`tp` can be a `Union` of allowed types or an `Annotated` type
"""
- pass
+ if is_union(tp):
+ return list(get_args(tp))
+ elif get_origin(tp) is Annotated:
+ return [get_args(tp)[0]]
+ else:
+ return [tp]
diff --git a/pydantic/v1/utils.py b/pydantic/v1/utils.py
index effb78d4..3efe0942 100644
--- a/pydantic/v1/utils.py
+++ b/pydantic/v1/utils.py
@@ -42,14 +42,26 @@ def import_string(dotted_path: str) ->Any:
Stolen approximately from django. Import a dotted module path and return the attribute/class designated by the
last name in the path. Raise ImportError if the import fails.
"""
- pass
+ try:
+ module_path, class_name = dotted_path.rsplit('.', 1)
+ except ValueError as err:
+ raise ImportError("%s doesn't look like a module path" % dotted_path) from err
+
+ module = __import__(module_path, fromlist=[class_name])
+ try:
+ return getattr(module, class_name)
+ except AttributeError as err:
+ raise ImportError('Module "%s" does not define a "%s" attribute/class' % (module_path, class_name)) from err
def truncate(v: Union[str], *, max_len: int=80) ->str:
"""
Truncate a value and add a unicode ellipsis (three dots) to the end if it was too long
"""
- pass
+ s = str(v)
+ if len(s) <= max_len:
+ return s
+ return s[:max_len - 1].rstrip() + '…'
def validate_field_name(bases: List[Type['BaseModel']], field_name: str
@@ -57,14 +69,22 @@ def validate_field_name(bases: List[Type['BaseModel']], field_name: str
"""
Ensure that the field's name does not shadow an existing attribute of the model.
"""
- pass
+ for base in bases:
+ if getattr(base, field_name, None):
+ raise NameError(
+ f'Field name "{field_name}" shadows an attribute in parent "{base.__name__}"'
+ )
def in_ipython() ->bool:
"""
Check whether we're in an ipython environment, including jupyter notebooks.
"""
- pass
+ try:
+ from IPython import get_ipython
+ return get_ipython() is not None
+ except ImportError:
+ return False
def is_valid_identifier(identifier: str) ->bool:
@@ -73,7 +93,7 @@ def is_valid_identifier(identifier: str) ->bool:
:param identifier: The identifier to test.
:return: True if the identifier is valid.
"""
- pass
+ return identifier.isidentifier() and not keyword.iskeyword(identifier)
KeyType = TypeVar('KeyType')
diff --git a/pydantic/v1/validators.py b/pydantic/v1/validators.py
index fd53f500..81cfd234 100644
--- a/pydantic/v1/validators.py
+++ b/pydantic/v1/validators.py
@@ -35,7 +35,9 @@ def constant_validator(v: 'Any', field: 'ModelField') ->'Any':
of the field. This is to support the keyword of the same name in JSON
Schema.
"""
- pass
+ if v != field.default:
+ raise errors.ConstError(field=field)
+ return v
def ip_v4_network_validator(v: Any) ->IPv4Network:
@@ -45,7 +47,10 @@ def ip_v4_network_validator(v: Any) ->IPv4Network:
See more:
https://docs.python.org/library/ipaddress.html#ipaddress.IPv4Network
"""
- pass
+ try:
+ return IPv4Network(v)
+ except ValueError:
+ raise errors.IPvAnyNetworkError()
def ip_v6_network_validator(v: Any) ->IPv6Network:
@@ -55,7 +60,10 @@ def ip_v6_network_validator(v: Any) ->IPv6Network:
See more:
https://docs.python.org/library/ipaddress.html#ipaddress.IPv6Network
"""
- pass
+ try:
+ return IPv6Network(v)
+ except ValueError:
+ raise errors.IPvAnyNetworkError()
def callable_validator(v: Any) ->AnyCallable:
@@ -64,7 +72,9 @@ def callable_validator(v: Any) ->AnyCallable:
Note: complete matching of argument type hints and return types is not performed
"""
- pass
+ if not callable(v):
+ raise errors.CallableError(value=v)
+ return v
T = TypeVar('T')
diff --git a/pydantic/validate_call_decorator.py b/pydantic/validate_call_decorator.py
index 5eb2596a..dd2486a3 100644
--- a/pydantic/validate_call_decorator.py
+++ b/pydantic/validate_call_decorator.py
@@ -26,4 +26,19 @@ def validate_call(func: (AnyCallableT | None)=None, /, *, config: (
Returns:
The decorated function.
"""
- pass
+ def decorator(f: AnyCallableT) -> AnyCallableT:
+ @functools.wraps(f)
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
+ validated_args, validated_kwargs = _validate_call.validate_arguments(
+ f, args, kwargs, config
+ )
+ result = f(*validated_args, **validated_kwargs)
+ if validate_return:
+ return _validate_call.validate_return(f, result, config)
+ return result
+ return _typing_extra.cast(AnyCallableT, wrapper)
+
+ if func is None:
+ return decorator
+ else:
+ return decorator(func)
diff --git a/pydantic/version.py b/pydantic/version.py
index d63da2ae..19419595 100644
--- a/pydantic/version.py
+++ b/pydantic/version.py
@@ -10,12 +10,21 @@ def version_short() ->str:
It returns '2.1' if Pydantic version is '2.1.1'.
"""
- pass
+ return '.'.join(VERSION.split('.')[:2])
def version_info() ->str:
"""Return complete version information for Pydantic and its dependencies."""
- pass
+ import sys
+ import platform
+
+ info = [
+ f'pydantic version: {VERSION}',
+ f'python version: {platform.python_version()}',
+ f'platform: {platform.platform()}',
+ f'implementation: {platform.python_implementation()}',
+ ]
+ return '\n'.join(info)
def parse_mypy_version(version: str) ->tuple[int, ...]:
@@ -30,4 +39,6 @@ def parse_mypy_version(version: str) ->tuple[int, ...]:
Returns:
A tuple of ints. e.g. (0, 930).
"""
- pass
+ # Split the version string and take only the part before '+'
+ version_parts = version.split('+')[0].split('.')
+ return tuple(int(part) for part in version_parts)
diff --git a/pydantic/warnings.py b/pydantic/warnings.py
index 0d1b0a6a..cb5e9795 100644
--- a/pydantic/warnings.py
+++ b/pydantic/warnings.py
@@ -50,7 +50,7 @@ class PydanticDeprecatedSince26(PydanticDeprecationWarning):
"""A specific `PydanticDeprecationWarning` subclass defining functionality deprecated since Pydantic 2.6."""
def __init__(self, message: str, *args: object) ->None:
- super().__init__(message, *args, since=(2, 0), expected_removal=(3, 0))
+ super().__init__(message, *args, since=(2, 6), expected_removal=(3, 0))
class GenericBeforeBaseModelWarning(Warning):
diff --git a/tests/test_networks.py b/tests/test_networks.py
index dafa82c1..b3d15060 100644
--- a/tests/test_networks.py
+++ b/tests/test_networks.py
@@ -959,6 +959,46 @@ def test_email_validator_not_installed():
validate_email('s@muelcolvin.com')
+@pytest.mark.skipif(not email_validator, reason='email_validator not installed')
+def test_validate_email():
+ assert validate_email('simple@example.com') == ('simple', 'simple@example.com')
+ assert validate_email('very.common@example.com') == ('very.common', 'very.common@example.com')
+ assert validate_email('disposable.style.email.with+symbol@example.com') == ('disposable.style.email.with+symbol', 'disposable.style.email.with+symbol@example.com')
+ assert validate_email('other.email-with-hyphen@example.com') == ('other.email-with-hyphen', 'other.email-with-hyphen@example.com')
+ assert validate_email('fully-qualified-domain@example.com') == ('fully-qualified-domain', 'fully-qualified-domain@example.com')
+ assert validate_email('user.name+tag+sorting@example.com') == ('user.name+tag+sorting', 'user.name+tag+sorting@example.com')
+ assert validate_email('x@example.com') == ('x', 'x@example.com')
+ assert validate_email('example-indeed@strange-example.com') == ('example-indeed', 'example-indeed@strange-example.com')
+ assert validate_email('example@s.example') == ('example', 'example@s.example')
+ assert validate_email('" "@example.org') == ('" "', '" "@example.org')
+ assert validate_email('"john..doe"@example.org') == ('"john..doe"', '"john..doe"@example.org')
+ assert validate_email('mailhost!username@example.org') == ('mailhost!username', 'mailhost!username@example.org')
+ assert validate_email('user%example.com@example.org') == ('user%example.com', 'user%example.com@example.org')
+ assert validate_email('user-@example.org') == ('user-', 'user-@example.org')
+
+@pytest.mark.skipif(not email_validator, reason='email_validator not installed')
+def test_validate_email_with_name():
+ assert validate_email('John Doe <johndoe@example.com>') == ('John Doe', 'johndoe@example.com')
+ assert validate_email('"John Doe" <johndoe@example.com>') == ('John Doe', 'johndoe@example.com')
+ assert validate_email('John "Johnny" Doe <johndoe@example.com>') == ('John "Johnny" Doe', 'johndoe@example.com')
+
+@pytest.mark.skipif(not email_validator, reason='email_validator not installed')
+def test_validate_email_errors():
+ with pytest.raises(PydanticCustomError, match='value is not a valid email address: The email address contains invalid characters before the @-sign'):
+ validate_email('Abc.example.com')
+
+ with pytest.raises(PydanticCustomError, match='value is not a valid email address: An email address must have an @-sign'):
+ validate_email('A@b@c@example.com')
+
+ with pytest.raises(PydanticCustomError, match='value is not a valid email address: The part after the @-sign is not valid'):
+ validate_email('a"b(c)d,e:f;g<h>i[j\k]l@example.com')
+
+ with pytest.raises(PydanticCustomError, match='value is not a valid email address: There must be something before the @-sign'):
+ validate_email('@example.com')
+
+ with pytest.raises(PydanticCustomError, match='Email address is too long'):
+ validate_email('a' * 2000 + '@example.com')
+
@pytest.mark.skipif(not email_validator, reason='email_validator not installed')
def test_name_email():
class Model(BaseModel):