diff --git a/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py b/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py index d8b8eeb1c..56aba3e27 100644 --- a/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py +++ b/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py @@ -270,9 +270,10 @@ async def download_ranges( client_checksum = int.from_bytes(client_crc32c, "big") if server_checksum != client_checksum: - raise DataCorruption(response, + raise DataCorruption( + response, f"Checksum mismatch for read_id {object_data_range.read_range.read_id}. " - f"Server sent {server_checksum}, client calculated {client_checksum}." + f"Server sent {server_checksum}, client calculated {client_checksum}.", ) read_id = object_data_range.read_range.read_id diff --git a/google/cloud/storage/_opentelemetry_tracing.py b/google/cloud/storage/_opentelemetry_tracing.py index 3416081cd..b654aae2b 100644 --- a/google/cloud/storage/_opentelemetry_tracing.py +++ b/google/cloud/storage/_opentelemetry_tracing.py @@ -18,7 +18,7 @@ import os from contextlib import contextmanager - +from urllib.parse import urlparse from google.api_core import exceptions as api_exceptions from google.api_core import retry as api_retry from google.cloud.storage import __version__ @@ -28,7 +28,15 @@ ENABLE_OTEL_TRACES_ENV_VAR = "ENABLE_GCS_PYTHON_CLIENT_OTEL_TRACES" _DEFAULT_ENABLE_OTEL_TRACES_VALUE = False -enable_otel_traces = os.environ.get( + +def _parse_bool_env(name: str, default: bool = False) -> bool: + val = os.environ.get(name, None) + if val is None: + return default + return str(val).strip().lower() in {"1", "true", "yes", "on"} + + +enable_otel_traces = _parse_bool_env( ENABLE_OTEL_TRACES_ENV_VAR, _DEFAULT_ENABLE_OTEL_TRACES_VALUE ) logger = logging.getLogger(__name__) @@ -105,11 +113,10 @@ def _set_api_request_attr(request, client): if request.get("method"): attr["http.request.method"] = request.get("method") if request.get("path"): - path = request.get("path") - full_path = f"{client._connection.API_BASE_URL}{path}" - attr["url.full"] = full_path - if request.get("timeout"): - attr["connect_timeout,read_timeout"] = request.get("timeout") + full_url = client._connection.build_api_url(request.get("path")) + attr.update(_get_opentelemetry_attributes_from_url(full_url, strip_query=True)) + if "timeout" in request: + attr["connect_timeout,read_timeout"] = str(request.get("timeout")) return attr @@ -117,3 +124,26 @@ def _set_retry_attr(retry, conditional_predicate=None): predicate = conditional_predicate if conditional_predicate else retry._predicate retry_info = f"multiplier{retry._multiplier}/deadline{retry._deadline}/max{retry._maximum}/initial{retry._initial}/predicate{predicate}" return {"retry": retry_info} + + +def _get_opentelemetry_attributes_from_url(url, strip_query=True): + """Helper to assemble OpenTelemetry span attributes from a URL.""" + u = urlparse(url) + netloc = u.netloc + # u.hostname is always lowercase. We parse netloc to preserve casing. + # netloc format: [userinfo@]host[:port] + if "@" in netloc: + netloc = netloc.split("@", 1)[1] + if ":" in netloc and not netloc.endswith("]"): # Handle IPv6 literal + netloc = netloc.split(":", 1)[0] + + attributes = { + "server.address": netloc, + "server.port": u.port, + "url.scheme": u.scheme, + "url.path": u.path, + } + if not strip_query: + attributes["url.query"] = u.query + + return attributes diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 381ce5c9d..746334d1c 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -47,6 +47,9 @@ from google.cloud._helpers import _rfc3339_nanos_to_datetime from google.cloud._helpers import _to_bytes from google.cloud.exceptions import NotFound +from google.cloud.storage._opentelemetry_tracing import ( + _get_opentelemetry_attributes_from_url, +) from google.cloud.storage._helpers import _add_etag_match_headers from google.cloud.storage._helpers import _add_generation_match_parameters from google.cloud.storage._helpers import _PropertyMixin @@ -1055,13 +1058,11 @@ def _do_download( Please enable this as per your use case. """ - extra_attributes = { - "url.full": download_url, - "download.chunk_size": f"{self.chunk_size}", - "download.raw_download": raw_download, - "upload.checksum": f"{checksum}", - "download.single_shot_download": single_shot_download, - } + extra_attributes = _get_opentelemetry_attributes_from_url(download_url) + extra_attributes["download.chunk_size"] = f"{self.chunk_size}" + extra_attributes["download.raw_download"] = raw_download + extra_attributes["upload.checksum"] = f"{checksum}" + extra_attributes["download.single_shot_download"] = single_shot_download args = {"timeout": timeout} if self.chunk_size is None: @@ -2048,10 +2049,8 @@ def _do_multipart_upload( upload_url, headers=headers, checksum=checksum, retry=retry ) - extra_attributes = { - "url.full": upload_url, - "upload.checksum": f"{checksum}", - } + extra_attributes = _get_opentelemetry_attributes_from_url(upload_url) + extra_attributes["upload.checksum"] = f"{checksum}" args = {"timeout": timeout} with create_trace_span( name="Storage.MultipartUpload/transmit", @@ -2448,11 +2447,10 @@ def _do_resumable_upload( command=command, crc32c_checksum_value=crc32c_checksum_value, ) - extra_attributes = { - "url.full": upload.resumable_url, - "upload.chunk_size": upload.chunk_size, - "upload.checksum": f"{checksum}", - } + extra_attributes = _get_opentelemetry_attributes_from_url(upload.resumable_url) + extra_attributes["upload.chunk_size"] = upload.chunk_size + extra_attributes["upload.checksum"] = f"{checksum}" + args = {"timeout": timeout} with create_trace_span( name="Storage.ResumableUpload/transmitNextChunk", diff --git a/tests/unit/asyncio/test_async_multi_range_downloader.py b/tests/unit/asyncio/test_async_multi_range_downloader.py index 529aa4559..8c1137980 100644 --- a/tests/unit/asyncio/test_async_multi_range_downloader.py +++ b/tests/unit/asyncio/test_async_multi_range_downloader.py @@ -260,25 +260,37 @@ async def test_downloading_without_opening_should_throw_error( assert str(exc.value) == "Underlying bidi-gRPC stream is not open" assert not mrd.is_stream_open - @mock.patch("google.cloud.storage._experimental.asyncio.async_multi_range_downloader.google_crc32c") - @mock.patch("google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client") + @mock.patch( + "google.cloud.storage._experimental.asyncio.async_multi_range_downloader.google_crc32c" + ) + @mock.patch( + "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" + ) def test_init_raises_if_crc32c_c_extension_is_missing( self, mock_grpc_client, mock_google_crc32c ): mock_google_crc32c.implementation = "python" with pytest.raises(exceptions.NotFound) as exc_info: - AsyncMultiRangeDownloader( - mock_grpc_client, "bucket", "object" - ) + AsyncMultiRangeDownloader(mock_grpc_client, "bucket", "object") - assert "The google-crc32c package is not installed with C support" in str(exc_info.value) + assert "The google-crc32c package is not installed with C support" in str( + exc_info.value + ) @pytest.mark.asyncio - @mock.patch("google.cloud.storage._experimental.asyncio.async_multi_range_downloader.Checksum") - @mock.patch("google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client") - async def test_download_ranges_raises_on_checksum_mismatch(self, mock_client, mock_checksum_class): - mock_stream = mock.AsyncMock(spec=async_read_object_stream._AsyncReadObjectStream) + @mock.patch( + "google.cloud.storage._experimental.asyncio.async_multi_range_downloader.Checksum" + ) + @mock.patch( + "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" + ) + async def test_download_ranges_raises_on_checksum_mismatch( + self, mock_client, mock_checksum_class + ): + mock_stream = mock.AsyncMock( + spec=async_read_object_stream._AsyncReadObjectStream + ) test_data = b"some-data" server_checksum = 12345 @@ -299,9 +311,7 @@ async def test_download_ranges_raises_on_checksum_mismatch(self, mock_client, mo mock_stream.recv.side_effect = [mock_response, None] - mrd = AsyncMultiRangeDownloader( - mock_client, "bucket", "object" - ) + mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object") mrd.read_obj_str = mock_stream mrd._is_stream_open = True @@ -310,4 +320,3 @@ async def test_download_ranges_raises_on_checksum_mismatch(self, mock_client, mo assert "Checksum mismatch" in str(exc_info.value) mock_checksum_class.assert_called_once_with(test_data) - diff --git a/tests/unit/test__opentelemetry_tracing.py b/tests/unit/test__opentelemetry_tracing.py index bdbb40fd2..c1660dd5e 100644 --- a/tests/unit/test__opentelemetry_tracing.py +++ b/tests/unit/test__opentelemetry_tracing.py @@ -58,6 +58,13 @@ def setup_optin(mock_os_environ): importlib.reload(_opentelemetry_tracing) +@pytest.fixture() +def setup_optout(mock_os_environ): + """Mock envar to opt-in tracing for storage client.""" + mock_os_environ["ENABLE_GCS_PYTHON_CLIENT_OTEL_TRACES"] = "False" + importlib.reload(_opentelemetry_tracing) + + def test_opentelemetry_not_installed(setup, monkeypatch): monkeypatch.setitem(sys.modules, "opentelemetry", None) importlib.reload(_opentelemetry_tracing) @@ -83,6 +90,13 @@ def test_enable_trace_yield_span(setup, setup_optin): assert span is not None +def test_disable_traces(setup, setup_optout): + assert _opentelemetry_tracing.HAS_OPENTELEMETRY + assert not _opentelemetry_tracing.enable_otel_traces + with _opentelemetry_tracing.create_trace_span("No-ops for opentelemetry") as span: + assert span is None + + def test_enable_trace_call(setup, setup_optin): from opentelemetry import trace as trace_api @@ -136,7 +150,7 @@ def test_get_final_attributes(setup, setup_optin): } api_request = { "method": "GET", - "path": "/foo/bar/baz", + "path": "/foo/bar/baz?sensitive=true", "timeout": (100, 100), } retry_obj = api_retry.Retry() @@ -147,15 +161,19 @@ def test_get_final_attributes(setup, setup_optin): "rpc.system": "http", "user_agent.original": f"gcloud-python/{__version__}", "http.request.method": "GET", - "url.full": "https://testOtel.org/foo/bar/baz", - "connect_timeout,read_timeout": (100, 100), + "server.address": "testOtel.org", + "url.path": "/foo/bar/baz", + "url.scheme": "https", + "connect_timeout,read_timeout": str((100, 100)), "retry": f"multiplier{retry_obj._multiplier}/deadline{retry_obj._deadline}/max{retry_obj._maximum}/initial{retry_obj._initial}/predicate{retry_obj._predicate}", } expected_attributes.update(_opentelemetry_tracing._cloud_trace_adoption_attrs) with mock.patch("google.cloud.storage.client.Client") as test_client: test_client.project = "test_project" - test_client._connection.API_BASE_URL = "https://testOtel.org" + test_client._connection.build_api_url.return_value = ( + "https://testOtel.org/foo/bar/baz?sensitive=true" + ) with _opentelemetry_tracing.create_trace_span( test_span_name, attributes=test_span_attributes, @@ -165,6 +183,7 @@ def test_get_final_attributes(setup, setup_optin): ) as span: assert span is not None assert span.name == test_span_name + assert "url.query" not in span.attributes assert span.attributes == expected_attributes @@ -196,23 +215,108 @@ def test_set_conditional_retry_attr(setup, setup_optin): assert span.attributes == expected_attributes -def test_set_api_request_attr(): - from google.cloud.storage import Client +def test__get_opentelemetry_attributes_from_url(): + url = "https://example.com:8080/path?query=true" + expected = { + "server.address": "example.com", + "server.port": 8080, + "url.scheme": "https", + "url.path": "/path", + } + # Test stripping query + attrs = _opentelemetry_tracing._get_opentelemetry_attributes_from_url( + url, strip_query=True + ) + assert attrs == expected + assert "url.query" not in attrs + + # Test not stripping query + expected["url.query"] = "query=true" + attrs = _opentelemetry_tracing._get_opentelemetry_attributes_from_url( + url, strip_query=False + ) + assert attrs == expected - test_client = Client() - args_method = {"method": "GET"} - expected_attributes = {"http.request.method": "GET"} - attr = _opentelemetry_tracing._set_api_request_attr(args_method, test_client) - assert attr == expected_attributes - args_path = {"path": "/foo/bar/baz"} - expected_attributes = {"url.full": "https://storage.googleapis.com/foo/bar/baz"} - attr = _opentelemetry_tracing._set_api_request_attr(args_path, test_client) - assert attr == expected_attributes +def test__get_opentelemetry_attributes_from_url_with_query(): + url = "https://example.com/path?query=true&another=false" + expected = { + "server.address": "example.com", + "server.port": None, + "url.scheme": "https", + "url.path": "/path", + "url.query": "query=true&another=false", + } + # Test not stripping query + attrs = _opentelemetry_tracing._get_opentelemetry_attributes_from_url( + url, strip_query=False + ) + assert attrs == expected - args_timeout = {"timeout": (100, 100)} + +def test_set_api_request_attr_with_pii_in_query(): + client = mock.Mock() + client._connection.build_api_url.return_value = ( + "https://example.com/path?sensitive=true&token=secret" + ) + + request = { + "method": "GET", + "path": "/path?sensitive=true&token=secret", + "timeout": 60, + } expected_attributes = { - "connect_timeout,read_timeout": (100, 100), + "http.request.method": "GET", + "server.address": "example.com", + "server.port": None, + "url.scheme": "https", + "url.path": "/path", + "connect_timeout,read_timeout": "60", } - attr = _opentelemetry_tracing._set_api_request_attr(args_timeout, test_client) + attr = _opentelemetry_tracing._set_api_request_attr(request, client) assert attr == expected_attributes + assert "url.query" not in attr # Ensure query with PII is not captured + + +def test_set_api_request_attr_no_timeout(): + client = mock.Mock() + client._connection.build_api_url.return_value = "https://example.com/path" + + request = {"method": "GET", "path": "/path"} + attr = _opentelemetry_tracing._set_api_request_attr(request, client) + assert "connect_timeout,read_timeout" not in attr + + +@pytest.mark.parametrize( + "env_value, default, expected", + [ + # Test default values when env var is not set + (None, False, False), + (None, True, True), + # Test truthy values + ("1", False, True), + ("true", False, True), + ("yes", False, True), + ("on", False, True), + ("TRUE", False, True), + (" Yes ", False, True), + # Test falsy values + ("0", False, False), + ("false", False, False), + ("no", False, False), + ("off", False, False), + ("any_other_string", False, False), + ("", False, False), + # Test with default=True and falsy values + ("false", True, False), + ("0", True, False), + ], +) +def test__parse_bool_env(monkeypatch, env_value, default, expected): + env_var_name = "TEST_ENV_VAR" + monkeypatch.setenv( + env_var_name, str(env_value) + ) if env_value is not None else monkeypatch.delenv(env_var_name, raising=False) + + result = _opentelemetry_tracing._parse_bool_env(env_var_name, default) + assert result is expected diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index db8094a95..99de31961 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -684,6 +684,7 @@ def test__list_resource_w_defaults(self): credentials = _make_credentials() client = self._make_one(project=project, credentials=credentials) connection = client._base_connection = _make_connection() + connection.build_api_url = mock.Mock(return_value="http://example.com" + path) iterator = client._list_resource( path=path, @@ -719,6 +720,7 @@ def test__list_resource_w_explicit(self): credentials = _make_credentials() client = self._make_one(project=project, credentials=credentials) connection = client._base_connection = _make_connection() + connection.build_api_url = mock.Mock(return_value="http://example.com" + path) iterator = client._list_resource( path=path,