Traceback (most recent call last):
File "/testbed/.venv/bin/pytest", line 8, in <module>
sys.exit(console_main())
^^^^^^^^^^^^^^
File "/testbed/.venv/lib/python3.12/site-packages/_pytest/config/__init__.py", line 201, in console_main
code = main()
^^^^^^
File "/testbed/.venv/lib/python3.12/site-packages/_pytest/config/__init__.py", line 156, in main
config = _prepareconfig(args, plugins)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/testbed/.venv/lib/python3.12/site-packages/_pytest/config/__init__.py", line 341, in _prepareconfig
config = pluginmanager.hook.pytest_cmdline_parse(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/testbed/.venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513, in __call__
return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/testbed/.venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120, in _hookexec
return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/testbed/.venv/lib/python3.12/site-packages/pluggy/_callers.py", line 139, in _multicall
raise exception.with_traceback(exception.__traceback__)
File "/testbed/.venv/lib/python3.12/site-packages/pluggy/_callers.py", line 122, in _multicall
teardown.throw(exception) # type: ignore[union-attr]
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/testbed/.venv/lib/python3.12/site-packages/_pytest/helpconfig.py", line 105, in pytest_cmdline_parse
config = yield
^^^^^
File "/testbed/.venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103, in _multicall
res = hook_impl.function(*args)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/testbed/.venv/lib/python3.12/site-packages/_pytest/config/__init__.py", line 1140, in pytest_cmdline_parse
self.parse(args)
File "/testbed/.venv/lib/python3.12/site-packages/_pytest/config/__init__.py", line 1494, in parse
self._preparse(args, addopts=addopts)
File "/testbed/.venv/lib/python3.12/site-packages/_pytest/config/__init__.py", line 1381, in _preparse
self.pluginmanager.load_setuptools_entrypoints("pytest11")
File "/testbed/.venv/lib/python3.12/site-packages/pluggy/_manager.py", line 421, in load_setuptools_entrypoints
plugin = ep.load()
^^^^^^^^^
File "/root/.local/share/uv/python/cpython-3.12.6-linux-x86_64-gnu/lib/python3.12/importlib/metadata/__init__.py", line 205, in load
module = import_module(match.group('module'))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/.local/share/uv/python/cpython-3.12.6-linux-x86_64-gnu/lib/python3.12/importlib/__init__.py", line 90, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
File "<frozen importlib._bootstrap>", line 1310, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
File "<frozen importlib._bootstrap>", line 1310, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
File "<frozen importlib._bootstrap>", line 1310, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 995, in exec_module
File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
File "/testbed/web3/__init__.py", line 4, in <module>
from web3.providers import (
File "/testbed/web3/providers/__init__.py", line 1, in <module>
from .async_base import (
File "/testbed/web3/providers/async_base.py", line 4, in <module>
from web3._utils.encoding import FriendlyJsonSerde, Web3JsonEncoder
File "/testbed/web3/_utils/encoding.py", line 9, in <module>
from web3._utils.abi import is_address_type, is_array_type, is_bool_type, is_bytes_type, is_int_type, is_string_type, is_uint_type, size_of_type, sub_type_of_array_type
File "/testbed/web3/_utils/abi.py", line 21, in <module>
from web3.utils import get_abi_input_names
File "/testbed/web3/utils/__init__.py", line 6, in <module>
from .abi import ( # NOQA
ImportError: cannot import name 'get_abi_input_names' from 'web3.utils.abi' (/testbed/web3/utils/abi.py)
diff --git a/tests/core/contracts/test_contract_call_interface.py b/tests/core/contracts/test_contract_call_interface.py
index f9173336..8452faf9 100644
--- a/tests/core/contracts/test_contract_call_interface.py
+++ b/tests/core/contracts/test_contract_call_interface.py
@@ -27,7 +27,7 @@ from web3._utils.contract_sources.contract_data.bytes_contracts import (
BYTES32_CONTRACT_DATA,
BYTES_CONTRACT_DATA,
)
-from web3._utils.ens import (
+from web3._utils.validation_utils import (
contract_ens_addresses,
)
from web3.exceptions import (
diff --git a/tests/core/contracts/test_contract_init.py b/tests/core/contracts/test_contract_init.py
index e297dd51..07062a0f 100644
--- a/tests/core/contracts/test_contract_init.py
+++ b/tests/core/contracts/test_contract_init.py
@@ -1,6 +1,6 @@
import pytest
-from web3._utils.ens import (
+from web3._utils.validation_utils import (
contract_ens_addresses,
ens_addresses,
)
diff --git a/tests/core/eth-module/test_transactions.py b/tests/core/eth-module/test_transactions.py
index a455c311..91ab248a 100644
--- a/tests/core/eth-module/test_transactions.py
+++ b/tests/core/eth-module/test_transactions.py
@@ -9,7 +9,7 @@ from hexbytes import (
HexBytes,
)
-from web3._utils.ens import (
+from web3._utils.validation_utils import (
ens_addresses,
)
from web3._utils.rpc_abi import (
diff --git a/web3/_utils/abi.py b/web3/_utils/abi.py
index c3535180..64456cf0 100644
--- a/web3/_utils/abi.py
+++ b/web3/_utils/abi.py
@@ -14,7 +14,7 @@ from eth_utils import decode_hex, is_bytes, is_list_like, is_string, is_text, to
from eth_utils.abi import collapse_if_tuple
from eth_utils.toolz import curry, partial, pipe
from web3._utils.decorators import reject_recursive_repeats
-from web3._utils.ens import is_ens_name
+from web3._utils.validation_utils import is_ens_name
from web3._utils.formatters import recursive_map
from web3.exceptions import FallbackNotFound, MismatchedABI
from web3.types import ABI, ABIEvent, ABIEventParams, ABIFunction, ABIFunctionParams, TReturn
@@ -29,7 +29,9 @@ def get_normalized_abi_arg_type(abi_arg: ABIEventParams) -> str:
makes use of `collapse_if_tuple()` to collapse the appropriate component
types within a tuple type, if present.
"""
- pass
+ if isinstance(abi_arg['type'], str):
+ return collapse_if_tuple(dict(abi_arg))
+ raise ValueError(f"Unknown ABI arg type: {abi_arg['type']}")
class AddressEncoder(encoding.AddressEncoder):
pass
@@ -73,7 +75,57 @@ def merge_args_and_kwargs(function_abi: ABIFunction, args: Sequence[Any], kwargs
given. Returns a list of argument values aligned to the order of inputs
defined in ``function_abi``.
"""
- pass
+ if not function_abi.get('inputs', None):
+ if args or kwargs:
+ raise TypeError(
+ "Function {} does not accept any arguments".format(function_abi.get('name', '[fallback]'))
+ )
+ return ()
+
+ input_names = get_abi_input_names(function_abi)
+ num_inputs = len(input_names)
+
+ # Check that all positional args are in bounds
+ if len(args) > num_inputs:
+ raise TypeError(
+ "Function {} received too many positional values".format(function_abi.get('name', '[fallback]'))
+ )
+
+ # Check that all keyword args are known
+ for key in kwargs:
+ if key not in input_names:
+ raise TypeError(
+ "{} is not a valid argument for function {}".format(
+ key, function_abi.get('name', '[fallback]')
+ )
+ )
+
+ # Check that same argument is not passed twice
+ for idx, value in enumerate(args):
+ if input_names[idx] in kwargs:
+ raise TypeError(
+ "Function {} got multiple values for argument {}".format(
+ function_abi.get('name', '[fallback]'),
+ input_names[idx],
+ )
+ )
+
+ # Fill remaining arguments with keyword values
+ args_as_kwargs = {name: arg for name, arg in zip(input_names, args)}
+ args_as_kwargs.update(kwargs)
+
+ # Check that all required args have been given
+ missing_args = set(input_names) - set(args_as_kwargs)
+ if missing_args:
+ raise TypeError(
+ "Function {} missing arguments: {}".format(
+ function_abi.get('name', '[fallback]'),
+ ', '.join(missing_args),
+ )
+ )
+
+ # Return values in order of inputs
+ return tuple(args_as_kwargs[name] for name in input_names)
TUPLE_TYPE_STR_RE = re.compile('^(tuple)((\\[([1-9]\\d*\\b)?])*)??$')
def get_tuple_type_str_parts(s: str) -> Optional[Tuple[str, Optional[str]]]:
@@ -81,14 +133,57 @@ def get_tuple_type_str_parts(s: str) -> Optional[Tuple[str, Optional[str]]]:
Takes a JSON ABI type string. For tuple type strings, returns the separated
prefix and array dimension parts. For all other strings, returns ``None``.
"""
- pass
+ match = TUPLE_TYPE_STR_RE.match(s)
+ if match:
+ tuple_prefix = match.group(1)
+ array_part = match.group(2) or None
+ return tuple_prefix, array_part
+ return None
+
+def _align_tuple_items(tuple_components: Sequence[ABIFunctionParams], tuple_value: Union[Sequence[Any], Mapping[Any, Any]]) -> Tuple[Any, ...]:
+ """
+ Takes a list of tuple component ABIs and a sequence or mapping of values.
+ Returns a tuple of values aligned to the component ABIs.
+ """
+ if isinstance(tuple_value, Mapping):
+ return tuple(
+ _align_abi_input(comp_abi, tuple_value[comp_abi['name']])
+ for comp_abi in tuple_components
+ )
+
+ return tuple(
+ _align_abi_input(comp_abi, tuple_item)
+ for comp_abi, tuple_item in zip(tuple_components, tuple_value)
+ )
def _align_abi_input(arg_abi: ABIFunctionParams, arg: Any) -> Tuple[Any, ...]:
"""
Aligns the values of any mapping at any level of nesting in ``arg``
according to the layout of the corresponding abi spec.
"""
- pass
+ tuple_parts = get_tuple_type_str_parts(arg_abi['type'])
+ if tuple_parts is None:
+ return arg
+
+ tuple_prefix, array_part = tuple_parts
+ tuple_components = arg_abi.get('components', None)
+ if tuple_components is None:
+ raise ValueError("Tuple components missing from ABI")
+
+ if array_part:
+ # If array dimension exists, map alignment to each tuple element
+ if not is_list_like(arg):
+ raise TypeError(
+ "Expected list-like data for type {}, got {}".format(
+ arg_abi['type'], type(arg)
+ )
+ )
+ return tuple(
+ _align_tuple_items(tuple_components, tuple_item)
+ for tuple_item in arg
+ )
+
+ return _align_tuple_items(tuple_components, arg)
def get_aligned_abi_inputs(abi: ABIFunction, args: Union[Tuple[Any, ...], Mapping[Any, Any]]) -> Tuple[Tuple[Any, ...], Tuple[Any, ...]]:
"""
@@ -98,7 +193,39 @@ def get_aligned_abi_inputs(abi: ABIFunction, args: Union[Tuple[Any, ...], Mappin
contained in ``args`` may contain nested mappings or sequences corresponding
to tuple-encoded values in ``abi``.
"""
- pass
+ inputs = abi.get('inputs', [])
+ if not inputs:
+ if args and not isinstance(args, (tuple, list)) or (isinstance(args, (tuple, list)) and args):
+ raise TypeError(
+ "Function {} does not accept any arguments".format(abi.get('name', '[fallback]'))
+ )
+ return (), ()
+
+ input_types = tuple(collapse_if_tuple(dict(arg)) for arg in inputs)
+
+ if isinstance(args, (list, tuple)):
+ if len(args) > len(inputs):
+ raise TypeError(
+ "Function {} received too many arguments".format(abi.get('name', '[fallback]'))
+ )
+ args_as_list = list(args)
+ else:
+ args_as_list = []
+ for input_abi in inputs:
+ if input_abi['name'] not in args:
+ raise TypeError(
+ "Function {} missing argument: {}".format(
+ abi.get('name', '[fallback]'),
+ input_abi['name']
+ )
+ )
+ args_as_list.append(args[input_abi['name']])
+
+ for i, input_abi in enumerate(inputs):
+ if i < len(args_as_list):
+ args_as_list[i] = _align_abi_input(input_abi, args_as_list[i])
+
+ return input_types, tuple(args_as_list)
DYNAMIC_TYPES = ['bytes', 'string']
INT_SIZES = range(8, 257, 8)
BYTES_SIZES = range(1, 33)
@@ -110,16 +237,104 @@ BASE_TYPE_REGEX = '|'.join((_type + '(?![a-z0-9])' for _type in itertools.chain(
SUB_TYPE_REGEX = '\\[[0-9]*\\]'
TYPE_REGEX = '^(?:{base_type})(?:(?:{sub_type})*)?$'.format(base_type=BASE_TYPE_REGEX, sub_type=SUB_TYPE_REGEX)
+def is_recognized_type(abi_type: TypeStr) -> bool:
+ """
+ Returns True if the ABI type is a recognized type, False otherwise.
+ """
+ return bool(re.match(TYPE_REGEX, abi_type))
+
+def is_array_type(abi_type: TypeStr) -> bool:
+ """
+ Returns True if the ABI type is an array type, False otherwise.
+ """
+ return bool(re.match(ARRAY_REGEX, abi_type))
+
+def sub_type_of_array_type(abi_type: TypeStr) -> TypeStr:
+ """
+ Returns the type of the array elements.
+ """
+ if not is_array_type(abi_type):
+ raise ValueError(f"Not an array type: {abi_type}")
+ return re.sub(END_BRACKETS_OF_ARRAY_TYPE_REGEX, '', abi_type)
+
+def is_bool_type(abi_type: TypeStr) -> bool:
+ """
+ Returns True if the ABI type is a boolean type, False otherwise.
+ """
+ return abi_type == 'bool'
+
+def is_int_type(abi_type: TypeStr) -> bool:
+ """
+ Returns True if the ABI type is a signed integer type, False otherwise.
+ """
+ return abi_type.startswith('int') and not abi_type.startswith('uint')
+
+def is_uint_type(abi_type: TypeStr) -> bool:
+ """
+ Returns True if the ABI type is an unsigned integer type, False otherwise.
+ """
+ return abi_type.startswith('uint')
+
+def is_address_type(abi_type: TypeStr) -> bool:
+ """
+ Returns True if the ABI type is an address type, False otherwise.
+ """
+ return abi_type == 'address'
+
+def is_bytes_type(abi_type: TypeStr) -> bool:
+ """
+ Returns True if the ABI type is a bytes type, False otherwise.
+ """
+ return abi_type == 'bytes' or abi_type.startswith('bytes') and re.match('^bytes[0-9]+$', abi_type)
+
+def is_string_type(abi_type: TypeStr) -> bool:
+ """
+ Returns True if the ABI type is a string type, False otherwise.
+ """
+ return abi_type == 'string'
+
def size_of_type(abi_type: TypeStr) -> int:
"""
Returns size in bits of abi_type
"""
- pass
+ if not is_recognized_type(abi_type):
+ raise ValueError(f"Unrecognized abi_type: {abi_type}")
+
+ if is_array_type(abi_type):
+ sub_type = sub_type_of_array_type(abi_type)
+ return size_of_type(sub_type)
+
+ if abi_type == 'bool':
+ return 8
+ elif abi_type == 'address':
+ return 160
+ elif abi_type.startswith('bytes'):
+ if abi_type == 'bytes':
+ return None
+ return int(abi_type[5:]) * 8
+ elif abi_type.startswith('uint'):
+ return int(abi_type[4:])
+ elif abi_type.startswith('int'):
+ return int(abi_type[3:])
+ elif abi_type == 'string':
+ return None
+
+ raise ValueError(f"Unsupported abi_type: {abi_type}")
END_BRACKETS_OF_ARRAY_TYPE_REGEX = '\\[[^]]*\\]$'
ARRAY_REGEX = '^[a-zA-Z0-9_]+({sub_type})+$'.format(sub_type=SUB_TYPE_REGEX)
NAME_REGEX = '[a-zA-Z_][a-zA-Z0-9_]*'
ENUM_REGEX = '^{lib_name}\\.{enum_name}$'.format(lib_name=NAME_REGEX, enum_name=NAME_REGEX)
+def _get_data(data_tree: Any) -> Any:
+ """
+ Extract data values from an ABITypedData tree.
+ """
+ if isinstance(data_tree, ABITypedData):
+ return _get_data(data_tree.data)
+ elif isinstance(data_tree, (list, tuple)):
+ return type(data_tree)(_get_data(item) for item in data_tree)
+ return data_tree
+
@curry
def map_abi_data(normalizers: Sequence[Callable[[TypeStr, Any], Tuple[TypeStr, Any]]], types: Sequence[TypeStr], data: Sequence[Any]) -> Any:
"""
@@ -144,7 +359,13 @@ def map_abi_data(normalizers: Sequence[Callable[[TypeStr, Any], Tuple[TypeStr, A
2. Recursively mapping each of the normalizers to the data
3. Stripping the types back out of the tree
"""
- pass
+ pipeline = itertools.chain(
+ [abi_data_tree(types)],
+ map(data_tree_map, normalizers),
+ [lambda tree: _get_data(tree)],
+ )
+
+ return pipe(data, *pipeline)
@curry
def abi_data_tree(types: Sequence[TypeStr], data: Sequence[Any]) -> List[Any]:
@@ -157,7 +378,27 @@ def abi_data_tree(types: Sequence[TypeStr], data: Sequence[Any]) -> List[Any]:
>>> abi_data_tree(types=["bool[2]", "uint"], data=[[True, False], 0])
[("bool[2]", [("bool", True), ("bool", False)]), ("uint256", 0)]
"""
- pass
+ if len(types) != len(data):
+ raise ValueError(
+ "Length mismatch between types and data: got {0} types and {1} data items".format(
+ len(types), len(data)
+ )
+ )
+
+ results = []
+
+ for data_type, data_value in zip(types, data):
+ if is_array_type(data_type):
+ item_type = sub_type_of_array_type(data_type)
+ value_type = [
+ abi_data_tree([item_type], [item])[0]
+ for item in data_value
+ ]
+ results.append(ABITypedData([data_type, value_type]))
+ else:
+ results.append(ABITypedData([data_type, data_value]))
+
+ return results
@curry
def data_tree_map(func: Callable[[TypeStr, Any], Tuple[TypeStr, Any]], data_tree: Any) -> 'ABITypedData':
@@ -165,7 +406,21 @@ def data_tree_map(func: Callable[[TypeStr, Any], Tuple[TypeStr, Any]], data_tree
Map func to every ABITypedData element in the tree. func will
receive two args: abi_type, and data
"""
- pass
+ if isinstance(data_tree, ABITypedData):
+ abi_type, data = data_tree
+ if is_array_type(abi_type):
+ item_type = sub_type_of_array_type(abi_type)
+ value_type = [
+ data_tree_map(func, item)
+ for item in data
+ ]
+ return ABITypedData(func(abi_type, value_type))
+ else:
+ return ABITypedData(func(abi_type, data))
+ elif isinstance(data_tree, (list, tuple)):
+ return type(data_tree)(data_tree_map(func, item) for item in data_tree)
+ else:
+ return data_tree
class ABITypedData(namedtuple('ABITypedData', 'abi_type, data')):
"""
@@ -193,7 +448,15 @@ def named_tree(abi: Iterable[Union[ABIFunctionParams, ABIFunction, ABIEvent, Dic
"""
Convert function inputs/outputs or event data tuple to dict with names from ABI.
"""
- pass
+ names = [item['name'] for item in abi]
+ items_with_name = [
+ (name, data_item)
+ for name, data_item
+ in zip(names, data)
+ if name
+ ]
+
+ return dict(items_with_name)
async def async_data_tree_map(async_w3: 'AsyncWeb3', func: Callable[['AsyncWeb3', TypeStr, Any], Coroutine[Any, Any, Tuple[TypeStr, Any]]], data_tree: Any) -> 'ABITypedData':
"""
@@ -202,7 +465,24 @@ async def async_data_tree_map(async_w3: 'AsyncWeb3', func: Callable[['AsyncWeb3'
The awaitable method should receive three positional args:
async_w3, abi_type, and data
"""
- pass
+ if isinstance(data_tree, ABITypedData):
+ abi_type, data = data_tree
+ if is_array_type(abi_type):
+ item_type = sub_type_of_array_type(abi_type)
+ value_type = [
+ await async_data_tree_map(async_w3, func, item)
+ for item in data
+ ]
+ return ABITypedData(await func(async_w3, abi_type, value_type))
+ else:
+ return ABITypedData(await func(async_w3, abi_type, data))
+ elif isinstance(data_tree, (list, tuple)):
+ return type(data_tree)(
+ await async_data_tree_map(async_w3, func, item)
+ for item in data_tree
+ )
+ else:
+ return data_tree
@reject_recursive_repeats
async def async_recursive_map(async_w3: 'AsyncWeb3', func: Callable[[Any], Coroutine[Any, Any, TReturn]], data: Any) -> TReturn:
@@ -213,11 +493,31 @@ async def async_recursive_map(async_w3: 'AsyncWeb3', func: Callable[[Any], Corou
Define the awaitable method so that it only applies to the type of value that you
want it to apply to.
"""
- pass
+ result = await func(data)
+ return await async_map_if_collection(
+ lambda item: async_recursive_map(async_w3, func, item),
+ result
+ )
async def async_map_if_collection(func: Callable[[Any], Coroutine[Any, Any, Any]], value: Any) -> Any:
"""
Apply an awaitable method to each element of a collection or value of a dictionary.
If the value is not a collection, return it unmodified.
"""
- pass
\ No newline at end of file
+ if isinstance(value, dict):
+ return {
+ key: await async_map_if_collection(func, item)
+ for key, item in value.items()
+ }
+ elif isinstance(value, (list, tuple)):
+ return type(value)(
+ await async_map_if_collection(func, item)
+ for item in value
+ )
+ elif isinstance(value, abc.Collection) and not isinstance(value, (str, bytes)):
+ return type(value)(
+ await async_map_if_collection(func, item)
+ for item in value
+ )
+ else:
+ return value
\ No newline at end of file
diff --git a/web3/_utils/encoding.py b/web3/_utils/encoding.py
index f97c64a8..b606c091 100644
--- a/web3/_utils/encoding.py
+++ b/web3/_utils/encoding.py
@@ -14,25 +14,65 @@ def hex_encode_abi_type(abi_type: TypeStr, value: Any, force_size: Optional[int]
"""
Encodes value into a hex string in format of abi_type
"""
- pass
+ validate_abi_type(abi_type)
+ validate_abi_value(abi_type, value)
+
+ data_size = force_size or size_of_type(abi_type)
+ if is_array_type(abi_type):
+ sub_type = sub_type_of_array_type(abi_type)
+ return HexStr(''.join(
+ remove_0x_prefix(hex_encode_abi_type(sub_type, v, force_size))
+ for v in value
+ ))
+ elif is_bool_type(abi_type):
+ return to_hex_with_size(value, data_size)
+ elif is_uint_type(abi_type):
+ return to_hex_with_size(value, data_size)
+ elif is_int_type(abi_type):
+ return to_hex_twos_compliment(value, data_size)
+ elif is_address_type(abi_type):
+ return pad_hex(value, data_size)
+ elif is_bytes_type(abi_type):
+ if is_bytes(value):
+ return encode_hex(value)
+ else:
+ return value
+ elif is_string_type(abi_type):
+ return encode_hex(text_if_str(to_bytes, value))
+ else:
+ raise ValueError(f"Unsupported ABI type: {abi_type}")
def to_hex_twos_compliment(value: Any, bit_size: int) -> HexStr:
"""
Converts integer value to twos compliment hex representation with given bit_size
"""
- pass
+ if value >= 0:
+ return to_hex_with_size(value, bit_size)
+
+ value = (1 << bit_size) + value
+ hex_value = hex(value)[2:]
+ hex_size = bit_size // 4
+
+ return add_0x_prefix(hex_value.zfill(hex_size))
def to_hex_with_size(value: Any, bit_size: int) -> HexStr:
"""
Converts a value to hex with given bit_size:
"""
- pass
+ if not is_list_like(value):
+ value = [value]
+
+ hex_value = encode_hex(value[0])[2:]
+ hex_size = bit_size // 4
+
+ return add_0x_prefix(hex_value.zfill(hex_size))
def pad_hex(value: Any, bit_size: int) -> HexStr:
"""
Pads a hex string up to the given bit_size
"""
- pass
+ value = remove_0x_prefix(HexStr(value))
+ return add_0x_prefix(value.zfill(bit_size // 4))
zpad_bytes = pad_bytes(b'\x00')
@curry
@@ -44,7 +84,14 @@ def text_if_str(to_type: Callable[..., str], text_or_primitive: Union[Primitives
text=text), eg~ to_bytes, to_text, to_hex, to_int, etc
@param text_or_primitive in bytes, str, or int.
"""
- pass
+ if isinstance(text_or_primitive, (bytes, int, bool)):
+ return to_type(text_or_primitive)
+ elif isinstance(text_or_primitive, str):
+ return to_type(text_or_primitive, text=text_or_primitive)
+ else:
+ raise TypeError(
+ "Expected string, bytes, int, or bool. Got {}".format(type(text_or_primitive))
+ )
@curry
def hexstr_if_str(to_type: Callable[..., HexStr], hexstr_or_primitive: Union[Primitives, HexStr, str]) -> HexStr:
@@ -55,7 +102,20 @@ def hexstr_if_str(to_type: Callable[..., HexStr], hexstr_or_primitive: Union[Pri
text=text), eg~ to_bytes, to_text, to_hex, to_int, etc
@param hexstr_or_primitive in bytes, str, or int.
"""
- pass
+ if isinstance(hexstr_or_primitive, (bytes, int, bool)):
+ return to_type(hexstr_or_primitive)
+ elif isinstance(hexstr_or_primitive, str):
+ if is_hex(hexstr_or_primitive):
+ return to_type(hexstr_or_primitive, hexstr=hexstr_or_primitive)
+ else:
+ raise ValueError(
+ "When the type is 'hexstr' the value must be a valid hex string. "
+ "Got: {}".format(hexstr_or_primitive)
+ )
+ else:
+ raise TypeError(
+ "Expected string, bytes, int, or bool. Got {}".format(type(hexstr_or_primitive))
+ )
class FriendlyJsonSerde:
"""
@@ -65,15 +125,48 @@ class FriendlyJsonSerde:
information on which fields failed, to show more
helpful information in the raised error messages.
"""
+ def _json_mapping_errors(self, mapping: Dict[Any, Any], field_path: str='') -> Iterable[str]:
+ for key, val in mapping.items():
+ try:
+ json.dumps(key)
+ json.dumps(val)
+ except TypeError as exc:
+ field_name = field_path + str(key)
+ yield f"{field_name}: {exc}"
+ if isinstance(val, dict):
+ yield from self._json_mapping_errors(val, field_name + '.')
+
+ def json_decode(self, json_str: str) -> Dict[Any, Any]:
+ try:
+ decoded = json.loads(json_str)
+ except json.decoder.JSONDecodeError as exc:
+ raise ValueError(f"Could not decode json: {exc}")
+ return decoded
+
+ def json_encode(self, value: Dict[Any, Any], cls: Optional[Type[json.JSONEncoder]]=None) -> str:
+ try:
+ return json.dumps(value, cls=cls)
+ except TypeError as exc:
+ if json.dumps([]) == '[]':
+ # TypeError not caused by json module, let it bubble up
+ raise
+ # Get information about which fields failed
+ mapping_errors = '\n'.join(self._json_mapping_errors(value))
+ raise TypeError(f"Could not encode to JSON: {exc}\nMapping errors: {mapping_errors}")
class DynamicArrayPackedEncoder(BaseArrayEncoder):
is_dynamic = True
class Web3JsonEncoder(json.JSONEncoder):
- pass
+ def default(self, obj: Any) -> Any:
+ if isinstance(obj, AttributeDict):
+ return {key: value for key, value in obj.items()}
+ if isinstance(obj, HexBytes):
+ return obj.hex()
+ return json.JSONEncoder.default(self, obj)
def to_json(obj: Dict[Any, Any]) -> str:
"""
Convert a complex object (like a transaction object) to a JSON string
"""
- pass
\ No newline at end of file
+ return FriendlyJsonSerde().json_encode(obj, cls=Web3JsonEncoder)
\ No newline at end of file
diff --git a/web3/_utils/ens.py b/web3/_utils/ens.py
index e3e1b590..2eb47f7f 100644
--- a/web3/_utils/ens.py
+++ b/web3/_utils/ens.py
@@ -4,6 +4,8 @@ from eth_typing import ChecksumAddress
from eth_utils import is_0x_prefixed, is_hex, is_hex_address
from ens import ENS, AsyncENS
from web3.exceptions import NameNotFound
+
+from web3._utils.validation_utils import is_ens_name
if TYPE_CHECKING:
from web3 import AsyncWeb3, Web3
from web3.contract import Contract
@@ -22,4 +24,9 @@ def contract_ens_addresses(contract: 'Contract', name_addr_pairs: Dict[str, Chec
with contract_ens_addresses(mycontract, [('resolve-as-1s.eth', '0x111...111')]):
# any contract call or transaction in here would only resolve the above ENS pair
"""
- pass
\ No newline at end of file
+ tmp_ens = contract.w3.ens
+ contract.w3.ens = StaticENS(name_addr_pairs)
+ try:
+ yield
+ finally:
+ contract.w3.ens = tmp_ens
\ No newline at end of file
diff --git a/web3/_utils/module_testing/eth_module.py b/web3/_utils/module_testing/eth_module.py
index 0b041341..35428d59 100644
--- a/web3/_utils/module_testing/eth_module.py
+++ b/web3/_utils/module_testing/eth_module.py
@@ -10,7 +10,7 @@ from eth_utils import is_boolean, is_bytes, is_checksum_address, is_dict, is_int
from eth_utils.toolz import assoc
from hexbytes import HexBytes
from web3._utils.empty import empty
-from web3._utils.ens import ens_addresses
+from web3._utils.validation_utils import ens_addresses
from web3._utils.error_formatters_utils import PANIC_ERROR_CODES
from web3._utils.method_formatters import to_hex_if_integer
from web3._utils.module_testing.module_testing_utils import assert_contains_log, async_mock_offchain_lookup_request_response, flaky_geth_dev_mining, mock_offchain_lookup_request_response
diff --git a/web3/_utils/module_testing/web3_module.py b/web3/_utils/module_testing/web3_module.py
index 84870805..bb096fb7 100644
--- a/web3/_utils/module_testing/web3_module.py
+++ b/web3/_utils/module_testing/web3_module.py
@@ -3,7 +3,7 @@ from typing import Any, NoReturn, Sequence, Union
from eth_typing import ChecksumAddress, HexAddress, HexStr, TypeStr
from hexbytes import HexBytes
from web3 import AsyncWeb3, Web3
-from web3._utils.ens import ens_addresses
+from web3._utils.validation_utils import ens_addresses
from web3.exceptions import InvalidAddress
class Web3ModuleTest:
diff --git a/web3/_utils/normalizers.py b/web3/_utils/normalizers.py
index 86db09e5..7758abaf 100644
--- a/web3/_utils/normalizers.py
+++ b/web3/_utils/normalizers.py
@@ -11,7 +11,8 @@ from eth_utils.toolz import curry
from hexbytes import HexBytes
from ens import ENS, AsyncENS
from web3._utils.encoding import hexstr_if_str, text_if_str
-from web3._utils.ens import StaticENS, async_validate_name_has_address, is_ens_name, validate_name_has_address
+from web3._utils.validation_utils import is_ens_name
+from web3._utils.ens import StaticENS
from web3._utils.validation import validate_abi, validate_address
from web3.exceptions import InvalidAddress, NameNotFound
from web3.types import ABI
diff --git a/web3/_utils/validation.py b/web3/_utils/validation.py
index ed960605..a4967ea4 100644
--- a/web3/_utils/validation.py
+++ b/web3/_utils/validation.py
@@ -10,27 +10,121 @@ from web3._utils.abi import abi_to_signature, filter_by_type, is_address_type, i
from web3.exceptions import InvalidAddress
from web3.types import ABI, ABIFunction
+def validate_abi_item(abi_item: Dict[str, Any]) -> None:
+ """
+ Helper function for validating an ABI item
+ """
+ if 'type' not in abi_item:
+ raise ValueError("'type' is required in the ABI item")
+
+ abi_type = abi_item['type']
+ if abi_type not in ('function', 'constructor', 'event', 'fallback', 'receive'):
+ raise ValueError(f"'type' must be one of 'function', 'constructor', 'event', 'fallback', or 'receive'. Got {abi_type}")
+
+ if abi_type != 'fallback' and abi_type != 'receive':
+ if 'inputs' not in abi_item:
+ raise ValueError("'inputs' is required in the ABI item")
+ if not is_list_like(abi_item['inputs']):
+ raise ValueError("'inputs' must be a list")
+
+ for input_item in abi_item['inputs']:
+ validate_abi_input_output(input_item)
+
+ if abi_type in ('function', 'constructor'):
+ if 'outputs' not in abi_item:
+ raise ValueError("'outputs' is required in the ABI item")
+ if not is_list_like(abi_item['outputs']):
+ raise ValueError("'outputs' must be a list")
+
+ for output_item in abi_item['outputs']:
+ validate_abi_input_output(output_item)
+
+def validate_abi_input_output(item: Dict[str, Any]) -> None:
+ """
+ Helper function for validating an ABI input or output item
+ """
+ if 'type' not in item:
+ raise ValueError("'type' is required in the ABI input/output item")
+ validate_abi_type(item['type'])
+
def validate_abi(abi: ABI) -> None:
"""
Helper function for validating an ABI
"""
- pass
+ if not is_list_like(abi):
+ raise ValueError("'abi' is not a list")
+
+ for abi_item in abi:
+ if not is_dict(abi_item):
+ raise ValueError("The elements of 'abi' are not all dictionaries")
+
+ validate_abi_item(abi_item)
def validate_abi_type(abi_type: TypeStr) -> None:
"""
Helper function for validating an abi_type
"""
- pass
+ if not is_recognized_type(abi_type):
+ raise ValueError(f"'{abi_type}' is not a recognized type")
+
+ if is_array_type(abi_type):
+ validate_abi_type(sub_type_of_array_type(abi_type))
def validate_abi_value(abi_type: TypeStr, value: Any) -> None:
"""
Helper function for validating a value against the expected abi_type
Note: abi_type 'bytes' must either be python3 'bytes' object or ''
"""
- pass
+ if is_array_type(abi_type) and not is_list_like(value):
+ raise TypeError(f"Value must be list-like for array type: {abi_type}")
+
+ if is_array_type(abi_type):
+ sub_type = sub_type_of_array_type(abi_type)
+ for v in value:
+ validate_abi_value(sub_type, v)
+ return
+
+ if is_bool_type(abi_type) and not is_boolean(value):
+ raise TypeError(f"Value must be boolean for type: {abi_type}")
+
+ elif is_uint_type(abi_type) or is_int_type(abi_type):
+ if not is_integer(value):
+ raise TypeError(f"Value must be integer for type: {abi_type}")
+
+ elif is_address_type(abi_type):
+ validate_address(value)
+
+ elif is_bytes_type(abi_type):
+ if not is_bytes(value) and not is_string(value):
+ raise TypeError(f"Value must be bytes or string for type: {abi_type}")
+
+ elif is_string_type(abi_type) and not is_string(value):
+ raise TypeError(f"Value must be string for type: {abi_type}")
def validate_address(value: Any) -> None:
"""
Helper function for validating an address
"""
- pass
\ No newline at end of file
+ if not is_address(value):
+ raise InvalidAddress("Value must be a valid address, zero-padded to 20 bytes")
+
+def is_address(value: Any) -> bool:
+ """
+ Helper function for checking if a value is a valid address
+ """
+ if not is_string(value):
+ return False
+
+ if is_binary_address(value):
+ return True
+
+ if is_hex_address(value):
+ return True
+
+ if is_checksum_address(value):
+ return True
+
+ if is_valid_ens_name(value):
+ return True
+
+ return False
\ No newline at end of file
diff --git a/web3/_utils/validation_utils.py b/web3/_utils/validation_utils.py
new file mode 100644
index 00000000..a7255c67
--- /dev/null
+++ b/web3/_utils/validation_utils.py
@@ -0,0 +1,22 @@
+from typing import Any
+from eth_utils import is_0x_prefixed, is_hex_address
+
+def is_ens_name(value: Any) -> bool:
+ """
+ Check if the given value is a valid ENS name.
+
+ A valid ENS name:
+ - Is a string
+ - Contains at least one dot (.)
+ - Is not a hex address
+ - Is not 0x-prefixed
+ """
+ if not isinstance(value, str):
+ return False
+ if not "." in value:
+ return False
+ if is_hex_address(value):
+ return False
+ if is_0x_prefixed(value):
+ return False
+ return True
\ No newline at end of file
diff --git a/web3/pm.py b/web3/pm.py
index 26b403fa..9c17da87 100644
--- a/web3/pm.py
+++ b/web3/pm.py
@@ -11,7 +11,7 @@ from ethpm.uri import is_supported_content_addressed_uri, resolve_uri_contents
from ethpm.validation.manifest import validate_manifest_against_schema, validate_raw_manifest_format
from ethpm.validation.package import validate_package_name, validate_package_version
from web3 import Web3
-from web3._utils.ens import is_ens_name
+from web3._utils.validation_utils import is_ens_name
from web3.exceptions import InvalidAddress, NameNotFound
from web3.module import Module
T = TypeVar('T')