Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,10 @@ async def download_ranges(
client_checksum = int.from_bytes(client_crc32c, "big")

if server_checksum != client_checksum:
raise DataCorruption(response,
raise DataCorruption(
response,
f"Checksum mismatch for read_id {object_data_range.read_range.read_id}. "
f"Server sent {server_checksum}, client calculated {client_checksum}."
f"Server sent {server_checksum}, client calculated {client_checksum}.",
)

read_id = object_data_range.read_range.read_id
Expand Down
44 changes: 37 additions & 7 deletions google/cloud/storage/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import os

from contextlib import contextmanager

from urllib.parse import urlparse
from google.api_core import exceptions as api_exceptions
from google.api_core import retry as api_retry
from google.cloud.storage import __version__
Expand All @@ -28,7 +28,15 @@
ENABLE_OTEL_TRACES_ENV_VAR = "ENABLE_GCS_PYTHON_CLIENT_OTEL_TRACES"
_DEFAULT_ENABLE_OTEL_TRACES_VALUE = False

enable_otel_traces = os.environ.get(

def _parse_bool_env(name: str, default: bool = False) -> bool:
val = os.environ.get(name, None)
if val is None:
return default
return str(val).strip().lower() in {"1", "true", "yes", "on"}


enable_otel_traces = _parse_bool_env(
ENABLE_OTEL_TRACES_ENV_VAR, _DEFAULT_ENABLE_OTEL_TRACES_VALUE
)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -105,15 +113,37 @@ def _set_api_request_attr(request, client):
if request.get("method"):
attr["http.request.method"] = request.get("method")
if request.get("path"):
path = request.get("path")
full_path = f"{client._connection.API_BASE_URL}{path}"
attr["url.full"] = full_path
if request.get("timeout"):
attr["connect_timeout,read_timeout"] = request.get("timeout")
full_url = client._connection.build_api_url(request.get("path"))
attr.update(_get_opentelemetry_attributes_from_url(full_url, strip_query=True))
if "timeout" in request:
attr["connect_timeout,read_timeout"] = str(request.get("timeout"))
return attr


def _set_retry_attr(retry, conditional_predicate=None):
predicate = conditional_predicate if conditional_predicate else retry._predicate
retry_info = f"multiplier{retry._multiplier}/deadline{retry._deadline}/max{retry._maximum}/initial{retry._initial}/predicate{predicate}"
return {"retry": retry_info}


def _get_opentelemetry_attributes_from_url(url, strip_query=True):
"""Helper to assemble OpenTelemetry span attributes from a URL."""
u = urlparse(url)
netloc = u.netloc
# u.hostname is always lowercase. We parse netloc to preserve casing.
# netloc format: [userinfo@]host[:port]
if "@" in netloc:
netloc = netloc.split("@", 1)[1]
if ":" in netloc and not netloc.endswith("]"): # Handle IPv6 literal
netloc = netloc.split(":", 1)[0]

attributes = {
"server.address": netloc,
"server.port": u.port,
"url.scheme": u.scheme,
"url.path": u.path,
}
if not strip_query:
attributes["url.query"] = u.query

return attributes
30 changes: 14 additions & 16 deletions google/cloud/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
from google.cloud._helpers import _rfc3339_nanos_to_datetime
from google.cloud._helpers import _to_bytes
from google.cloud.exceptions import NotFound
from google.cloud.storage._opentelemetry_tracing import (
_get_opentelemetry_attributes_from_url,
)
from google.cloud.storage._helpers import _add_etag_match_headers
from google.cloud.storage._helpers import _add_generation_match_parameters
from google.cloud.storage._helpers import _PropertyMixin
Expand Down Expand Up @@ -1055,13 +1058,11 @@ def _do_download(
Please enable this as per your use case.
"""

extra_attributes = {
"url.full": download_url,
"download.chunk_size": f"{self.chunk_size}",
"download.raw_download": raw_download,
"upload.checksum": f"{checksum}",
"download.single_shot_download": single_shot_download,
}
extra_attributes = _get_opentelemetry_attributes_from_url(download_url)
extra_attributes["download.chunk_size"] = f"{self.chunk_size}"
extra_attributes["download.raw_download"] = raw_download
extra_attributes["upload.checksum"] = f"{checksum}"
extra_attributes["download.single_shot_download"] = single_shot_download
args = {"timeout": timeout}

if self.chunk_size is None:
Expand Down Expand Up @@ -2048,10 +2049,8 @@ def _do_multipart_upload(
upload_url, headers=headers, checksum=checksum, retry=retry
)

extra_attributes = {
"url.full": upload_url,
"upload.checksum": f"{checksum}",
}
extra_attributes = _get_opentelemetry_attributes_from_url(upload_url)
extra_attributes["upload.checksum"] = f"{checksum}"
args = {"timeout": timeout}
with create_trace_span(
name="Storage.MultipartUpload/transmit",
Expand Down Expand Up @@ -2448,11 +2447,10 @@ def _do_resumable_upload(
command=command,
crc32c_checksum_value=crc32c_checksum_value,
)
extra_attributes = {
"url.full": upload.resumable_url,
"upload.chunk_size": upload.chunk_size,
"upload.checksum": f"{checksum}",
}
extra_attributes = _get_opentelemetry_attributes_from_url(upload.resumable_url)
extra_attributes["upload.chunk_size"] = upload.chunk_size
extra_attributes["upload.checksum"] = f"{checksum}"

args = {"timeout": timeout}
with create_trace_span(
name="Storage.ResumableUpload/transmitNextChunk",
Expand Down
37 changes: 23 additions & 14 deletions tests/unit/asyncio/test_async_multi_range_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,25 +260,37 @@ async def test_downloading_without_opening_should_throw_error(
assert str(exc.value) == "Underlying bidi-gRPC stream is not open"
assert not mrd.is_stream_open

@mock.patch("google.cloud.storage._experimental.asyncio.async_multi_range_downloader.google_crc32c")
@mock.patch("google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client")
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_multi_range_downloader.google_crc32c"
)
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
def test_init_raises_if_crc32c_c_extension_is_missing(
self, mock_grpc_client, mock_google_crc32c
):
mock_google_crc32c.implementation = "python"

with pytest.raises(exceptions.NotFound) as exc_info:
AsyncMultiRangeDownloader(
mock_grpc_client, "bucket", "object"
)
AsyncMultiRangeDownloader(mock_grpc_client, "bucket", "object")

assert "The google-crc32c package is not installed with C support" in str(exc_info.value)
assert "The google-crc32c package is not installed with C support" in str(
exc_info.value
)

@pytest.mark.asyncio
@mock.patch("google.cloud.storage._experimental.asyncio.async_multi_range_downloader.Checksum")
@mock.patch("google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client")
async def test_download_ranges_raises_on_checksum_mismatch(self, mock_client, mock_checksum_class):
mock_stream = mock.AsyncMock(spec=async_read_object_stream._AsyncReadObjectStream)
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_multi_range_downloader.Checksum"
)
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
async def test_download_ranges_raises_on_checksum_mismatch(
self, mock_client, mock_checksum_class
):
mock_stream = mock.AsyncMock(
spec=async_read_object_stream._AsyncReadObjectStream
)

test_data = b"some-data"
server_checksum = 12345
Expand All @@ -299,9 +311,7 @@ async def test_download_ranges_raises_on_checksum_mismatch(self, mock_client, mo

mock_stream.recv.side_effect = [mock_response, None]

mrd = AsyncMultiRangeDownloader(
mock_client, "bucket", "object"
)
mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object")
mrd.read_obj_str = mock_stream
mrd._is_stream_open = True

Expand All @@ -310,4 +320,3 @@ async def test_download_ranges_raises_on_checksum_mismatch(self, mock_client, mo

assert "Checksum mismatch" in str(exc_info.value)
mock_checksum_class.assert_called_once_with(test_data)

140 changes: 122 additions & 18 deletions tests/unit/test__opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ def setup_optin(mock_os_environ):
importlib.reload(_opentelemetry_tracing)


@pytest.fixture()
def setup_optout(mock_os_environ):
"""Mock envar to opt-in tracing for storage client."""
mock_os_environ["ENABLE_GCS_PYTHON_CLIENT_OTEL_TRACES"] = "False"
importlib.reload(_opentelemetry_tracing)


def test_opentelemetry_not_installed(setup, monkeypatch):
monkeypatch.setitem(sys.modules, "opentelemetry", None)
importlib.reload(_opentelemetry_tracing)
Expand All @@ -83,6 +90,13 @@ def test_enable_trace_yield_span(setup, setup_optin):
assert span is not None


def test_disable_traces(setup, setup_optout):
assert _opentelemetry_tracing.HAS_OPENTELEMETRY
assert not _opentelemetry_tracing.enable_otel_traces
with _opentelemetry_tracing.create_trace_span("No-ops for opentelemetry") as span:
assert span is None


def test_enable_trace_call(setup, setup_optin):
from opentelemetry import trace as trace_api

Expand Down Expand Up @@ -136,7 +150,7 @@ def test_get_final_attributes(setup, setup_optin):
}
api_request = {
"method": "GET",
"path": "/foo/bar/baz",
"path": "/foo/bar/baz?sensitive=true",
"timeout": (100, 100),
}
retry_obj = api_retry.Retry()
Expand All @@ -147,15 +161,19 @@ def test_get_final_attributes(setup, setup_optin):
"rpc.system": "http",
"user_agent.original": f"gcloud-python/{__version__}",
"http.request.method": "GET",
"url.full": "https://testOtel.org/foo/bar/baz",
"connect_timeout,read_timeout": (100, 100),
"server.address": "testOtel.org",
"url.path": "/foo/bar/baz",
"url.scheme": "https",
"connect_timeout,read_timeout": str((100, 100)),
"retry": f"multiplier{retry_obj._multiplier}/deadline{retry_obj._deadline}/max{retry_obj._maximum}/initial{retry_obj._initial}/predicate{retry_obj._predicate}",
}
expected_attributes.update(_opentelemetry_tracing._cloud_trace_adoption_attrs)

with mock.patch("google.cloud.storage.client.Client") as test_client:
test_client.project = "test_project"
test_client._connection.API_BASE_URL = "https://testOtel.org"
test_client._connection.build_api_url.return_value = (
"https://testOtel.org/foo/bar/baz?sensitive=true"
)
with _opentelemetry_tracing.create_trace_span(
test_span_name,
attributes=test_span_attributes,
Expand All @@ -165,6 +183,7 @@ def test_get_final_attributes(setup, setup_optin):
) as span:
assert span is not None
assert span.name == test_span_name
assert "url.query" not in span.attributes
assert span.attributes == expected_attributes


Expand Down Expand Up @@ -196,23 +215,108 @@ def test_set_conditional_retry_attr(setup, setup_optin):
assert span.attributes == expected_attributes


def test_set_api_request_attr():
from google.cloud.storage import Client
def test__get_opentelemetry_attributes_from_url():
url = "https://example.com:8080/path?query=true"
expected = {
"server.address": "example.com",
"server.port": 8080,
"url.scheme": "https",
"url.path": "/path",
}
# Test stripping query
attrs = _opentelemetry_tracing._get_opentelemetry_attributes_from_url(
url, strip_query=True
)
assert attrs == expected
assert "url.query" not in attrs

# Test not stripping query
expected["url.query"] = "query=true"
attrs = _opentelemetry_tracing._get_opentelemetry_attributes_from_url(
url, strip_query=False
)
assert attrs == expected

test_client = Client()
args_method = {"method": "GET"}
expected_attributes = {"http.request.method": "GET"}
attr = _opentelemetry_tracing._set_api_request_attr(args_method, test_client)
assert attr == expected_attributes

args_path = {"path": "/foo/bar/baz"}
expected_attributes = {"url.full": "https://storage.googleapis.com/foo/bar/baz"}
attr = _opentelemetry_tracing._set_api_request_attr(args_path, test_client)
assert attr == expected_attributes
def test__get_opentelemetry_attributes_from_url_with_query():
url = "https://example.com/path?query=true&another=false"
expected = {
"server.address": "example.com",
"server.port": None,
"url.scheme": "https",
"url.path": "/path",
"url.query": "query=true&another=false",
}
# Test not stripping query
attrs = _opentelemetry_tracing._get_opentelemetry_attributes_from_url(
url, strip_query=False
)
assert attrs == expected

args_timeout = {"timeout": (100, 100)}

def test_set_api_request_attr_with_pii_in_query():
client = mock.Mock()
client._connection.build_api_url.return_value = (
"https://example.com/path?sensitive=true&token=secret"
)

request = {
"method": "GET",
"path": "/path?sensitive=true&token=secret",
"timeout": 60,
}
expected_attributes = {
"connect_timeout,read_timeout": (100, 100),
"http.request.method": "GET",
"server.address": "example.com",
"server.port": None,
"url.scheme": "https",
"url.path": "/path",
"connect_timeout,read_timeout": "60",
}
attr = _opentelemetry_tracing._set_api_request_attr(args_timeout, test_client)
attr = _opentelemetry_tracing._set_api_request_attr(request, client)
assert attr == expected_attributes
assert "url.query" not in attr # Ensure query with PII is not captured


def test_set_api_request_attr_no_timeout():
client = mock.Mock()
client._connection.build_api_url.return_value = "https://example.com/path"

request = {"method": "GET", "path": "/path"}
attr = _opentelemetry_tracing._set_api_request_attr(request, client)
assert "connect_timeout,read_timeout" not in attr


@pytest.mark.parametrize(
"env_value, default, expected",
[
# Test default values when env var is not set
(None, False, False),
(None, True, True),
# Test truthy values
("1", False, True),
("true", False, True),
("yes", False, True),
("on", False, True),
("TRUE", False, True),
(" Yes ", False, True),
# Test falsy values
("0", False, False),
("false", False, False),
("no", False, False),
("off", False, False),
("any_other_string", False, False),
("", False, False),
# Test with default=True and falsy values
("false", True, False),
("0", True, False),
],
)
def test__parse_bool_env(monkeypatch, env_value, default, expected):
env_var_name = "TEST_ENV_VAR"
monkeypatch.setenv(
env_var_name, str(env_value)
) if env_value is not None else monkeypatch.delenv(env_var_name, raising=False)

result = _opentelemetry_tracing._parse_bool_env(env_var_name, default)
assert result is expected
Loading