Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions google/cloud/storage/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from google.cloud.storage.constants import REGIONAL_LEGACY_STORAGE_CLASS
from google.cloud.storage.constants import REGION_LOCATION_TYPE
from google.cloud.storage.constants import STANDARD_STORAGE_CLASS
from google.cloud.storage.ip_filter import IPFilter
from google.cloud.storage.notification import BucketNotification
from google.cloud.storage.notification import NONE_PAYLOAD_FORMAT
from google.cloud.storage.retry import DEFAULT_RETRY
Expand Down Expand Up @@ -88,6 +89,7 @@
_FROM_STRING_MESSAGE = (
"Bucket.from_string() is deprecated. " "Use Bucket.from_uri() instead."
)
_IP_FILTER_PROPERTY = "ipFilter"


def _blobs_page_start(iterator, page, response):
Expand Down Expand Up @@ -3887,6 +3889,59 @@ def generate_signed_url(
query_parameters=query_parameters,
)

@property
def ip_filter(self):
"""Retrieve or set the IP Filter configuration for this bucket.

See https://cloud.google.com/storage/docs/ip-filtering-overview and
https://cloud.google.com/storage/docs/json_api/v1/buckets#ipFilter

.. note::
The getter for this property returns an
:class:`~google.cloud.storage.ip_filter.IPFilter` object, which is a
structured representation of the bucket's IP filter configuration.
Modifying the returned object has no effect. To update the bucket's
IP filter, create and assign a new ``IPFilter`` object to this
property and then call
:meth:`~google.cloud.storage.bucket.Bucket.patch`.

.. code-block:: python

from google.cloud.storage.ip_filter import (
IPFilter,
PublicNetworkSource,
)

ip_filter = IPFilter()
ip_filter.mode = "Enabled"
ip_filter.public_network_source = PublicNetworkSource(
allowed_ip_cidr_ranges=["203.0.113.5/32"]
)
bucket.ip_filter = ip_filter
bucket.patch()

:setter: Set the IP Filter configuration for this bucket.
:getter: Gets the IP Filter configuration for this bucket.

:rtype: :class:`~google.cloud.storage.ip_filter.IPFilter` or ``NoneType``
:returns:
An ``IPFilter`` object representing the configuration, or ``None``
if no filter is configured.
"""
resource = self._properties.get(_IP_FILTER_PROPERTY)
if resource:
return IPFilter._from_api_resource(resource)
return None

@ip_filter.setter
def ip_filter(self, value):
if value is None:
self._patch_property(_IP_FILTER_PROPERTY, None)
elif isinstance(value, IPFilter):
self._patch_property(_IP_FILTER_PROPERTY, value._to_api_resource())
else:
self._patch_property(_IP_FILTER_PROPERTY, value)


class SoftDeletePolicy(dict):
"""Map a bucket's soft delete policy.
Expand Down
126 changes: 126 additions & 0 deletions google/cloud/storage/ip_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright 2014 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""IP Filter configuration for Google Cloud Storage Buckets."""

from typing import Dict, Any, Optional, List

_MODE = "mode"
_PUBLIC_NETWORK_SOURCE = "publicNetworkSource"
_VPC_NETWORK_SOURCES = "vpcNetworkSources"
_ALLOWED_IP_CIDR_RANGES = "allowedIpCidrRanges"
_NETWORK = "network"
_ALLOW_ALL_SERVICE_AGENT_ACCESS = "allowAllServiceAgentAccess"

class PublicNetworkSource:
"""Represents a public network source for a GCS Bucket IP Filter.

:type allowed_ip_cidr_ranges: list(str) or None
:param allowed_ip_cidr_ranges: A list of public IPv4 or IPv6 ranges in
CIDR notation that are allowed to access
the bucket.
"""

def __init__(self, allowed_ip_cidr_ranges: Optional[List[str]] = None):
self.allowed_ip_cidr_ranges = allowed_ip_cidr_ranges or []

def _to_api_resource(self) -> Dict[str, Any]:
"""Serializes this object to a dictionary for API requests."""
return {_ALLOWED_IP_CIDR_RANGES: self.allowed_ip_cidr_ranges}


class VpcNetworkSource:
"""Represents a VPC network source for a GCS Bucket IP Filter.

:type network: str
:param network: The resource name of the VPC network.

:type allowed_ip_cidr_ranges: list(str) or None
:param allowed_ip_cidr_ranges: A list of IPv4 or IPv6 ranges in CIDR
notation allowed to access the bucket
from this VPC.
"""

def __init__(self, network: str, allowed_ip_cidr_ranges: Optional[List[str]] = None):
self.network = network
self.allowed_ip_cidr_ranges = allowed_ip_cidr_ranges or []

def _to_api_resource(self) -> Dict[str, Any]:
"""Serializes this object to a dictionary for API requests."""
return {
_NETWORK: self.network,
_ALLOWED_IP_CIDR_RANGES: self.allowed_ip_cidr_ranges,
}


class IPFilter:
"""Represents a GCS Bucket IP Filter configuration.

This class is a helper for constructing the IP Filter dictionary to be
assigned to a bucket's ``ip_filter`` property.
"""

def __init__(self):
self.mode: Optional[str] = None
"""str: The mode of the IP filter. Can be "Enabled" or "Disabled"."""

self.public_network_source: Optional[PublicNetworkSource] = None
"""(Optional) :class:`PublicNetworkSource`: The configuration for public sources."""

self.vpc_network_sources: List[VpcNetworkSource] = []
"""(Optional) list of :class:`VpcNetworkSource`: Configurations for VPC sources."""

