diff --git a/pydantic/_internal/_generate_schema.py b/pydantic/_internal/_generate_schema.py
index a9c65d49f..d795bc491 100644
--- a/pydantic/_internal/_generate_schema.py
+++ b/pydantic/_internal/_generate_schema.py
@@ -97,7 +97,17 @@ def modify_model_json_schema(schema_or_field: CoreSchemaOrField, handler: GetJso
Returns:
JsonSchemaValue: The updated JSON schema.
"""
- pass
+ json_schema = handler(schema_or_field)
+
+ if title is None:
+ title = cls.__name__
+
+ json_schema['title'] = title
+
+ if cls.__doc__:
+ json_schema['description'] = inspect.cleandoc(cls.__doc__)
+
+ return json_schema
JsonEncoders = Dict[Type[Any], JsonEncoder]
def _add_custom_serialization_from_json_encoders(json_encoders: JsonEncoders | None, tp: Any, schema: CoreSchema) -> CoreSchema:
@@ -123,7 +133,10 @@ def _get_first_non_null(a: Any, b: Any) -> Any:
Use case: serialization_alias (argument a) and alias (argument b) are both defined, and serialization_alias is ''.
This function will return serialization_alias, which is the first argument, even though it is an empty string.
"""
- pass
+    # Compare against `None` explicitly: a falsy first argument such as '' must still win over `b`.
+    if a is None:
+        return b
+    return a
class GenerateSchema:
"""Generate core schema for a Pydantic model, dataclass and types like `str`, `datetime`, ... ."""
@@ -139,7 +149,9 @@ class GenerateSchema:
def str_schema(self) -> CoreSchema:
"""Generate a CoreSchema for `str`"""
- pass
+        # Built with no arguments, so nothing beyond the plain `str` schema is configured here.
+        plain_str = core_schema.str_schema()
+        return plain_str
class CollectedInvalid(Exception):
pass
@@ -182,7 +192,10 @@ class GenerateSchema:
"""Unpack all 'definitions' schemas into `GenerateSchema.defs.definitions`
and return the inner schema.
"""
- pass
+ if isinstance(schema, dict) and 'definitions' in schema:
+ self.defs.definitions.update(schema['definitions'])
+ return schema['schema']
+ return schema
def _generate_schema_from_property(self, obj: Any, source: Any) -> core_schema.CoreSchema | None:
"""Try to generate schema from either the `__get_pydantic_core_schema__` function or
@@ -206,7 +219,20 @@ class GenerateSchema:
The idea is that we'll evolve this into adding more and more user facing methods over time
as they get requested and we figure out what the right API for them is.
"""
- pass
+ if obj is str:
+ return self.str_schema()
+ elif obj is int:
+ return core_schema.int_schema()
+ elif obj is float:
+ return core_schema.float_schema()
+ elif obj is bool:
+ return core_schema.bool_schema()
+ elif obj is None:
+ return core_schema.none_schema()
+ elif isinstance(obj, type) and issubclass(obj, BaseModel):
+ return self._model_schema(obj)
+ else:
+ return core_schema.any_schema()
def _generate_td_field_schema(self, name: str, field_info: FieldInfo, decorators: DecoratorInfos, *, required: bool=True) -> core_schema.TypedDictField:
"""Prepare a TypedDictField to represent a model or typeddict field."""
@@ -220,6 +246,6 @@ class GenerateSchema:
"""Prepare a DataclassField to represent the parameter/field, of a dataclass."""
pass
@staticmethod
def _apply_alias_generator_to_field_info(alias_generator: Callable[[str], str] | AliasGenerator, field_info: FieldInfo, field_name: str) -> None:
"""Apply an alias_generator to aliases on a FieldInfo instance if appropriate.
@@ -229,7 +255,41 @@ class GenerateSchema:
field_info: The FieldInfo instance to which the alias_generator is (maybe) applied.
field_name: The name of the field from which to generate the alias.
"""
- pass
+        # Skip fields whose aliases are all set explicitly with a priority above 1.
+        if (
+            field_info.alias_priority is None
+            or field_info.alias_priority <= 1
+            or field_info.alias is None
+            or field_info.validation_alias is None
+            or field_info.serialization_alias is None
+        ):
+            alias, validation_alias, serialization_alias = None, None, None
+
+            if isinstance(alias_generator, AliasGenerator):
+                # An `AliasGenerator` is not callable itself; `generate_aliases` returns the
+                # (alias, validation_alias, serialization_alias) triple for the field.
+                alias, validation_alias, serialization_alias = alias_generator.generate_aliases(field_name)
+            elif callable(alias_generator):
+                alias = alias_generator(field_name)
+                if not isinstance(alias, str):
+                    raise TypeError(f'alias_generator must return str, not {type(alias)}')
+
+            # An unset priority counts as 1, i.e. the generated aliases take precedence.
+            if field_info.alias_priority is None or field_info.alias_priority <= 1:
+                field_info.alias_priority = 1
+
+            if field_info.alias_priority == 1:
+                field_info.serialization_alias = _get_first_non_null(serialization_alias, alias)
+                field_info.validation_alias = _get_first_non_null(validation_alias, alias)
+                field_info.alias = alias
+
+            # With a higher priority, only fill in the aliases that are still missing.
+            if field_info.alias is None:
+                field_info.alias = alias
+            if field_info.serialization_alias is None:
+                field_info.serialization_alias = _get_first_non_null(serialization_alias, alias)
+            if field_info.validation_alias is None:
+                field_info.validation_alias = _get_first_non_null(validation_alias, alias)
@staticmethod
def _apply_alias_generator_to_computed_field_info(alias_generator: Callable[[str], str] | AliasGenerator, computed_field_info: ComputedFieldInfo, computed_field_name: str):
@@ -258,7 +292,9 @@ class GenerateSchema:
def _literal_schema(self, literal_type: Any) -> CoreSchema:
"""Generate schema for a Literal."""
- pass
+        # `get_args` returns a tuple, but `literal_schema` expects its `expected` values as a list.
+        expected = list(get_args(literal_type))
+        return core_schema.literal_schema(expected)
def _typed_dict_schema(self, typed_dict_cls: Any, origin: Any) -> core_schema.CoreSchema:
"""Generate schema for a TypedDict.
@@ -276,7 +311,16 @@ class GenerateSchema:
Hence to avoid creating validators that do not do what users expect we only
support typing.TypedDict on Python >= 3.12 or typing_extension.TypedDict on all versions
"""
- pass
+        total = getattr(typed_dict_cls, '__total__', True)
+        has_required_keys = hasattr(typed_dict_cls, '__required_keys__')
+        fields = {}
+        for field_name, annotation in typed_dict_cls.__annotations__.items():
+            # Per-key `__required_keys__` membership takes precedence over the class-wide `total` flag.
+            is_required = field_name in typed_dict_cls.__required_keys__ if has_required_keys else total
+            fields[field_name] = self._generate_td_field_schema(
+                field_name, FieldInfo(annotation=annotation), DecoratorInfos(), required=is_required
+            )
+        return core_schema.typed_dict_schema(fields, total=total)
def _namedtuple_schema(self, namedtuple_cls: Any, origin: Any) -> core_schema.CoreSchema:
"""Generate schema for a NamedTuple."""
@@ -288,7 +331,23 @@ class GenerateSchema:
def _tuple_schema(self, tuple_type: Any) -> core_schema.CoreSchema:
"""Generate schema for a Tuple, e.g. `tuple[int, str]` or `tuple[int, ...]`."""
- pass
+        import typing
+
+        params = get_args(tuple_type)
+        if not params:
+            if tuple_type is tuple or tuple_type is typing.Tuple:
+                # Bare `tuple` / `Tuple`: any number of items of any type.
+                return core_schema.tuple_schema([core_schema.any_schema()], variadic_item_index=0)
+            # No parameters on a subscripted type means `tuple[()]`, i.e. the empty tuple.
+            return core_schema.tuple_schema([])
+        if params[-1] is Ellipsis:
+            if len(params) != 2:
+                raise ValueError('Variable tuples can only have one type')
+            return core_schema.tuple_schema([self.generate_schema(params[0])], variadic_item_index=0)
+        if len(params) == 1 and params[0] == ():
+            # Older Python versions report the arguments of `Tuple[()]` as `((),)`.
+            return core_schema.tuple_schema([])
+        return core_schema.tuple_schema([self.generate_schema(param) for param in params])
def _union_is_subclass_schema(self, union_type: Any) -> core_schema.CoreSchema:
"""Generate schema for `Type[Union[X, ...]]`."""
@@ -300,7 +351,12 @@ class GenerateSchema:
def _sequence_schema(self, sequence_type: Any) -> core_schema.CoreSchema:
"""Generate schema for a Sequence, e.g. `Sequence[int]`."""
- pass
+        # An unparametrized `Sequence` accepts items of any type.
+        params = get_args(sequence_type)
+        if not params:
+            return core_schema.list_schema(core_schema.any_schema())
+        first_param = params[0]
+        return core_schema.list_schema(self.generate_schema(first_param))
def _iterable_schema(self, type_: Any) -> core_schema.GeneratorSchema:
"""Generate a schema for an `Iterable`."""
@@ -315,28 +371,62 @@ class GenerateSchema:
TODO support functional validators once we support them in Config
"""
- pass
+ signature = inspect.signature(function)
+ parameters = [
+ self._generate_parameter_schema(
+ name,
+ param.annotation,
+ param.default,
+ self._get_parameter_mode(param)
+ )
+ for name, param in signature.parameters.items()
+ ]
+ return_schema = self.generate_schema(signature.return_annotation)
+ return core_schema.call_schema(parameters, return_schema)
+
+    @staticmethod
+    def _get_parameter_mode(param: inspect.Parameter) -> Literal['positional_only', 'positional_or_keyword', 'keyword_only']:
+        """Map the parameter's kind to a mode name; unlisted kinds fall back to 'positional_or_keyword'."""
+        mode_by_kind = {
+            param.POSITIONAL_ONLY: 'positional_only',
+            param.KEYWORD_ONLY: 'keyword_only',
+        }
+        return mode_by_kind.get(param.kind, 'positional_or_keyword')
def _annotated_schema(self, annotated_type: Any) -> core_schema.CoreSchema:
"""Generate schema for an Annotated type, e.g. `Annotated[int, Field(...)]` or `Annotated[int, Gt(0)]`."""
- pass
+        type_args = get_args(annotated_type)
+        if type_args:  # element 0 is the annotated type itself, the rest is its metadata
+            return self._apply_annotations(type_args[0], type_args[1:])
+        raise ValueError('Annotated must have at least one argument')
def _apply_annotations(self, source_type: Any, annotations: list[Any], transform_inner_schema: Callable[[CoreSchema], CoreSchema]=lambda x: x) -> CoreSchema:
"""Apply arguments from `Annotated` or from `FieldInfo` to a schema.

This gets called by `GenerateSchema._annotated_schema` but differs from it in that it does
not expect `source_type` to be an `Annotated` object, it expects it to be the first argument of that
(in other words, `GenerateSchema._annotated_schema` just unpacks `Annotated`, this process it).
"""
- pass
+        schema = transform_inner_schema(self.generate_schema(source_type))
+        for annotation in annotations:
+            if isinstance(annotation, FieldInfo):
+                schema = self._apply_field_info(schema, annotation)
+            elif hasattr(annotation, '__get_pydantic_core_schema__'):
+                # NOTE(review): the hook's result replaces `schema` outright and `self` is passed in the
+                # handler position -- confirm hooks never need to call the handler to wrap `schema`.
+                schema = annotation.__get_pydantic_core_schema__(source_type, self)
+            elif hasattr(annotation, '__pydantic_core_schema__'):
+                schema = annotation.__pydantic_core_schema__
+        return schema

def _apply_field_serializers(self, schema: core_schema.CoreSchema, serializers: list[Decorator[FieldSerializerDecoratorInfo]], computed_field: bool=False) -> core_schema.CoreSchema:
"""Apply field serializers to a schema."""
pass
+
+    def _apply_field_info(self, schema: CoreSchema, field_info: FieldInfo) -> CoreSchema:
+        """Wrap `schema` with the field's default, if one is set, and copy over its title and description."""
+        from pydantic_core import PydanticUndefined
+
+        # `PydanticUndefined` (not `None`) marks "no default": `None` is itself a legitimate default value.
+        if field_info.default is not PydanticUndefined:
+            schema = core_schema.with_default_schema(schema, default=field_info.default)
+        # NOTE(review): confirm the core schema accepts top-level 'title' / 'description' keys.
+        if field_info.title:
+            schema['title'] = field_info.title
+        if field_info.description:
+            schema['description'] = field_info.description
+        return schema
def _apply_model_serializers(self, schema: core_schema.CoreSchema, serializers: Iterable[Decorator[ModelSerializerDecoratorInfo]]) -> core_schema.CoreSchema:
"""Apply model serializers to a schema."""
- pass
+        serializers = list(serializers)
+        if not serializers:
+            return schema
+
+        # A schema carries a single `serialization` entry, so only the last serializer can take effect.
+        serializer = serializers[-1]
+        mode = serializer.info.mode
+
+        # Work out whether the function takes the optional trailing `info` argument: besides `self`,
+        # a 'wrap' serializer always receives the handler, a 'plain' one receives nothing else.
+        positional = [
+            name
+            for name, param in inspect.signature(serializer.func).parameters.items()
+            if param.kind in (param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD)
+        ]
+        if positional and positional[0] == 'self':
+            positional = positional[1:]
+        info_arg = len(positional) > (1 if mode == 'wrap' else 0)
+
+        build_ser_schema = (
+            core_schema.wrap_serializer_function_ser_schema
+            if mode == 'wrap'
+            else core_schema.plain_serializer_function_ser_schema
+        )
+        schema['serialization'] = build_ser_schema(
+            serializer.func,
+            info_arg=info_arg,
+            # Falls back to pydantic-core's own default if the decorator info carries no `when_used`.
+            when_used=getattr(serializer.info, 'when_used', 'always'),
+        )
+        return schema
_VALIDATOR_F_MATCH: Mapping[tuple[FieldValidatorModes, Literal['no-info', 'with-info']], Callable[[Callable[..., Any], core_schema.CoreSchema, str | None], core_schema.CoreSchema]] = {('before', 'no-info'): lambda f, schema, _: core_schema.no_info_before_validator_function(f, schema), ('after', 'no-info'): lambda f, schema, _: core_schema.no_info_after_validator_function(f, schema), ('plain', 'no-info'): lambda f, _1, _2: core_schema.no_info_plain_validator_function(f), ('wrap', 'no-info'): lambda f, schema, _: core_schema.no_info_wrap_validator_function(f, schema), ('before', 'with-info'): lambda f, schema, field_name: core_schema.with_info_before_validator_function(f, schema, field_name=field_name), ('after', 'with-info'): lambda f, schema, field_name: core_schema.with_info_after_validator_function(f, schema, field_name=field_name), ('plain', 'with-info'): lambda f, _, field_name: core_schema.with_info_plain_validator_function(f, field_name=field_name), ('wrap', 'with-info'): lambda f, schema, field_name: core_schema.with_info_wrap_validator_function(f, schema, field_name=field_name)}
def apply_validators(schema: core_schema.CoreSchema, validators: Iterable[Decorator[RootValidatorDecoratorInfo]] | Iterable[Decorator[ValidatorDecoratorInfo]] | Iterable[Decorator[FieldValidatorDecoratorInfo]], field_name: str | None) -> core_schema.CoreSchema:
@@ -350,7 +440,11 @@ def apply_validators(schema: core_schema.CoreSchema, validators: Iterable[Decora
Returns:
The updated schema.
"""
- pass
+    for decorator in validators:
+        # NOTE(review): assumes every decorator info object exposes `info_arg` -- confirm against its definition.
+        key = (decorator.info.mode, 'with-info' if decorator.info.info_arg else 'no-info')
+        schema = _VALIDATOR_F_MATCH[key](decorator.func, schema, field_name)
+    return schema
def _validators_require_validate_default(validators: Iterable[Decorator[ValidatorDecoratorInfo]]) -> bool:
"""In v1, if any of the validators for a field had `always=True`, the default value would be validated.
@@ -362,7 +458,11 @@ def _validators_require_validate_default(validators: Iterable[Decorator[Validato
for v1-style validator decorators. (Or we can extend it and keep it if we add something equivalent
to the v1-validator `always` kwarg to `field_validator`.)
"""
- pass
+    # One validator with a truthy `always` is enough; stop at the first one found.
+    for validator in validators:
+        if validator.info.always:
+            return True
+    return False
def apply_model_validators(schema: core_schema.CoreSchema, validators: Iterable[Decorator[ModelValidatorDecoratorInfo]], mode: Literal['inner', 'outer', 'all']) -> core_schema.CoreSchema:
"""Apply model validators to a schema.
@@ -379,7 +475,16 @@ def apply_model_validators(schema: core_schema.CoreSchema, validators: Iterable[
Returns:
The updated schema.
"""
- pass
+    for decorator in validators:
+        runs_before = decorator.info.mode == 'before'
+        # 'inner' keeps only the 'before' validators, 'outer' keeps everything else, 'all' keeps both.
+        if (mode == 'inner' and not runs_before) or (mode == 'outer' and runs_before):
+            continue
+        # NOTE(review): assumes the decorator info exposes `info_arg` -- confirm against its definition.
+        info_kind = 'with-info' if decorator.info.info_arg else 'no-info'
+        wrap = _VALIDATOR_F_MATCH[(decorator.info.mode, info_kind)]
+        schema = wrap(decorator.func, schema, None)
+    return schema
def wrap_default(field_info: FieldInfo, schema: core_schema.CoreSchema) -> core_schema.CoreSchema:
"""Wrap schema with default schema if default value or `default_factory` are available.
@@ -391,11 +497,31 @@ def wrap_default(field_info: FieldInfo, schema: core_schema.CoreSchema) -> core_
Returns:
Updated schema by default value or `default_factory`.
"""
- pass
+    from pydantic_core import PydanticUndefined
+
+    if field_info.default_factory is not None:
+        return core_schema.with_default_schema(schema, default_factory=field_info.default_factory)
+    # `PydanticUndefined` (not `None`) marks "no default": `None` is itself a legitimate default value.
+    if field_info.default is not PydanticUndefined:
+        return core_schema.with_default_schema(schema, default=field_info.default)
+    return schema
def _extract_get_pydantic_json_schema(tp: Any, schema: CoreSchema) -> GetJsonSchemaFunction | None:
"""Extract `__get_pydantic_json_schema__` from a type, handling the deprecated `__modify_schema__`."""
- pass
+    if hasattr(tp, '__get_pydantic_json_schema__'):
+        return tp.__get_pydantic_json_schema__
+    if not hasattr(tp, '__modify_schema__'):
+        return None
+
+    message = '`__modify_schema__` is deprecated; use `__get_pydantic_json_schema__` instead'
+    warnings.warn(message, DeprecationWarning, stacklevel=2)
+
+    def get_json_schema(core_schema: CoreSchema, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
+        # The handler builds the JSON schema first; the legacy hook's return value is ignored.
+        json_schema = handler(core_schema)
+        tp.__modify_schema__(json_schema, handler)
+        return json_schema
+    return get_json_schema
class _CommonField(TypedDict):
schema: core_schema.CoreSchema
@@ -444,4 +567,4 @@ class _ModelTypeStack:
__slots__ = ('_stack',)
def __init__(self) -> None:
- self._stack: list[type] = []
\ No newline at end of file
+ self._stack: list[type] = []