diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 38373ff809..ce9d6599d8 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -55,7 +55,15 @@ TableAlreadyExistsError, UnauthorizedError, ) -from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, FileIO, load_file_io +from pyiceberg.io import ( + AWS_ACCESS_KEY_ID, + AWS_PROFILE_NAME, + AWS_REGION, + AWS_SECRET_ACCESS_KEY, + AWS_SESSION_TOKEN, + FileIO, + load_file_io, +) from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids from pyiceberg.schema import Schema, assign_fresh_schema_ids from pyiceberg.table import ( @@ -77,7 +85,7 @@ from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties from pyiceberg.types import transform_dict_value_to_str from pyiceberg.utils.deprecated import deprecation_message -from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool +from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool, property_as_int if TYPE_CHECKING: import pyarrow as pa @@ -228,6 +236,10 @@ class IdentifierKind(Enum): SIGV4 = "rest.sigv4-enabled" SIGV4_REGION = "rest.signing-region" SIGV4_SERVICE = "rest.signing-name" +SIGV4_MAX_RETRIES = "rest.sigv4.max-retries" +SIGV4_DEFAULT_MAX_RETRIES = 10 +SIGV4_RETRY_BACKOFF_FACTOR = 0.5 +SIGV4_RETRY_STATUS_CODES = (429, 500, 502, 503, 504) EMPTY_BODY_SHA256: str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" OAUTH2_SERVER_URI = "oauth2-server-uri" SNAPSHOT_LOADING_MODE = "snapshot-loading-mode" @@ -685,12 +697,27 @@ def _init_sigv4(self, session: Session) -> None: from botocore.awsrequest import AWSRequest from requests import PreparedRequest from requests.adapters import HTTPAdapter + from urllib3.util.retry import Retry class 
SigV4Adapter(HTTPAdapter): def __init__(self, **properties: str): - super().__init__() self._properties = properties + max_retries = property_as_int(self._properties, SIGV4_MAX_RETRIES, SIGV4_DEFAULT_MAX_RETRIES) + super().__init__( + max_retries=Retry( + total=max_retries, + status=max_retries, + connect=max_retries, + read=max_retries, + # Only idempotent methods are retryable, so a write the server may have handled is never resent. + allowed_methods=frozenset({"GET", "HEAD", "OPTIONS"}), + status_forcelist=SIGV4_RETRY_STATUS_CODES, + backoff_factor=SIGV4_RETRY_BACKOFF_FACTOR, + respect_retry_after_header=True, + ) + ) self._boto_session = boto3.Session( + profile_name=get_first_property_value(self._properties, AWS_PROFILE_NAME), region_name=get_first_property_value(self._properties, AWS_REGION), botocore_session=self._properties.get(BOTOCORE_SESSION), aws_access_key_id=get_first_property_value(self._properties, AWS_ACCESS_KEY_ID), diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index c45d899388..0453327b5b 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -33,6 +33,8 @@ DEFAULT_ENDPOINTS, EMPTY_BODY_SHA256, OAUTH2_SERVER_URI, + SIGV4_DEFAULT_MAX_RETRIES, + SIGV4_MAX_RETRIES, SNAPSHOT_LOADING_MODE, Capability, Endpoint, @@ -529,6 +531,67 @@ def test_sigv4_sign_request_with_body(rest_mock: Mocker) -> None: assert prepared.headers.get("x-amz-content-sha256") != EMPTY_BODY_SHA256 +def test_sigv4_adapter_default_retry_config(rest_mock: Mocker) -> None: + catalog = RestCatalog( + "rest", + **{ + "uri": TEST_URI, + "token": TEST_TOKEN, + "rest.sigv4-enabled": "true", + "rest.signing-region": "us-west-2", + "client.access-key-id": "id", + "client.secret-access-key": "secret", + }, + ) + + adapter = catalog._session.adapters[catalog.uri] + assert isinstance(adapter, HTTPAdapter) + assert adapter.max_retries.total == SIGV4_DEFAULT_MAX_RETRIES + assert 429 in adapter.max_retries.status_forcelist + + +def test_sigv4_adapter_override_retry_config(rest_mock: Mocker) -> None: 
+ catalog = RestCatalog( + "rest", + **{ + "uri": TEST_URI, + "token": TEST_TOKEN, + "rest.sigv4-enabled": "true", + "rest.signing-region": "us-west-2", + "client.access-key-id": "id", + "client.secret-access-key": "secret", + SIGV4_MAX_RETRIES: "3", + }, + ) + + adapter = catalog._session.adapters[catalog.uri] + assert isinstance(adapter, HTTPAdapter) + assert adapter.max_retries.total == 3 + + +def test_sigv4_uses_client_profile_name(rest_mock: Mocker) -> None: + with mock.patch("boto3.Session") as mock_session: + RestCatalog( + "rest", + **{ + "uri": TEST_URI, + "token": TEST_TOKEN, + "rest.sigv4-enabled": "true", + "rest.signing-region": "us-west-2", + "client.profile-name": "rest-profile", + }, + ) + + mock_session.assert_called_with( + profile_name="rest-profile", + region_name=None, + botocore_session=None, + aws_access_key_id=None, + aws_secret_access_key=None, + aws_session_token=None, + ) + + def test_list_tables_404(rest_mock: Mocker) -> None: namespace = "examples" rest_mock.get(