self.allow_all_service_agent_access: Optional[bool] = None
"""(Optional) bool: If True, allows GCS service agents to bypass the filter."""

@classmethod
def _from_api_resource(cls, resource: Dict[str, Any]) -> "IPFilter":
"""Factory: creates an IPFilter instance from a server response."""
ip_filter = cls()
ip_filter.mode = resource.get(_MODE)
ip_filter.allow_all_service_agent_access = resource.get(
_ALLOW_ALL_SERVICE_AGENT_ACCESS
)

pns_res = resource.get(_PUBLIC_NETWORK_SOURCE)
if pns_res:
ip_filter.public_network_source = PublicNetworkSource(
allowed_ip_cidr_ranges=pns_res.get(_ALLOWED_IP_CIDR_RANGES)
)

vns_res_list = resource.get(_VPC_NETWORK_SOURCES, [])
ip_filter.vpc_network_sources = [
VpcNetworkSource(
network=vns.get(_NETWORK),
allowed_ip_cidr_ranges=vns.get(_ALLOWED_IP_CIDR_RANGES),
)
for vns in vns_res_list
]
return ip_filter

def _to_api_resource(self) -> Dict[str, Any]:
"""Serializes this object to a dictionary for API requests."""
resource = {_MODE: self.mode}
if self.public_network_source:
resource[_PUBLIC_NETWORK_SOURCE] = self.public_network_source._to_api_resource()
if self.vpc_network_sources is not None:
resource[_VPC_NETWORK_SOURCES] = [
vns._to_api_resource() for vns in self.vpc_network_sources
]
resource[_ALLOW_ALL_SERVICE_AGENT_ACCESS] = (
self.allow_all_service_agent_access
if self.allow_all_service_agent_access is not None
else False
)
return resource
39 changes: 38 additions & 1 deletion tests/system/test_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

from google.api_core import exceptions
from . import _helpers

from google.cloud.storage.ip_filter import (
IPFilter,
PublicNetworkSource,
VpcNetworkSource,
)

def test_bucket_create_w_alt_storage_class(storage_client, buckets_to_delete):
from google.cloud.storage import constants
Expand Down Expand Up @@ -1299,3 +1303,36 @@ def test_new_bucket_with_hierarchical_namespace(
bucket = storage_client.create_bucket(bucket_obj)
buckets_to_delete.append(bucket)
assert bucket.hierarchical_namespace_enabled is True


def test_bucket_ip_filter(storage_client, buckets_to_delete):
"""Test setting and clearing IP filter configuration without enabling enforcement."""
bucket_name = _helpers.unique_name("ip-filter-control")
bucket = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name)
buckets_to_delete.append(bucket)

ip_filter = IPFilter()
ip_filter.mode = "Disabled"
ip_filter.allow_all_service_agent_access = True
ip_filter.public_network_source = PublicNetworkSource(
allowed_ip_cidr_ranges=["203.0.113.10/32"]
)
ip_filter.vpc_network_sources.append(
VpcNetworkSource(
network=f"projects/{storage_client.project}/global/networks/default",
allowed_ip_cidr_ranges=["10.0.0.0/8"],
)
)
bucket.ip_filter = ip_filter
bucket.patch()

# Reload and verify the full configuration was set correctly.
bucket.reload()
reloaded_filter = bucket.ip_filter
assert reloaded_filter is not None
assert reloaded_filter.mode == "Disabled"
assert reloaded_filter.allow_all_service_agent_access is True
assert reloaded_filter.public_network_source.allowed_ip_cidr_ranges == [
"203.0.113.10/32"
]
assert len(reloaded_filter.vpc_network_sources) == 1
40 changes: 40 additions & 0 deletions tests/unit/test_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -4612,6 +4612,46 @@ def test_generate_signed_url_v4_w_incompatible_params(self):
virtual_hosted_style=True, bucket_bound_hostname="cdn.example.com"
)

def test_ip_filter_getter_unset(self):
"""Test that ip_filter is None when not set."""
bucket = self._make_one()
self.assertIsNone(bucket.ip_filter)

def test_ip_filter_getter_w_value(self):
"""Test getting an existing ip_filter configuration."""
from google.cloud.storage.ip_filter import IPFilter

ipf_property = {"mode": "Enabled"}
properties = {"ipFilter": ipf_property}
bucket = self._make_one(properties=properties)

ip_filter = bucket.ip_filter
self.assertIsInstance(ip_filter, IPFilter)
self.assertEqual(ip_filter.mode, "Enabled")

def test_ip_filter_setter(self):
"""Test setting the ip_filter with a helper class."""
from google.cloud.storage.ip_filter import IPFilter
from google.cloud.storage.bucket import _IP_FILTER_PROPERTY

bucket = self._make_one()
ip_filter = IPFilter()
ip_filter.mode = "Enabled"

bucket.ip_filter = ip_filter

self.assertIn(_IP_FILTER_PROPERTY, bucket._changes)
self.assertEqual(bucket._properties[_IP_FILTER_PROPERTY], {"mode": "Enabled", "vpcNetworkSources": [], 'allowAllServiceAgentAccess': False})

def test_ip_filter_setter_w_none(self):
"""Test clearing the ip_filter by setting it to None."""
from google.cloud.storage.bucket import _IP_FILTER_PROPERTY

bucket = self._make_one(properties={"ipFilter": {"mode": "Enabled"}})
bucket.ip_filter = None

self.assertIn(_IP_FILTER_PROPERTY, bucket._changes)
self.assertIsNone(bucket._properties.get(_IP_FILTER_PROPERTY))

class Test__item_to_notification(unittest.TestCase):
def _call_fut(self, iterator, item):
Expand Down
Loading