back to Claude Sonnet 3.5 - Fill-in summary
Claude Sonnet 3.5 - Fill-in: flask
Failed to run pytests for test tests
ImportError while loading conftest '/testbed/tests/conftest.py'.
tests/conftest.py:8: in <module>
from flask import Flask
src/flask/__init__.py:5: in <module>
from . import json as json
src/flask/json/__init__.py:7: in <module>
from .provider import _default
src/flask/json/provider.py:112: in <module>
class DefaultJSONProvider(JSONProvider):
src/flask/json/provider.py:125: in DefaultJSONProvider
default: t.Callable[[t.Any], t.Any] = staticmethod(_default)
E NameError: name '_default' is not defined
Patch diff
diff --git a/src/flask/app.py b/src/flask/app.py
index 3e76b0ba..45fe946c 100644
--- a/src/flask/app.py
+++ b/src/flask/app.py
@@ -215,7 +215,7 @@ class Flask(App):
.. versionadded:: 0.9
"""
- pass
+ return self.config['SEND_FILE_MAX_AGE_DEFAULT']
def send_static_file(self, filename: str) ->Response:
"""The view function used to serve files from
@@ -229,7 +229,9 @@ class Flask(App):
.. versionadded:: 0.5
"""
- pass
+ if not self.has_static_folder:
+ raise RuntimeError('No static folder for this object')
+ return send_from_directory(self.static_folder, filename)
def open_resource(self, resource: str, mode: str='rb') ->t.IO[t.AnyStr]:
"""Open a resource file relative to :attr:`root_path` for
@@ -253,7 +255,9 @@ class Flask(App):
class.
"""
- pass
+ if mode not in ('r', 'rt', 'rb'):
+ raise ValueError('Resources can only be opened for reading')
+ return open(os.path.join(self.root_path, resource), mode)
def open_instance_resource(self, resource: str, mode: str='rb') ->t.IO[t
.AnyStr]:
@@ -266,7 +270,7 @@ class Flask(App):
subfolders use forward slashes as separator.
:param mode: resource file opening mode, default is 'rb'.
"""
- pass
+ return open(os.path.join(self.instance_path, resource), mode)
def create_jinja_environment(self) ->Environment:
"""Create the Jinja environment based on :attr:`jinja_options`
@@ -280,7 +284,22 @@ class Flask(App):
.. versionadded:: 0.5
"""
- pass
+ options = dict(self.jinja_options)
+ if 'autoescape' not in options:
+ options['autoescape'] = self.select_jinja_autoescape
+ if 'auto_reload' not in options:
+ options['auto_reload'] = self.config['TEMPLATES_AUTO_RELOAD']
+ rv = self.jinja_environment(self, **options)
+ rv.globals.update(
+ url_for=url_for,
+ get_flashed_messages=get_flashed_messages,
+ config=self.config,
+ request=request,
+ session=session,
+ g=g
+ )
+ rv.filters['tojson'] = json.tojson_filter
+ return rv
def create_url_adapter(self, request: (Request | None)) ->(MapAdapter |
None):
@@ -298,7 +317,18 @@ class Flask(App):
:data:`SERVER_NAME` no longer implicitly enables subdomain
matching. Use :attr:`subdomain_matching` instead.
"""
- pass
+ if request is not None:
+ return self.url_map.bind_to_environ(
+ request.environ,
+ server_name=self.config['SERVER_NAME']
+ )
+ if self.config['SERVER_NAME'] is not None:
+ return self.url_map.bind(
+ self.config['SERVER_NAME'],
+ script_name=self.config['APPLICATION_ROOT'],
+ url_scheme=self.config['PREFERRED_URL_SCHEME']
+ )
+ return None
def raise_routing_exception(self, request: Request) ->t.NoReturn:
"""Intercept routing exceptions and possibly do something else.
@@ -316,7 +346,11 @@ class Flask(App):
:meta private:
:internal:
"""
- pass
+ if not self.debug or not isinstance(request.routing_exception, RequestRedirect):
+ raise request.routing_exception
+
+ from .debughelpers import FormDataRoutingRedirect
+ raise FormDataRoutingRedirect(request)
def update_template_context(self, context: dict[str, t.Any]) ->None:
"""Update the template context with some commonly used variables.
@@ -329,7 +363,10 @@ class Flask(App):
:param context: the context as a dictionary that is updated in place
to add extra variables.
"""
- pass
+ names = ('request', 'session', 'config', 'g')
+ context.update((name, getattr(self, name, None)) for name in names)
+ for func in self.template_context_processors:
+ context.update(func())
def make_shell_context(self) ->dict[str, t.Any]:
"""Returns the shell context for an interactive shell for this
@@ -338,7 +375,10 @@ class Flask(App):
.. versionadded:: 0.11
"""
- pass
+ context = {'app': self, 'g': g}
+ for processor in self.shell_context_processors:
+ context.update(processor())
+ return context
def run(self, host: (str | None)=None, port: (int | None)=None, debug:
(bool | None)=None, load_dotenv: bool=True, **options: t.Any) ->None:
diff --git a/src/flask/blueprints.py b/src/flask/blueprints.py
index 446e7185..162d3921 100644
--- a/src/flask/blueprints.py
+++ b/src/flask/blueprints.py
@@ -43,7 +43,7 @@ class Blueprint(SansioBlueprint):
.. versionadded:: 0.9
"""
- pass
+ return current_app.get_send_file_max_age(filename)
def send_static_file(self, filename: str) ->Response:
"""The view function used to serve files from
@@ -57,7 +57,15 @@ class Blueprint(SansioBlueprint):
.. versionadded:: 0.5
"""
- pass
+ if not self.has_static_folder:
+ raise RuntimeError("No static folder for this blueprint")
+
+ # Ensure cache_timeout is set
+ cache_timeout = self.get_send_file_max_age(filename)
+
+ return send_from_directory(
+ self.static_folder, filename, cache_timeout=cache_timeout
+ )
def open_resource(self, resource: str, mode: str='rb') ->t.IO[t.AnyStr]:
"""Open a resource file relative to :attr:`root_path` for
@@ -81,4 +89,7 @@ class Blueprint(SansioBlueprint):
class.
"""
- pass
+ if mode not in ('r', 'rt', 'rb'):
+ raise ValueError("Resources can only be opened for reading")
+
+ return open(os.path.join(self.root_path, resource), mode)
diff --git a/src/flask/cli.py b/src/flask/cli.py
index f98c3984..347c5b77 100644
--- a/src/flask/cli.py
+++ b/src/flask/cli.py
@@ -36,7 +36,27 @@ def find_best_app(module: ModuleType) ->Flask:
"""Given a module instance this tries to find the best possible
application in the module or raises an exception.
"""
- pass
+ from flask import Flask
+
+ # Try to get an app instance directly
+ if hasattr(module, 'app') and isinstance(module.app, Flask):
+ return module.app
+
+ # Look for 'create_app' or 'make_app' factory functions
+ for app_factory in ('create_app', 'make_app'):
+ app_factory = getattr(module, app_factory, None)
+ if app_factory and callable(app_factory):
+ app = app_factory()
+ if isinstance(app, Flask):
+ return app
+
+ # Look for any Flask instance in the module
+ for name in dir(module):
+ attr = getattr(module, name)
+ if isinstance(attr, Flask):
+ return attr
+
+ raise NoAppException('Failed to find Flask application or factory function.')
def _called_with_wrong_args(f: t.Callable[..., Flask]) ->bool:
@@ -47,21 +67,64 @@ def _called_with_wrong_args(f: t.Callable[..., Flask]) ->bool:
:param f: The function that was called.
:return: ``True`` if the call failed.
"""
- pass
+ try:
+ f()
+ except TypeError as e:
+ tb = traceback.extract_tb(sys.exc_info()[2])
+ if len(tb) == 1:
+ return True # The exception was raised at the function call
+ return False # The exception was raised inside the function
+ except Exception:
+ return False
+ return False
def find_app_by_string(module: ModuleType, app_name: str) ->Flask:
"""Check if the given string is a variable name or a function. Call
a function to get the app instance, or return the variable directly.
"""
- pass
+ from flask import Flask
+
+ # Check if app_name refers to a variable
+ if hasattr(module, app_name):
+ app = getattr(module, app_name)
+ if isinstance(app, Flask):
+ return app
+ elif callable(app):
+ app = app()
+ if isinstance(app, Flask):
+ return app
+
+ # If not found or not a Flask instance, raise an exception
+ raise NoAppException(f'Failed to find application in "{app_name}".')
def prepare_import(path: str) ->str:
"""Given a filename this will try to calculate the python path, add it
to the search path and return the actual module name that is expected.
"""
- pass
+ path = os.path.realpath(path)
+
+ if os.path.splitext(path)[1] == '.py':
+ path = os.path.splitext(path)[0]
+
+ if os.path.basename(path) == '__init__':
+ path = os.path.dirname(path)
+
+ module_name = []
+
+ # move up until outside package structure (no __init__.py)
+ while True:
+ path, name = os.path.split(path)
+ module_name.append(name)
+
+ if not os.path.exists(os.path.join(path, '__init__.py')):
+ break
+
+ if sys.path[0] != path:
+ sys.path.insert(0, path)
+
+ return '.'.join(module_name[::-1])
version_option = click.Option(['--version'], help='Show the Flask version.',
@@ -90,7 +153,36 @@ class ScriptInfo:
this multiple times will just result in the already loaded app to
be returned.
"""
- pass
+ if self._loaded_app is not None:
+ return self._loaded_app
+
+ if self.create_app is not None:
+ app = self.create_app()
+ else:
+ if self.app_import_path:
+ path, name = (self.app_import_path.split(':', 1) + [None])[:2]
+ import_name = prepare_import(path)
+ app = locate_app(import_name, name)
+ else:
+ for path in ('wsgi.py', 'app.py'):
+ import_name = prepare_import(path)
+ app = locate_app(import_name)
+ if app:
+ break
+ else:
+ raise NoAppException(
+ 'Could not locate a Flask application. You did not provide '
+ 'the "FLASK_APP" environment variable, and a "wsgi.py" or '
+ '"app.py" module was not found in the current directory.'
+ )
+
+ if self.set_debug_flag:
+ # Update the app's debug flag through the descriptor so that
+ # other values repopulate as well.
+ app.debug = get_debug_flag()
+
+ self._loaded_app = app
+ return app
pass_script_info = click.make_pass_decorator(ScriptInfo, ensure=True)
@@ -110,7 +202,11 @@ def with_appcontext(f: F) ->F:
decorated callback. The app context is always available to
``app.cli`` command and parameter callbacks.
"""
- pass
+ @click.pass_context
+ def decorator(__ctx: click.Context, *args: t.Any, **kwargs: t.Any) -> t.Any:
+ with __ctx.ensure_object(ScriptInfo).load_app().app_context():
+ return __ctx.invoke(f, *args, **kwargs)
+ return update_wrapper(t.cast(F, decorator), f)
class AppGroup(click.Group):
@@ -127,7 +223,14 @@ class AppGroup(click.Group):
:class:`click.Group` but it wraps callbacks in :func:`with_appcontext`
unless it's disabled by passing ``with_appcontext=False``.
"""
- pass
+ wrap_for_ctx = kwargs.pop("with_appcontext", True)
+
+ def decorator(f):
+ if wrap_for_ctx:
+ f = with_appcontext(f)
+ return click.Group.command(self, *args, **kwargs)(f)
+
+ return decorator
def group(self, *args: t.Any, **kwargs: t.Any) ->t.Callable[[t.Callable
[..., t.Any]], click.Group]:
@@ -135,7 +238,8 @@ class AppGroup(click.Group):
:class:`click.Group` but it defaults the group class to
:class:`AppGroup`.
"""
- pass
+ kwargs.setdefault('cls', AppGroup)
+ return super().group(*args, **kwargs)
_app_option = click.Option(['-A', '--app'], metavar='IMPORT', help=
@@ -204,7 +308,11 @@ def _path_is_ancestor(path: str, other: str) ->bool:
"""Take ``other`` and remove the length of ``path`` from it. Then join it
to ``path``. If it is the original value, ``path`` is an ancestor of
``other``."""
- pass
+ path = os.path.normpath(path)
+ other = os.path.normpath(other)
+ if other.startswith(path):
+ return os.path.join(path, other[len(path):].lstrip(os.sep)) == other
+ return False
def load_dotenv(path: (str | os.PathLike[str] | None)=None) ->bool:
@@ -233,14 +341,50 @@ def load_dotenv(path: (str | os.PathLike[str] | None)=None) ->bool:
.. versionadded:: 1.0
"""
- pass
+ try:
+ import dotenv
+ except ImportError:
+ if path or os.path.isfile('.env') or os.path.isfile('.flaskenv'):
+ click.secho(
+ ' * Tip: There are .env or .flaskenv files present.'
+ ' Do "pip install python-dotenv" to use them.',
+ fg='yellow',
+ err=True,
+ )
+ return False
+
+ if path is not None:
+ if os.path.isfile(path):
+ return dotenv.load_dotenv(path, encoding='utf-8')
+ return False
+
+ new_dir = None
+
+ for name in ('.env', '.flaskenv'):
+ path = dotenv.find_dotenv(name, usecwd=True)
+ if not path:
+ continue
+ if new_dir is None:
+ new_dir = os.path.dirname(path)
+ dotenv.load_dotenv(path, encoding='utf-8')
+
+ return new_dir is not None
def show_server_banner(debug: bool, app_import_path: (str | None)) ->None:
"""Show extra startup messages the first time the server is run,
ignoring the reloader.
"""
- pass
+ if is_running_from_reloader():
+ return
+
+ if app_import_path is not None:
+ message = f" * Serving Flask app '{app_import_path}'"
+ else:
+ message = " * Serving Flask app"
+
+ click.echo(message)
+ click.echo(f" * Debug mode: {'on' if debug else 'off'}")
class CertParamType(click.ParamType):
@@ -260,7 +404,18 @@ def _validate_key(ctx: click.Context, param: click.Parameter, value: t.Any
"""The ``--key`` option must be specified when ``--cert`` is a file.
Modifies the ``cert`` param to be a ``(cert, key)`` pair if needed.
"""
- pass
+ cert = ctx.params.get("cert")
+ if cert and not ctx.params.get("key"):
+ raise click.BadParameter(
+ 'When "--cert" is a file, "--key" is required.',
+ ctx=ctx,
+ param=param,
+ )
+
+ if cert and value:
+ ctx.params["cert"] = cert, value
+
+ return value
class SeparatedPathType(click.Path):
@@ -308,7 +463,31 @@ def run_command(info: ScriptInfo, host: str, port: int, reload: bool,
The reloader and debugger are enabled by default with the '--debug'
option.
"""
- pass
+ debug = get_debug_flag()
+
+ if reload is None:
+ reload = debug
+
+ if debugger is None:
+ debugger = debug
+
+ show_server_banner(debug, info.app_import_path)
+
+ app = DispatchingApp(info.load_app, use_eager_loading=True)
+
+ from werkzeug.serving import run_simple
+
+ run_simple(
+ host,
+ port,
+ app,
+ use_reloader=reload,
+ use_debugger=debugger,
+ threaded=with_threads,
+ ssl_context=cert,
+ extra_files=extra_files,
+ exclude_patterns=exclude_patterns,
+ )
run_command.params.insert(0, _debug_option)
@@ -324,7 +503,23 @@ def shell_command() ->None:
This is useful for executing small snippets of management code
without having to manually configure the application.
"""
- pass
+ import code
+ from flask.globals import _app_ctx_stack
+
+ app = _app_ctx_stack.top.app
+ banner = f"Python {sys.version} on {sys.platform}\nApp: {app.import_name}\nInstance: {app.instance_path}"
+
+ ctx = {}
+
+ # Support the regular Python interpreter startup script
+ startup = os.environ.get("PYTHONSTARTUP")
+ if startup and os.path.isfile(startup):
+ with open(startup, "r") as f:
+ eval(compile(f.read(), startup, "exec"), ctx)
+
+ ctx.update(app.make_shell_context())
+
+ code.interact(banner=banner, local=ctx)
@click.command('routes', short_help='Show the routes for the app.')
@@ -337,7 +532,36 @@ def shell_command() ->None:
@with_appcontext
def routes_command(sort: str, all_methods: bool) ->None:
"""Show all registered routes with endpoints and methods."""
- pass
+ from flask import current_app
+
+ rules = list(current_app.url_map.iter_rules())
+ if not rules:
+ click.echo("No routes were registered.")
+ return
+
+ ignored_methods = set(() if all_methods else ("HEAD", "OPTIONS"))
+
+ if sort in ("endpoint", "rule"):
+ rules = sorted(rules, key=attrgetter(sort))
+ elif sort == "methods":
+ rules = sorted(rules, key=lambda rule: sorted(rule.methods))
+
+ rule_methods = [", ".join(sorted(rule.methods - ignored_methods)) for rule in rules]
+
+ headers = ("Endpoint", "Methods", "Rule")
+ widths = (
+ max(len(rule.endpoint) for rule in rules),
+ max(len(methods) for methods in rule_methods),
+ max(len(rule.rule) for rule in rules),
+ )
+ widths = [max(len(h), w) for h, w in zip(headers, widths)]
+ row = "{{0:<{0}}} {{1:<{1}}} {{2:<{2}}}".format(*widths)
+
+ click.echo(row.format(*headers).strip())
+ click.echo(row.format(*("-" * width for width in widths)))
+
+ for rule, methods in zip(rules, rule_methods):
+ click.echo(row.format(rule.endpoint, methods, rule.rule).rstrip())
cli = FlaskGroup(name='flask', help=
diff --git a/src/flask/config.py b/src/flask/config.py
index 917b25e6..08b7f428 100644
--- a/src/flask/config.py
+++ b/src/flask/config.py
@@ -101,7 +101,17 @@ class Config(dict):
files.
:return: ``True`` if the file was loaded successfully.
"""
- pass
+ rv = os.environ.get(variable_name)
+ if not rv:
+ if silent:
+ return False
+ raise RuntimeError(
+ f"The environment variable {variable_name!r} is not set "
+ "and as such configuration could not be loaded. Set "
+ "this variable and make it point to a configuration "
+ "file"
+ )
+ return self.from_pyfile(rv, silent=silent)
def from_prefixed_env(self, prefix: str='FLASK', *, loads: t.Callable[[
str], t.Any]=json.loads) ->bool:
@@ -128,7 +138,32 @@ class Config(dict):
.. versionadded:: 2.1
"""
- pass
+ prefix = f"{prefix}_"
+ env_vars = {
+ key[len(prefix):]: value
+ for key, value in os.environ.items()
+ if key.startswith(prefix)
+ }
+
+ for key in sorted(env_vars):
+ value = env_vars[key]
+ try:
+ value = loads(value)
+ except Exception:
+ pass
+
+ if "__" in key:
+ parts = key.split("__")
+ current = self
+ for part in parts[:-1]:
+ if part not in current:
+ current[part] = {}
+ current = current[part]
+ current[parts[-1]] = value
+ else:
+ self[key] = value
+
+ return True
def from_pyfile(self, filename: (str | os.PathLike[str]), silent: bool=
False) ->bool:
@@ -146,7 +181,19 @@ class Config(dict):
.. versionadded:: 0.7
`silent` parameter.
"""
- pass
+ filename = os.path.join(self.root_path, filename)
+ d = types.ModuleType('config')
+ d.__file__ = filename
+ try:
+ with open(filename, mode='rb') as config_file:
+ exec(compile(config_file.read(), filename, 'exec'), d.__dict__)
+ except IOError as e:
+ if silent and e.errno in (errno.ENOENT, errno.EISDIR, errno.ENOTDIR):
+ return False
+ e.strerror = f'Unable to load configuration file ({e.strerror})'
+ raise
+ self.from_object(d)
+ return True
def from_object(self, obj: (object | str)) ->None:
"""Updates the values from the given object. An object can be of one
@@ -180,7 +227,11 @@ class Config(dict):
:param obj: an import name or object
"""
- pass
+ if isinstance(obj, str):
+ obj = import_string(obj)
+ for key in dir(obj):
+ if key.isupper():
+ self[key] = getattr(obj, key)
def from_file(self, filename: (str | os.PathLike[str]), load: t.
Callable[[t.IO[t.Any]], t.Mapping[str, t.Any]], silent: bool=False,
@@ -212,7 +263,18 @@ class Config(dict):
.. versionadded:: 2.0
"""
- pass
+ filename = os.path.join(self.root_path, filename)
+
+ try:
+ with open(filename, 'r' if text else 'rb') as f:
+ obj = load(f)
+ except OSError as e:
+ if silent and e.errno in (errno.ENOENT, errno.EISDIR):
+ return False
+ e.strerror = f'Unable to load configuration file ({e.strerror})'
+ raise
+
+ return self.from_mapping(obj)
def from_mapping(self, mapping: (t.Mapping[str, t.Any] | None)=None, **
kwargs: t.Any) ->bool:
@@ -223,7 +285,14 @@ class Config(dict):
.. versionadded:: 0.11
"""
- pass
+ mappings: dict[str, t.Any] = {}
+ if mapping is not None:
+ mappings.update(mapping)
+ mappings.update(kwargs)
+ for key, value in mappings.items():
+ if key.isupper():
+ self[key] = value
+ return True
def get_namespace(self, namespace: str, lowercase: bool=True,
trim_namespace: bool=True) ->dict[str, t.Any]:
@@ -254,7 +323,18 @@ class Config(dict):
.. versionadded:: 0.11
"""
- pass
+ rv = {}
+ for k, v in self.items():
+ if not k.startswith(namespace):
+ continue
+ if trim_namespace:
+ key = k[len(namespace):]
+ else:
+ key = k
+ if lowercase:
+ key = key.lower()
+ rv[key] = v
+ return rv
def __repr__(self) ->str:
return f'<{type(self).__name__} {dict.__repr__(self)}>'
diff --git a/src/flask/ctx.py b/src/flask/ctx.py
index ce2683ae..63548900 100644
--- a/src/flask/ctx.py
+++ b/src/flask/ctx.py
@@ -62,7 +62,7 @@ class _AppCtxGlobals:
.. versionadded:: 0.10
"""
- pass
+ return self.__dict__.get(name, default)
def pop(self, name: str, default: t.Any=_sentinel) ->t.Any:
"""Get and remove an attribute by name. Like :meth:`dict.pop`.
@@ -73,7 +73,9 @@ class _AppCtxGlobals:
.. versionadded:: 0.11
"""
- pass
+ if default is _sentinel:
+ return self.__dict__.pop(name)
+ return self.__dict__.pop(name, default)
def setdefault(self, name: str, default: t.Any=None) ->t.Any:
"""Get the value of an attribute if it is present, otherwise
@@ -85,7 +87,7 @@ class _AppCtxGlobals:
.. versionadded:: 0.11
"""
- pass
+ return self.__dict__.setdefault(name, default)
def __contains__(self, item: str) ->bool:
return item in self.__dict__
@@ -122,7 +124,8 @@ def after_this_request(f: ft.AfterRequestCallable[t.Any]
.. versionadded:: 0.9
"""
- pass
+ _request_ctx_stack.top._after_request_functions.append(f)
+ return f
F = t.TypeVar('F', bound=t.Callable[..., t.Any])
@@ -152,7 +155,16 @@ def copy_current_request_context(f: F) ->F:
.. versionadded:: 0.10
"""
- pass
+ top = _request_ctx_stack.top
+ if top is None:
+ raise RuntimeError('This decorator can only be used at local scopes '
+ 'when a request context is on the stack. '
+ 'For instance within view functions.')
+ reqctx = top.copy()
+ def wrapper(*args, **kwargs):
+ with reqctx:
+ return f(*args, **kwargs)
+ return update_wrapper(wrapper, f)
def has_request_context() ->bool:
@@ -184,7 +196,7 @@ def has_request_context() ->bool:
.. versionadded:: 0.7
"""
- pass
+ return _request_ctx_stack.top is not None
def has_app_context() ->bool:
@@ -194,7 +206,7 @@ def has_app_context() ->bool:
.. versionadded:: 0.9
"""
- pass
+ return _app_ctx_stack.top is not None
class AppContext:
@@ -212,11 +224,19 @@ class AppContext:
def push(self) ->None:
"""Binds the app context to the current context."""
- pass
+ self._cv_tokens.append(_cv_app.set(self))
+ appcontext_pushed.send(self.app)
def pop(self, exc: (BaseException | None)=_sentinel) ->None:
"""Pops the app context."""
- pass
+ try:
+ if exc is _sentinel:
+ exc = sys.exc_info()[1]
+ self.app.do_teardown_appcontext(exc)
+ finally:
+ token = self._cv_tokens.pop()
+ _cv_app.reset(token)
+ appcontext_popped.send(self.app)
def __enter__(self) ->AppContext:
self.push()
@@ -281,13 +301,22 @@ class RequestContext:
The current session object is used instead of reloading the original
data. This prevents `flask.session` pointing to an out-of-date object.
"""
- pass
+ return RequestContext(
+ self.app,
+ self.request.environ,
+ self.request,
+ self.session
+ )
def match_request(self) ->None:
"""Can be overridden by a subclass to hook into the matching
of the request.
"""
- pass
+ try:
+ result = self.url_adapter.match(return_rule=True)
+ self.request.url_rule, self.request.view_args = result
+ except HTTPException as e:
+ self.request.routing_exception = e
def pop(self, exc: (BaseException | None)=_sentinel) ->None:
"""Pops the request context and unbinds it by doing that. This will
@@ -297,7 +326,17 @@ class RequestContext:
.. versionchanged:: 0.9
Added the `exc` argument.
"""
- pass
+ app_ctx = self._cv_tokens.pop()[1]
+
+ try:
+ if exc is _sentinel:
+ exc = sys.exc_info()[1]
+ self.app.do_teardown_request(exc)
+ finally:
+ token = self._cv_tokens.pop()
+ _cv_request.reset(token)
+ if app_ctx is not None:
+ app_ctx.pop(exc)
def __enter__(self) ->RequestContext:
self.push()
diff --git a/src/flask/debughelpers.py b/src/flask/debughelpers.py
index ac217f75..f3615010 100644
--- a/src/flask/debughelpers.py
+++ b/src/flask/debughelpers.py
@@ -71,11 +71,38 @@ def attach_enctype_error_multidict(request: Request) ->None:
:param request: The request to patch.
:meta private:
"""
- pass
+ def __getitem__(key):
+ try:
+ return request.files.storage[key]
+ except KeyError:
+ raise DebugFilesKeyError(request, key)
+
+ request.files.__getitem__ = __getitem__
def explain_template_loading_attempts(app: App, template: str, attempts:
list[tuple[BaseLoader, Scaffold, tuple[str, str | None, t.Callable[[],
bool] | None] | None]]) ->None:
"""This should help developers understand what failed"""
- pass
+ from .templating import DispatchingJinjaLoader
+
+ info = []
+ for idx, (loader, blueprint, triple) in enumerate(attempts):
+ if isinstance(loader, DispatchingJinjaLoader):
+ detail = f"Loader: {type(loader).__name__}"
+ else:
+ detail = f"Loader: {type(loader).__name__} (Blueprint: {blueprint.name})" if blueprint else f"Loader: {type(loader).__name__}"
+
+ if triple is None:
+ info.append(f"{idx + 1}: {detail} - loader did not return a triple")
+ else:
+ template_name, filename, uptodate = triple
+ if filename is None:
+ info.append(f"{idx + 1}: {detail} - template not found")
+ else:
+ info.append(f"{idx + 1}: {detail} - template found: {filename}")
+
+ if info:
+ app.logger.debug("Template loading attempts:\n%s", "\n".join(info))
+ else:
+ app.logger.debug("No template loading attempts were made.")
diff --git a/src/flask/helpers.py b/src/flask/helpers.py
index 2061dbf3..14d4654b 100644
--- a/src/flask/helpers.py
+++ b/src/flask/helpers.py
@@ -24,7 +24,7 @@ def get_debug_flag() ->bool:
"""Get whether debug mode should be enabled for the app, indicated by the
:envvar:`FLASK_DEBUG` environment variable. The default is ``False``.
"""
- pass
+ return os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes', 'on')
def get_load_dotenv(default: bool=True) ->bool:
@@ -34,7 +34,8 @@ def get_load_dotenv(default: bool=True) ->bool:
:param default: What to return if the env var isn't set.
"""
- pass
+ skip_dotenv = os.environ.get("FLASK_SKIP_DOTENV", "").lower()
+ return not (skip_dotenv in ('1', 'true', 'yes', 'on')) if skip_dotenv else default
def stream_with_context(generator_or_function: (t.Iterator[t.AnyStr] | t.
@@ -72,7 +73,34 @@ def stream_with_context(generator_or_function: (t.Iterator[t.AnyStr] | t.
.. versionadded:: 0.9
"""
- pass
+ try:
+ gen = iter(generator_or_function)
+ except TypeError:
+ gen = generator_or_function()
+
+ def generator():
+ ctx = _cv_request.get(None)
+ if ctx is None:
+ raise RuntimeError('No request context found.')
+ with ctx:
+ # Dummy sentinel. Has to be inside the context block or we're
+ # unable to emit the deprecation warning if the user uses
+ # the deprecated functionality.
+ yield None
+
+ try:
+ yield from gen
+ finally:
+ if hasattr(gen, 'close'):
+ gen.close()
+
+ # The trick is to start the generator. Then the code execution runs until
+ # the first dummy None is yielded at which point the context was already
+ # pushed. This item is discarded. Then when the iteration continues the
+ # real generator is executed.
+ wrapped_g = generator()
+ next(wrapped_g)
+ return wrapped_g
def make_response(*args: t.Any) ->Response:
@@ -117,7 +145,11 @@ def make_response(*args: t.Any) ->Response:
.. versionadded:: 0.6
"""
- pass
+ if not args:
+ return current_app.response_class()
+ if len(args) == 1:
+ args = args[0]
+ return current_app.make_response(args)
def url_for(endpoint: str, *, _anchor: (str | None)=None, _method: (str |
@@ -158,7 +190,14 @@ def url_for(endpoint: str, *, _anchor: (str | None)=None, _method: (str |
.. versionchanged:: 0.9
Calls ``app.handle_url_build_error`` on build errors.
"""
- pass
+ return current_app.url_for(
+ endpoint,
+ _anchor=_anchor,
+ _method=_method,
+ _scheme=_scheme,
+ _external=_external,
+ **values
+ )
def redirect(location: str, code: int=302, Response: (type[BaseResponse] |
diff --git a/src/flask/json/provider.py b/src/flask/json/provider.py
index e9123cb0..c39cbf9f 100644
--- a/src/flask/json/provider.py
+++ b/src/flask/json/provider.py
@@ -40,7 +40,7 @@ class JSONProvider:
:param obj: The data to serialize.
:param kwargs: May be passed to the underlying JSON library.
"""
- pass
+ return json.dumps(obj, **kwargs)
def dump(self, obj: t.Any, fp: t.IO[str], **kwargs: t.Any) ->None:
"""Serialize data as JSON and write to a file.
@@ -50,7 +50,7 @@ class JSONProvider:
encoding to be valid JSON.
:param kwargs: May be passed to the underlying JSON library.
"""
- pass
+ json.dump(obj, fp, **kwargs)
def loads(self, s: (str | bytes), **kwargs: t.Any) ->t.Any:
"""Deserialize data as JSON.
@@ -58,7 +58,9 @@ class JSONProvider:
:param s: Text or UTF-8 bytes.
:param kwargs: May be passed to the underlying JSON library.
"""
- pass
+ if isinstance(s, bytes):
+ s = s.decode('utf-8')
+ return json.loads(s, **kwargs)
def load(self, fp: t.IO[t.AnyStr], **kwargs: t.Any) ->t.Any:
"""Deserialize data as JSON read from a file.
@@ -66,7 +68,7 @@ class JSONProvider:
:param fp: A file opened for reading text or UTF-8 bytes.
:param kwargs: May be passed to the underlying JSON library.
"""
- pass
+ return json.load(fp, **kwargs)
def response(self, *args: t.Any, **kwargs: t.Any) ->Response:
"""Serialize the given arguments as JSON, and return a
@@ -83,7 +85,28 @@ class JSONProvider:
treat as a list to serialize.
:param kwargs: Treat as a dict to serialize.
"""
- pass
+ from flask import current_app, Response
+
+ if args and kwargs:
+ raise TypeError("Cannot pass both args and kwargs.")
+
+ if not args and not kwargs:
+ data = None
+ elif len(args) == 1:
+ data = args[0]
+ elif len(args) > 1:
+ data = list(args)
+ else:
+ data = kwargs
+
+ indent = None
+ separators = (',', ':')
+ if current_app.config['JSONIFY_PRETTYPRINT_REGULAR'] or current_app.debug:
+ indent = 2
+ separators = (', ', ': ')
+
+ dumped = self.dumps(data, indent=indent, separators=separators)
+ return Response(dumped, mimetype=self.mimetype)
class DefaultJSONProvider(JSONProvider):
@@ -133,7 +156,10 @@ class DefaultJSONProvider(JSONProvider):
:param obj: The data to serialize.
:param kwargs: Passed to :func:`json.dumps`.
"""
- pass
+ kwargs.setdefault('default', self.default)
+ kwargs.setdefault('ensure_ascii', self.ensure_ascii)
+ kwargs.setdefault('sort_keys', self.sort_keys)
+ return json.dumps(obj, **kwargs)
def loads(self, s: (str | bytes), **kwargs: t.Any) ->t.Any:
"""Deserialize data as JSON from a string or bytes.
@@ -141,7 +167,9 @@ class DefaultJSONProvider(JSONProvider):
:param s: Text or UTF-8 bytes.
:param kwargs: Passed to :func:`json.loads`.
"""
- pass
+ if isinstance(s, bytes):
+ s = s.decode('utf-8')
+ return json.loads(s, **kwargs)
def response(self, *args: t.Any, **kwargs: t.Any) ->Response:
"""Serialize the given arguments as JSON, and return a
@@ -159,4 +187,25 @@ class DefaultJSONProvider(JSONProvider):
treat as a list to serialize.
:param kwargs: Treat as a dict to serialize.
"""
- pass
+ from flask import current_app, Response
+
+ if args and kwargs:
+ raise TypeError("Cannot pass both args and kwargs.")
+
+ if not args and not kwargs:
+ data = None
+ elif len(args) == 1:
+ data = args[0]
+ elif len(args) > 1:
+ data = list(args)
+ else:
+ data = kwargs
+
+ indent = None
+ separators = (',', ':')
+ if self.compact is False or (self.compact is None and current_app.debug):
+ indent = 2
+ separators = (', ', ': ')
+
+ dumped = self.dumps(data, indent=indent, separators=separators)
+ return Response(dumped, mimetype=self.mimetype)
diff --git a/src/flask/json/tag.py b/src/flask/json/tag.py
index ded094b2..d2929ee3 100644
--- a/src/flask/json/tag.py
+++ b/src/flask/json/tag.py
@@ -64,22 +64,22 @@ class JSONTag:
def check(self, value: t.Any) ->bool:
"""Check if the given value should be tagged by this tag."""
- pass
+ raise NotImplementedError
def to_json(self, value: t.Any) ->t.Any:
"""Convert the Python object to an object that is a valid JSON type.
The tag will be added later."""
- pass
+ raise NotImplementedError
def to_python(self, value: t.Any) ->t.Any:
"""Convert the JSON representation back to the correct type. The tag
will already be removed."""
- pass
+ raise NotImplementedError
def tag(self, value: t.Any) ->dict[str, t.Any]:
"""Convert the value to a valid JSON type and add the tag structure
around it."""
- pass
+ return {self.key: self.to_json(value)}
class TagDict(JSONTag):
@@ -169,20 +169,38 @@ class TaggedJSONSerializer:
:raise KeyError: if the tag key is already registered and ``force`` is
not true.
"""
- pass
+ tag = tag_class(self)
+ key = tag.key
+
+ if key in self.tags and not force:
+ raise KeyError(f"Tag '{key}' is already registered.")
+
+ self.tags[key] = tag
+
+ if index is None:
+ self.order.append(tag)
+ else:
+ self.order.insert(index, tag)
def tag(self, value: t.Any) ->t.Any:
"""Convert a value to a tagged representation if necessary."""
- pass
+ for tag in self.order:
+ if tag.check(value):
+ return tag.tag(value)
+ return value
def untag(self, value: dict[str, t.Any]) ->t.Any:
"""Convert a tagged representation back to the original type."""
- pass
+ if isinstance(value, dict) and len(value) == 1:
+ key = next(iter(value))
+ if key in self.tags:
+ return self.tags[key].to_python(value[key])
+ return value
def dumps(self, value: t.Any) ->str:
"""Tag the value and dump it to a compact JSON string."""
- pass
+ return dumps(self.tag(value), separators=(',', ':'))
def loads(self, value: str) ->t.Any:
"""Load data from a JSON string and deserialized any tagged objects."""
- pass
+ return self.untag(loads(value))
diff --git a/src/flask/logging.py b/src/flask/logging.py
index 6fe6f650..f8361fee 100644
--- a/src/flask/logging.py
+++ b/src/flask/logging.py
@@ -18,14 +18,24 @@ def wsgi_errors_stream() ->t.TextIO:
can't import this directly, you can refer to it as
``ext://flask.logging.wsgi_errors_stream``.
"""
- pass
+ return request.environ['wsgi.errors'] if request else sys.stderr
def has_level_handler(logger: logging.Logger) ->bool:
"""Check if there is a handler in the logging chain that will handle the
given logger's :meth:`effective level <~logging.Logger.getEffectiveLevel>`.
"""
- pass
+ level = logger.getEffectiveLevel()
+ current = logger
+
+ while current:
+ if any(handler.level <= level for handler in current.handlers):
+ return True
+ if not current.propagate:
+ break
+ current = current.parent
+
+ return False
default_handler = logging.StreamHandler(wsgi_errors_stream)
@@ -46,4 +56,12 @@ def create_logger(app: App) ->logging.Logger:
:class:`~logging.StreamHandler` for
:func:`~flask.logging.wsgi_errors_stream` with a basic format.
"""
- pass
+ logger = logging.getLogger(app.name)
+
+ if app.debug and logger.level == logging.NOTSET:
+ logger.setLevel(logging.DEBUG)
+
+ if not has_level_handler(logger):
+ logger.addHandler(default_handler)
+
+ return logger
diff --git a/src/flask/sansio/app.py b/src/flask/sansio/app.py
index 2424fb7f..4b4c7b51 100644
--- a/src/flask/sansio/app.py
+++ b/src/flask/sansio/app.py
@@ -221,7 +221,12 @@ class App(Scaffold):
.. versionadded:: 0.8
"""
- pass
+ if self.import_name == '__main__':
+ fn = getattr(sys.modules['__main__'], '__file__', None)
+ if fn is None:
+ return '__main__'
+ return os.path.splitext(os.path.basename(fn))[0]
+ return self.import_name
@cached_property
def logger(self) ->logging.Logger:
@@ -248,7 +253,7 @@ class App(Scaffold):
.. versionadded:: 0.3
"""
- pass
+ return create_logger(self)
@cached_property
def jinja_env(self) ->Environment:
@@ -258,7 +263,7 @@ class App(Scaffold):
accessed. Changing :attr:`jinja_options` after that will have no
effect.
"""
- pass
+ return self.create_jinja_environment()
def make_config(self, instance_relative: bool=False) ->Config:
"""Used to create the config attribute by the Flask constructor.
@@ -269,7 +274,10 @@ class App(Scaffold):
.. versionadded:: 0.8
"""
- pass
+ root_path = self.root_path
+ if instance_relative:
+ root_path = self.instance_path
+ return self.config_class(root_path, self.default_config)
def make_aborter(self) ->Aborter:
"""Create the object to assign to :attr:`aborter`. That object
@@ -281,7 +289,7 @@ class App(Scaffold):
.. versionadded:: 2.2
"""
- pass
+ return self.aborter_class(self.make_aborter_map())
def auto_find_instance_path(self) ->str:
"""Tries to locate the instance path if it was not provided to the
@@ -291,7 +299,10 @@ class App(Scaffold):
.. versionadded:: 0.8
"""
- pass
+ prefix, package_path = find_package(self.import_name)
+ if prefix is None:
+ return os.path.join(package_path, 'instance')
+ return os.path.join(prefix, 'var', self.name + '-instance')
def create_global_jinja_loader(self) ->DispatchingJinjaLoader:
"""Creates the loader for the Jinja2 environment. Can be used to
@@ -304,7 +315,7 @@ class App(Scaffold):
.. versionadded:: 0.7
"""
- pass
+ return DispatchingJinjaLoader(self)
def select_jinja_autoescape(self, filename: str) ->bool:
"""Returns ``True`` if autoescaping should be active for the given
@@ -315,7 +326,9 @@ class App(Scaffold):
.. versionadded:: 0.5
"""
- pass
+ if filename is None:
+ return True
+ return filename.endswith(('.html', '.htm', '.xml', '.xhtml', '.svg'))
@property
def debug(self) ->bool:
@@ -328,7 +341,7 @@ class App(Scaffold):
Default: ``False``
"""
- pass
+ return self.config['DEBUG']
@setupmethod
def register_blueprint(self, blueprint: Blueprint, **options: t.Any
@@ -509,7 +522,20 @@ class App(Scaffold):
blueprint handler for an exception class, app handler for an exception
class, or ``None`` if a suitable handler is not found.
"""
- pass
+ exc_class, code = self._get_exc_class_and_code(e)
+
+ for src in chain(blueprints, [None]):
+ handler = self.error_handler_spec.get(src, {}).get(code)
+ if handler is not None:
+ return handler
+
+ for src in chain(blueprints, [None]):
+ for cls in exc_class.__mro__:
+ handler = self.error_handler_spec.get(src, {}).get(cls)
+ if handler is not None:
+ return handler
+
+ return None
def trap_http_exception(self, e: Exception) ->bool:
"""Checks if an HTTP exception should be trapped or not. By default
@@ -528,7 +554,17 @@ class App(Scaffold):
.. versionadded:: 0.8
"""
- pass
+ if self.config['TRAP_HTTP_EXCEPTIONS']:
+ return True
+
+ trap_bad_request = self.config['TRAP_BAD_REQUEST_ERRORS']
+
+ if trap_bad_request and isinstance(e, BadRequestKeyError):
+ if self.debug:
+ return False
+ return True
+
+ return False
def should_ignore_error(self, error: (BaseException | None)) ->bool:
"""This is called to figure out if an error should be ignored
@@ -538,7 +574,7 @@ class App(Scaffold):
.. versionadded:: 0.10
"""
- pass
+ return False
def redirect(self, location: str, code: int=302) ->BaseResponse:
"""Create a redirect response object.
@@ -552,7 +588,7 @@ class App(Scaffold):
.. versionadded:: 2.2
Moved from ``flask.redirect``, which calls this method.
"""
- pass
+ return _wz_redirect(location, code=code, Response=self.response_class)
def inject_url_defaults(self, endpoint: str, values: dict[str, t.Any]
) ->None:
@@ -562,7 +598,12 @@ class App(Scaffold):
.. versionadded:: 0.7
"""
- pass
+ funcs = self.url_default_functions.get(None, ())
+ if '.' in endpoint:
+ bp = endpoint.rsplit('.', 1)[0]
+ funcs = chain(funcs, self.url_default_functions.get(bp, ()))
+ for func in funcs:
+ func(endpoint, values)
def handle_url_build_error(self, error: BuildError, endpoint: str,
values: dict[str, t.Any]) ->str:
@@ -580,4 +621,18 @@ class App(Scaffold):
:param endpoint: The endpoint being built.
:param values: The keyword arguments passed to ``url_for``.
"""
- pass
+ for handler in self.url_build_error_handlers:
+ try:
+ rv = handler(error, endpoint, values)
+ except BuildError as e:
+ # make error available outside except block
+ error = e
+ else:
+ if rv is not None:
+ return rv
+
+ # Re-raise if called with an active exception, otherwise raise
+ # the passed in exception.
+ if error is sys.exc_info()[1]:
+ raise
+ raise error
diff --git a/src/flask/sansio/blueprints.py b/src/flask/sansio/blueprints.py
index bd3b9de9..d4f74dcd 100644
--- a/src/flask/sansio/blueprints.py
+++ b/src/flask/sansio/blueprints.py
@@ -61,7 +61,19 @@ class BlueprintSetupState:
to the application. The endpoint is automatically prefixed with the
blueprint's name.
"""
- pass
+ if self.url_prefix is not None:
+ if rule:
+ rule = f"{self.url_prefix}{rule}"
+ else:
+ rule = self.url_prefix
+ options.setdefault("subdomain", self.subdomain)
+ if endpoint is None:
+ endpoint = _endpoint_from_view_func(view_func)
+ defaults = self.url_defaults
+ if "defaults" in options:
+ defaults = dict(defaults, **options.pop("defaults"))
+ self.app.add_url_rule(rule, f"{self.name}.{endpoint}",
+ view_func, defaults=defaults, **options)
class Blueprint(Scaffold):
@@ -148,7 +160,7 @@ class Blueprint(Scaffold):
state as argument as returned by the :meth:`make_setup_state`
method.
"""
- pass
+ self.deferred_functions.append(func)
@setupmethod
def record_once(self, func: DeferredSetupFunction) ->None:
@@ -157,7 +169,10 @@ class Blueprint(Scaffold):
blueprint is registered a second time on the application, the
function passed is not called.
"""
- pass
+ def wrapper(state: BlueprintSetupState) -> None:
+ if state.first_registration:
+ func(state)
+ return self.record(update_wrapper(wrapper, func))
def make_setup_state(self, app: App, options: dict[str, t.Any],
first_registration: bool=False) ->BlueprintSetupState:
@@ -165,7 +180,7 @@ class Blueprint(Scaffold):
object that is later passed to the register callback functions.
Subclasses can override this to return a subclass of the setup state.
"""
- pass
+ return BlueprintSetupState(self, app, options, first_registration)
@setupmethod
def register_blueprint(self, blueprint: Blueprint, **options: t.Any
@@ -182,7 +197,7 @@ class Blueprint(Scaffold):
.. versionadded:: 2.0
"""
- pass
+ self._blueprints.append((blueprint, options))
def register(self, app: App, options: dict[str, t.Any]) ->None:
"""Called by :meth:`Flask.register_blueprint` to register all
@@ -213,7 +228,29 @@ class Blueprint(Scaffold):
blueprint to be registered multiple times with unique names
for ``url_for``.
"""
- pass
+ first_registration = not self._got_registered_once
+ if first_registration:
+ self._got_registered_once = True
+
+ if options.get("name") and options["name"] in app.blueprints:
+ raise ValueError(f"Blueprint name '{options['name']}' is already registered.")
+
+ self._validate_blueprint_name(app)
+
+ state = self.make_setup_state(app, options, first_registration)
+
+ if self.has_static_folder:
+ state.add_url_rule(
+ f"{self.static_url_path}/<path:filename>",
+ view_func=self.send_static_file,
+ endpoint="static",
+ )
+
+ for deferred in self.deferred_functions:
+ deferred(state)
+
+ for blueprint, blueprint_options in self._blueprints:
+ blueprint.register(app, dict(blueprint_options, **options))
@setupmethod
def add_url_rule(self, rule: str, endpoint: (str | None)=None,
@@ -226,7 +263,8 @@ class Blueprint(Scaffold):
The URL rule is prefixed with the blueprint's URL prefix. The endpoint name,
used with :func:`url_for`, is prefixed with the blueprint's name.
"""
- pass
+ self.record(lambda s: s.add_url_rule(rule, endpoint, view_func,
+ provide_automatic_options, **options))
@setupmethod
def app_template_filter(self, name: (str | None)=None) ->t.Callable[[
@@ -237,7 +275,10 @@ class Blueprint(Scaffold):
:param name: the optional name of the filter, otherwise the
function name will be used.
"""
- pass
+ def decorator(f: T_template_filter) -> T_template_filter:
+ self.add_app_template_filter(f, name=name)
+ return f
+ return decorator
@setupmethod
def add_app_template_filter(self, f: ft.TemplateFilterCallable, name: (
@@ -249,7 +290,7 @@ class Blueprint(Scaffold):
:param name: the optional name of the filter, otherwise the
function name will be used.
"""
- pass
+ self.record(lambda s: s.app.add_template_filter(f, name))
@setupmethod
def app_template_test(self, name: (str | None)=None) ->t.Callable[[
@@ -262,7 +303,10 @@ class Blueprint(Scaffold):
:param name: the optional name of the test, otherwise the
function name will be used.
"""
- pass
+ def decorator(f: T_template_test) -> T_template_test:
+ self.add_app_template_test(f, name=name)
+ return f
+ return decorator
@setupmethod
def add_app_template_test(self, f: ft.TemplateTestCallable, name: (str |
@@ -276,7 +320,7 @@ class Blueprint(Scaffold):
:param name: the optional name of the test, otherwise the
function name will be used.
"""
- pass
+ self.record(lambda s: s.app.add_template_test(f, name))
@setupmethod
def app_template_global(self, name: (str | None)=None) ->t.Callable[[
@@ -289,7 +333,10 @@ class Blueprint(Scaffold):
:param name: the optional name of the global, otherwise the
function name will be used.
"""
- pass
+ def decorator(f: T_template_global) -> T_template_global:
+ self.add_app_template_global(f, name=name)
+ return f
+ return decorator
@setupmethod
def add_app_template_global(self, f: ft.TemplateGlobalCallable, name: (
@@ -303,28 +350,31 @@ class Blueprint(Scaffold):
:param name: the optional name of the global, otherwise the
function name will be used.
"""
- pass
+ self.record(lambda s: s.app.add_template_global(f, name))
@setupmethod
def before_app_request(self, f: T_before_request) ->T_before_request:
"""Like :meth:`before_request`, but before every request, not only those handled
by the blueprint. Equivalent to :meth:`.Flask.before_request`.
"""
- pass
+ self.record_once(lambda s: s.app.before_request(f))
+ return f
@setupmethod
def after_app_request(self, f: T_after_request) ->T_after_request:
"""Like :meth:`after_request`, but after every request, not only those handled
by the blueprint. Equivalent to :meth:`.Flask.after_request`.
"""
- pass
+ self.record_once(lambda s: s.app.after_request(f))
+ return f
@setupmethod
def teardown_app_request(self, f: T_teardown) ->T_teardown:
"""Like :meth:`teardown_request`, but after every request, not only those
handled by the blueprint. Equivalent to :meth:`.Flask.teardown_request`.
"""
- pass
+ self.record_once(lambda s: s.app.teardown_request(f))
+ return f
@setupmethod
def app_context_processor(self, f: T_template_context_processor
@@ -332,7 +382,8 @@ class Blueprint(Scaffold):
"""Like :meth:`context_processor`, but for templates rendered by every view, not
only by the blueprint. Equivalent to :meth:`.Flask.context_processor`.
"""
- pass
+ self.record_once(lambda s: s.app.context_processor(f))
+ return f
@setupmethod
def app_errorhandler(self, code: (type[Exception] | int)) ->t.Callable[
@@ -340,7 +391,10 @@ class Blueprint(Scaffold):
"""Like :meth:`errorhandler`, but for every request, not only those handled by
the blueprint. Equivalent to :meth:`.Flask.errorhandler`.
"""
- pass
+ def decorator(f: T_error_handler) -> T_error_handler:
+ self.record_once(lambda s: s.app.errorhandler(code)(f))
+ return f
+ return decorator
@setupmethod
def app_url_value_preprocessor(self, f: T_url_value_preprocessor
@@ -348,11 +402,13 @@ class Blueprint(Scaffold):
"""Like :meth:`url_value_preprocessor`, but for every request, not only those
handled by the blueprint. Equivalent to :meth:`.Flask.url_value_preprocessor`.
"""
- pass
+ self.record_once(lambda s: s.app.url_value_preprocessor(f))
+ return f
@setupmethod
def app_url_defaults(self, f: T_url_defaults) ->T_url_defaults:
"""Like :meth:`url_defaults`, but for every request, not only those handled by
the blueprint. Equivalent to :meth:`.Flask.url_defaults`.
"""
- pass
+ self.record_once(lambda s: s.app.url_defaults(f))
+ return f
diff --git a/src/flask/sansio/scaffold.py b/src/flask/sansio/scaffold.py
index e35f461d..1f1b673d 100644
--- a/src/flask/sansio/scaffold.py
+++ b/src/flask/sansio/scaffold.py
@@ -91,7 +91,9 @@ class Scaffold:
"""The absolute path to the configured static folder. ``None``
if no static folder is set.
"""
- pass
+ if self._static_folder is not None:
+ return os.path.join(self.root_path, self._static_folder)
+ return None
@property
def has_static_folder(self) ->bool:
@@ -99,7 +101,7 @@ class Scaffold:
.. versionadded:: 0.5
"""
- pass
+ return self._static_folder is not None
@property
def static_url_path(self) ->(str | None):
@@ -108,7 +110,11 @@ class Scaffold:
If it was not configured during init, it is derived from
:attr:`static_folder`.
"""
- pass
+ if self._static_url_path is not None:
+ return self._static_url_path
+ if self.static_folder is not None:
+ return '/' + os.path.basename(self.static_folder)
+ return None
@cached_property
def jinja_loader(self) ->(BaseLoader | None):
@@ -118,7 +124,9 @@ class Scaffold:
.. versionadded:: 0.5
"""
- pass
+ if self.template_folder is not None:
+ return FileSystemLoader(os.path.join(self.root_path, self.template_folder))
+ return None
@setupmethod
def get(self, rule: str, **options: t.Any) ->t.Callable[[T_route], T_route
@@ -127,7 +135,7 @@ class Scaffold:
.. versionadded:: 2.0
"""
- pass
+ return self.route(rule, methods=["GET"], **options)
@setupmethod
def post(self, rule: str, **options: t.Any) ->t.Callable[[T_route], T_route
@@ -136,7 +144,7 @@ class Scaffold:
.. versionadded:: 2.0
"""
- pass
+ return self.route(rule, methods=["POST"], **options)
@setupmethod
def put(self, rule: str, **options: t.Any) ->t.Callable[[T_route], T_route
@@ -145,7 +153,7 @@ class Scaffold:
.. versionadded:: 2.0
"""
- pass
+ return self.route(rule, methods=["PUT"], **options)
@setupmethod
def delete(self, rule: str, **options: t.Any) ->t.Callable[[T_route],
@@ -154,7 +162,7 @@ class Scaffold:
.. versionadded:: 2.0
"""
- pass
+ return self.route(rule, methods=["DELETE"], **options)
@setupmethod
def patch(self, rule: str, **options: t.Any) ->t.Callable[[T_route],
@@ -163,7 +171,7 @@ class Scaffold:
.. versionadded:: 2.0
"""
- pass
+ return self.route(rule, methods=["PATCH"], **options)
@setupmethod
def route(self, rule: str, **options: t.Any) ->t.Callable[[T_route],
@@ -190,7 +198,11 @@ class Scaffold:
:param options: Extra options passed to the
:class:`~werkzeug.routing.Rule` object.
"""
- pass
+ def decorator(f: T_route) -> T_route:
+ endpoint = options.pop("endpoint", None)
+ self.add_url_rule(rule, endpoint, f, **options)
+ return f
+ return decorator
@setupmethod
def add_url_rule(self, rule: str, endpoint: (str | None)=None,
@@ -254,7 +266,38 @@ class Scaffold:
:param options: Extra options passed to the
:class:`~werkzeug.routing.Rule` object.
"""
- pass
+ if endpoint is None and view_func is not None:
+ endpoint = view_func.__name__
+
+ methods = options.pop("methods", None)
+ if methods is None:
+ methods = getattr(view_func, "methods", None) or ("GET",)
+ if isinstance(methods, str):
+ methods = [methods]
+
+ # Add "HEAD" if "GET" is present
+ if "GET" in methods:
+ methods = set(methods)
+ methods.add("HEAD")
+
+ # Add automatic OPTIONS method handling
+ if provide_automatic_options is None:
+ provide_automatic_options = getattr(view_func, "provide_automatic_options", None)
+ if provide_automatic_options is None:
+ provide_automatic_options = True
+ if provide_automatic_options:
+ methods = set(methods)
+ methods.add("OPTIONS")
+
+ options["methods"] = methods
+
+ rule = self.url_rule_class(rule, endpoint=endpoint, **options)
+ self.url_map.add(rule)
+ if view_func is not None:
+ old_func = self.view_functions.get(endpoint)
+ if old_func is not None and old_func != view_func:
+ raise AssertionError(f"View function mapping is overwriting an existing endpoint function: {endpoint}")
+ self.view_functions[endpoint] = view_func
@setupmethod
def endpoint(self, endpoint: str) ->t.Callable[[F], F]:
@@ -273,7 +316,10 @@ class Scaffold:
:param endpoint: The endpoint name to associate with the view
function.
"""
- pass
+ def decorator(f: F) -> F:
+ self.view_functions[endpoint] = f
+ return f
+ return decorator
@setupmethod
def before_request(self, f: T_before_request) ->T_before_request:
@@ -462,19 +508,35 @@ class Scaffold:
:param exc_class_or_code: Any exception class, or an HTTP status
code as an integer.
"""
- pass
+ if isinstance(exc_class_or_code, int):
+ exc_class = default_exceptions.get(exc_class_or_code)
+ if exc_class is None:
+ raise ValueError(f"'{exc_class_or_code}' is not a valid HTTP error code.")
+ return exc_class, exc_class_or_code
+
+ if isinstance(exc_class_or_code, type) and issubclass(exc_class_or_code, Exception):
+ if issubclass(exc_class_or_code, HTTPException):
+ return exc_class_or_code, exc_class_or_code.code
+ return exc_class_or_code, None
+
+ raise ValueError(f"'{exc_class_or_code}' is neither an exception class nor an HTTP status code.")
def _endpoint_from_view_func(view_func: ft.RouteCallable) ->str:
"""Internal helper that returns the default endpoint for a given
function. This always is the function name.
"""
- pass
+ return view_func.__name__
def _find_package_path(import_name: str) ->str:
"""Find the path that contains the package or module."""
- pass
+ module = importlib.util.find_spec(import_name)
+ if module is None:
+ raise ValueError(f"Could not find package or module '{import_name}'")
+ if module.submodule_search_locations:
+ return os.path.dirname(os.path.dirname(module.origin))
+ return os.path.dirname(module.origin)
def find_package(import_name: str) ->tuple[str | None, str]:
@@ -490,4 +552,25 @@ def find_package(import_name: str) ->tuple[str | None, str]:
for import. If the package is not installed, it's assumed that the
package was imported from the current working directory.
"""
- pass
+ root_mod_name = import_name.split('.')[0]
+ module = importlib.util.find_spec(root_mod_name)
+ if module is None:
+ return None, os.getcwd()
+
+ package_path = _find_package_path(root_mod_name)
+
+ # Check if it's installed in site-packages
+ site_parent = os.path.abspath(os.path.join(package_path, '..'))
+ site_folder = os.path.basename(site_parent)
+ if site_folder.lower() == 'site-packages':
+ parent = os.path.dirname(site_parent)
+ if os.path.basename(parent) == 'Lib':
+ prefix = os.path.dirname(parent)
+ else:
+ prefix = parent
+ elif package_path.lower().endswith('lib' + os.sep + root_mod_name):
+ prefix = package_path[:package_path.lower().rindex('lib' + os.sep + root_mod_name)]
+ else:
+ prefix = None
+
+ return prefix, package_path
diff --git a/src/flask/sessions.py b/src/flask/sessions.py
index 5ed091cb..42950cd8 100644
--- a/src/flask/sessions.py
+++ b/src/flask/sessions.py
@@ -21,7 +21,7 @@ class SessionMixin(MutableMapping):
@property
def permanent(self) ->bool:
"""This reflects the ``'_permanent'`` key in the dict."""
- pass
+ return self.get('_permanent', False)
new = False
modified = True
accessed = True
@@ -109,7 +109,7 @@ class SessionInterface:
This creates an instance of :attr:`null_session_class` by default.
"""
- pass
+ return self.null_session_class()
def is_null_session(self, obj: object) ->bool:
"""Checks if a given object is a null session. Null sessions are
@@ -118,11 +118,11 @@ class SessionInterface:
This checks if the object is an instance of :attr:`null_session_class`
by default.
"""
- pass
+ return isinstance(obj, self.null_session_class)
def get_cookie_name(self, app: Flask) ->str:
"""The name of the session cookie. Uses``app.config["SESSION_COOKIE_NAME"]``."""
- pass
+ return app.config["SESSION_COOKIE_NAME"]
def get_cookie_domain(self, app: Flask) ->(str | None):
"""The value of the ``Domain`` parameter on the session cookie. If not set,
@@ -134,7 +134,7 @@ class SessionInterface:
.. versionchanged:: 2.3
Not set by default, does not fall back to ``SERVER_NAME``.
"""
- pass
+ return app.config.get("SESSION_COOKIE_DOMAIN")
def get_cookie_path(self, app: Flask) ->str:
"""Returns the path for which the cookie should be valid. The
@@ -142,27 +142,27 @@ class SessionInterface:
config var if it's set, and falls back to ``APPLICATION_ROOT`` or
uses ``/`` if it's ``None``.
"""
- pass
+ return app.config.get("SESSION_COOKIE_PATH") or app.config.get("APPLICATION_ROOT") or "/"
def get_cookie_httponly(self, app: Flask) ->bool:
"""Returns True if the session cookie should be httponly. This
currently just returns the value of the ``SESSION_COOKIE_HTTPONLY``
config var.
"""
- pass
+ return app.config.get("SESSION_COOKIE_HTTPONLY", True)
def get_cookie_secure(self, app: Flask) ->bool:
"""Returns True if the cookie should be secure. This currently
just returns the value of the ``SESSION_COOKIE_SECURE`` setting.
"""
- pass
+ return app.config.get("SESSION_COOKIE_SECURE", False)
def get_cookie_samesite(self, app: Flask) ->(str | None):
"""Return ``'Strict'`` or ``'Lax'`` if the cookie should use the
``SameSite`` attribute. This currently just returns the value of
the :data:`SESSION_COOKIE_SAMESITE` setting.
"""
- pass
+ return app.config.get("SESSION_COOKIE_SAMESITE")
def get_expiration_time(self, app: Flask, session: SessionMixin) ->(
datetime | None):
@@ -171,7 +171,9 @@ class SessionInterface:
default implementation returns now + the permanent session
lifetime configured on the application.
"""
- pass
+ if session.permanent:
+ return datetime.now(timezone.utc) + app.permanent_session_lifetime
+ return None
def should_set_cookie(self, app: Flask, session: SessionMixin) ->bool:
"""Used by session backends to determine if a ``Set-Cookie`` header
@@ -184,7 +186,9 @@ class SessionInterface:
.. versionadded:: 0.11
"""
- pass
+ return session.modified or (
+ session.permanent and app.config.get('SESSION_REFRESH_EACH_REQUEST', False)
+ )
def open_session(self, app: Flask, request: Request) ->(SessionMixin | None
):
@@ -199,7 +203,7 @@ class SessionInterface:
context will fall back to using :meth:`make_null_session`
in this case.
"""
- pass
+ return None
def save_session(self, app: Flask, session: SessionMixin, response:
Response) ->None:
@@ -207,7 +211,8 @@ class SessionInterface:
a response, before removing the request context. It is skipped
if :meth:`is_null_session` returns ``True``.
"""
- pass
+ if self.is_null_session(session):
+ return
session_json_serializer = TaggedJSONSerializer()
@@ -218,7 +223,7 @@ def _lazy_sha1(string: bytes=b'') ->t.Any:
SHA-1, in which case the import and use as a default would fail before the
developer can configure something else.
"""
- pass
+ return hashlib.sha1(string)
class SecureCookieSessionInterface(SessionInterface):
diff --git a/src/flask/templating.py b/src/flask/templating.py
index 861e328e..a86be9e0 100644
--- a/src/flask/templating.py
+++ b/src/flask/templating.py
@@ -21,7 +21,12 @@ def _default_template_ctx_processor() ->dict[str, t.Any]:
"""Default template context processor. Injects `request`,
`session` and `g`.
"""
- pass
+ from .globals import g, session
+ return {
+ 'request': _cv_request.get(None),
+ 'session': session,
+ 'g': g,
+ }
class Environment(BaseEnvironment):
@@ -54,7 +59,17 @@ def render_template(template_name_or_list: (str | Template | list[str |
a list is given, the first name to exist will be rendered.
:param context: The variables to make available in the template.
"""
- pass
+ ctx = _cv_app.get(current_app)._get_current_object()
+ ctx.update(context)
+ before_render_template.send(current_app._get_current_object(),
+ template=template_name_or_list,
+ context=ctx)
+ template = current_app.jinja_env.get_or_select_template(template_name_or_list)
+ rv = template.render(ctx)
+ template_rendered.send(current_app._get_current_object(),
+ template=template,
+ context=ctx)
+ return rv
def render_template_string(source: str, **context: t.Any) ->str:
@@ -64,7 +79,17 @@ def render_template_string(source: str, **context: t.Any) ->str:
:param source: The source code of the template to render.
:param context: The variables to make available in the template.
"""
- pass
+ ctx = _cv_app.get(current_app)._get_current_object()
+ ctx.update(context)
+ before_render_template.send(current_app._get_current_object(),
+ template=source,
+ context=ctx)
+ template = current_app.jinja_env.from_string(source)
+ rv = template.render(ctx)
+ template_rendered.send(current_app._get_current_object(),
+ template=template,
+ context=ctx)
+ return rv
def stream_template(template_name_or_list: (str | Template | list[str |
@@ -79,7 +104,19 @@ def stream_template(template_name_or_list: (str | Template | list[str |
.. versionadded:: 2.2
"""
- pass
+ app = _cv_app.get(current_app)
+ ctx = app._get_current_object()
+ ctx.update(context)
+ before_render_template.send(app._get_current_object(),
+ template=template_name_or_list,
+ context=ctx)
+ template = app.jinja_env.get_or_select_template(template_name_or_list)
+ rv = template.stream(ctx)
+ rv = stream_with_context(rv)
+ template_rendered.send(app._get_current_object(),
+ template=template,
+ context=ctx)
+ return rv
def stream_template_string(source: str, **context: t.Any) ->t.Iterator[str]:
@@ -92,4 +129,16 @@ def stream_template_string(source: str, **context: t.Any) ->t.Iterator[str]:
.. versionadded:: 2.2
"""
- pass
+ app = _cv_app.get(current_app)
+ ctx = app._get_current_object()
+ ctx.update(context)
+ before_render_template.send(app._get_current_object(),
+ template=source,
+ context=ctx)
+ template = app.jinja_env.from_string(source)
+ rv = template.stream(ctx)
+ rv = stream_with_context(rv)
+ template_rendered.send(app._get_current_object(),
+ template=template,
+ context=ctx)
+ return rv
diff --git a/src/flask/testing.py b/src/flask/testing.py
index 7c533f33..daff8a9b 100644
--- a/src/flask/testing.py
+++ b/src/flask/testing.py
@@ -70,7 +70,7 @@ class EnvironBuilder(werkzeug.test.EnvironBuilder):
The serialization will be configured according to the config associated
with this EnvironBuilder's ``app``.
"""
- pass
+ return self.app.json.dumps(obj, **kwargs)
_werkzeug_version = ''
@@ -118,7 +118,31 @@ class FlaskClient(Client):
:meth:`~flask.Flask.test_request_context` which are directly
passed through.
"""
- pass
+ if self.cookie_jar is None:
+ raise RuntimeError("Session transactions only make sense with cookies enabled.")
+
+ app = self.application
+ environ_overrides = kwargs.setdefault("environ_overrides", {})
+ self.cookie_jar.inject_wsgi(environ_overrides)
+ outer_reqctx = app.request_context(environ_overrides)
+ outer_reqctx.push()
+
+ session_interface = app.session_interface
+ session = session_interface.open_session(app, outer_reqctx.request)
+
+ if session is None:
+ raise RuntimeError("Session backend did not open a session.")
+
+ try:
+ yield session
+ finally:
+ resp = app.response_class()
+ if not session_interface.is_null_session(session):
+ session_interface.save_session(app, session, resp)
+ headers = resp.get_wsgi_headers(outer_reqctx.request.environ)
+ self.cookie_jar.extract_wsgi(headers)
+
+ outer_reqctx.pop()
def __enter__(self) ->FlaskClient:
if self.preserve_context:
@@ -158,4 +182,10 @@ class FlaskCliRunner(CliRunner):
:return: a :class:`~click.testing.Result` object.
"""
- pass
+ if cli is None:
+ cli = self.app.cli
+
+ if "obj" not in kwargs:
+ kwargs["obj"] = ScriptInfo(create_app=lambda: self.app)
+
+ return super().invoke(cli, args, **kwargs)
diff --git a/src/flask/views.py b/src/flask/views.py
index 25272f32..a1e006e3 100644
--- a/src/flask/views.py
+++ b/src/flask/views.py
@@ -49,7 +49,9 @@ class View:
this and return a valid response. Any variables from the URL
rule are passed as keyword arguments.
"""
- pass
+ raise NotImplementedError(
+ f"{self.__class__.__name__} must implement dispatch_request method."
+ )
@classmethod
def as_view(cls, name: str, *class_args: t.Any, **class_kwargs: t.Any
@@ -69,7 +71,24 @@ class View:
.. versionchanged:: 2.2
Added the ``init_every_request`` class attribute.
"""
- pass
+ def view(*args: t.Any, **kwargs: t.Any) ->ft.ResponseReturnValue:
+ self = view.view_class(*class_args, **class_kwargs)
+ return current_app.ensure_sync(self.dispatch_request)(*args, **kwargs)
+
+ if cls.decorators:
+ view.__name__ = name
+ view.__module__ = cls.__module__
+ for decorator in cls.decorators:
+ view = decorator(view)
+
+ view.view_class = cls
+ view.__name__ = name
+ view.__doc__ = cls.__doc__
+ view.__module__ = cls.__module__
+ view.methods = cls.methods
+ view.provide_automatic_options = cls.provide_automatic_options
+
+ return view
class MethodView(View):
diff --git a/src/flask/wrappers.py b/src/flask/wrappers.py
index e086f271..61243c33 100644
--- a/src/flask/wrappers.py
+++ b/src/flask/wrappers.py
@@ -31,7 +31,7 @@ class Request(RequestBase):
@property
def max_content_length(self) ->(int | None):
"""Read-only view of the ``MAX_CONTENT_LENGTH`` config key."""
- pass
+ return current_app.config.get('MAX_CONTENT_LENGTH')
@property
def endpoint(self) ->(str | None):
@@ -43,7 +43,9 @@ class Request(RequestBase):
This in combination with :attr:`view_args` can be used to
reconstruct the same URL or a modified URL.
"""
- pass
+ if self.url_rule is not None:
+ return self.url_rule.endpoint
+ return None
@property
def blueprint(self) ->(str | None):
@@ -57,7 +59,9 @@ class Request(RequestBase):
created with. It may have been nested, or registered with a
different name.
"""
- pass
+ if self.endpoint is not None:
+ return _split_blueprint_path(self.endpoint)[0]
+ return None
@property
def blueprints(self) ->list[str]:
@@ -69,7 +73,9 @@ class Request(RequestBase):
.. versionadded:: 2.0.1
"""
- pass
+ if self.endpoint is not None:
+ return _split_blueprint_path(self.endpoint)[1]
+ return []
class Response(ResponseBase):
@@ -100,4 +106,4 @@ class Response(ResponseBase):
See :attr:`~werkzeug.wrappers.Response.max_cookie_size` in
Werkzeug's docs.
"""
- pass
+ return current_app.config.get('MAX_COOKIE_SIZE', 4093)