8 changes: 7 additions & 1 deletion optimizely/helpers/sdk_settings.py
@@ -30,7 +30,9 @@ def __init__(
segments_cache_timeout_in_secs: int = enums.OdpSegmentsCacheConfig.DEFAULT_TIMEOUT_SECS,
odp_segments_cache: Optional[OptimizelySegmentsCache] = None,
odp_segment_manager: Optional[OdpSegmentManager] = None,
odp_event_manager: Optional[OdpEventManager] = None
odp_event_manager: Optional[OdpEventManager] = None,
fetch_segments_timeout: Optional[int] = None,
odp_event_timeout: Optional[int] = None
) -> None:
"""
Args:
@@ -45,6 +47,8 @@ def __init__(
`fetch_qualified_segments(user_key, user_value, options)`.
odp_event_manager: A custom odp event manager. Required method is:
`send_event(type:, action:, identifiers:, data:)`
fetch_segments_timeout: A fetch segment timeout in seconds (optional).
odp_event_timeout: A send odp event timeout in seconds (optional).
"""

self.odp_disabled = odp_disabled
@@ -53,3 +57,5 @@ def __init__(
self.segments_cache = odp_segments_cache
self.odp_segment_manager = odp_segment_manager
self.odp_event_manager = odp_event_manager
self.fetch_segments_timeout = fetch_segments_timeout
self.odp_event_timeout = odp_event_timeout
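For context, here is a minimal usage sketch of the two new settings introduced in this hunk. The OptimizelySdkSettings class name, the Optimizely constructor's sdk_key and settings arguments, and the concrete timeout values are assumptions for illustration, not taken from this diff:

```python
from optimizely import optimizely
from optimizely.helpers.sdk_settings import OptimizelySdkSettings

# Hypothetical values; both timeouts are optional and, when left as None,
# the ODP managers fall back to the SDK's default request timeouts.
sdk_settings = OptimizelySdkSettings(
    fetch_segments_timeout=5,  # seconds per ODP segment-fetch request
    odp_event_timeout=5        # seconds per ODP event-dispatch request
)

# Assumed wiring: optimizely.Optimizely forwards these settings to OdpManager,
# as shown in the optimizely.py hunk further down.
optimizely_client = optimizely.Optimizely(sdk_key='my-sdk-key', settings=sdk_settings)
```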
9 changes: 7 additions & 2 deletions optimizely/odp/odp_event_api_manager.py
@@ -43,14 +43,19 @@ class OdpEventApiManager:
def __init__(self, logger: Optional[optimizely_logger.Logger] = None):
self.logger = logger or optimizely_logger.NoOpLogger()

def send_odp_events(self, api_key: str, api_host: str, events: list[OdpEvent]) -> bool:
def send_odp_events(self,
Contributor comment: What about passing the timeout value when initializing? We do not need to send timeout for every event. It's not expected we need event-by-event control. Also a custom OdpEventApiManager can have own timeout control.

api_key: str,
api_host: str,
events: list[OdpEvent],
odp_event_timeout: Optional[int] = None) -> bool:
Contributor comment: Seems like the odp_event prefix is unnecessary in this context?
Suggested change:
-    odp_event_timeout: Optional[int] = None) -> bool:
+    timeout: Optional[int] = None) -> bool:

"""
Dispatch the event being represented by the OdpEvent object.

Args:
api_key: public api key
api_host: domain url of the host
events: list of odp events to be sent to optimizely's odp platform.
odp_event_timeout: event request timeout in seconds (Optional).

Returns:
retry is True - if network or server error (5xx), otherwise False
@@ -69,7 +74,7 @@ def send_odp_events(self, api_key: str, api_host: str, events: list[OdpEvent]) -
response = requests.post(url=url,
headers=request_headers,
data=payload_dict,
timeout=OdpEventApiConfig.REQUEST_TIMEOUT)
timeout=odp_event_timeout or OdpEventApiConfig.REQUEST_TIMEOUT)

response.raise_for_status()

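The timeout resolution above is a plain or-fallback: an explicit value wins and None defers to the library default. A minimal standalone sketch of that behavior follows; the 10-second default is assumed for illustration, the real constant is OdpEventApiConfig.REQUEST_TIMEOUT:

```python
from typing import Optional

ASSUMED_DEFAULT_TIMEOUT = 10  # stand-in for OdpEventApiConfig.REQUEST_TIMEOUT

def resolve_timeout(odp_event_timeout: Optional[int]) -> int:
    # `value or default` treats None as "use the default".
    return odp_event_timeout or ASSUMED_DEFAULT_TIMEOUT

assert resolve_timeout(None) == ASSUMED_DEFAULT_TIMEOUT  # unset -> default
assert resolve_timeout(30) == 30                         # explicit value wins
# Side effect of the `or` idiom: an explicit 0 is falsy, so it also falls
# back to the default rather than disabling the timeout.
assert resolve_timeout(0) == ASSUMED_DEFAULT_TIMEOUT
```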
6 changes: 5 additions & 1 deletion optimizely/odp/odp_event_manager.py
@@ -59,6 +59,7 @@ def __init__(
self.odp_config: Optional[OdpConfig] = None
self.api_key: Optional[str] = None
self.api_host: Optional[str] = None
self.odp_event_timeout: Optional[int] = None

self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY)
self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE
@@ -158,7 +159,10 @@ def _flush_batch(self) -> None:

for i in range(1 + self.retry_count):
try:
should_retry = self.api_manager.send_odp_events(self.api_key, self.api_host, self._current_batch)
should_retry = self.api_manager.send_odp_events(self.api_key,
self.api_host,
self._current_batch,
self.odp_event_timeout)
except Exception as error:
should_retry = False
self.logger.error(Errors.ODP_EVENT_FAILED.format(f'Error: {error} {self._current_batch}'))
6 changes: 5 additions & 1 deletion optimizely/odp/odp_manager.py
@@ -33,6 +33,8 @@ def __init__(
segments_cache: Optional[OptimizelySegmentsCache] = None,
segment_manager: Optional[OdpSegmentManager] = None,
event_manager: Optional[OdpEventManager] = None,
fetch_segments_timeout: Optional[int] = None,
odp_event_timeout: Optional[int] = None,
logger: Optional[optimizely_logger.Logger] = None
) -> None:

@@ -42,6 +44,7 @@ def __init__(

self.segment_manager = segment_manager
self.event_manager = event_manager
self.fetch_segments_timeout = fetch_segments_timeout

if not self.enabled:
self.logger.info('ODP is disabled.')
@@ -56,6 +59,7 @@ def __init__(
self.segment_manager = OdpSegmentManager(segments_cache, logger=self.logger)

self.event_manager = self.event_manager or OdpEventManager(self.logger)
self.event_manager.odp_event_timeout = odp_event_timeout
Contributor comment: How about we pass this in on initialization of the OdpEventManager?
Contributor comment: +1 Pass it down all the way to OdpEventApiManager and OdpSegmentApiManager?

self.segment_manager.odp_config = self.odp_config

def fetch_qualified_segments(self, user_id: str, options: list[str]) -> Optional[list[str]]:
@@ -66,7 +70,7 @@ def fetch_qualified_segments(self, user_id: str, options: list[str]) -> Optional
user_key = OdpManagerConfig.KEY_FOR_USER_ID
user_value = user_id

return self.segment_manager.fetch_qualified_segments(user_key, user_value, options)
return self.segment_manager.fetch_qualified_segments(user_key, user_value, options, self.fetch_segments_timeout)

def identify_user(self, user_id: str) -> None:
if not self.enabled or not self.event_manager:
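The two comments above favor constructor injection over threading a timeout argument through every call. A rough sketch of that alternative is below; this is not what this PR implements, and the class name and the 10-second default are illustrative only:

```python
from typing import Optional

class TimeoutConfiguredEventApiManager:
    """Sketch of the reviewers' suggestion: fix the timeout once at
    construction so send_odp_events needs no per-call timeout argument."""

    ASSUMED_DEFAULT_TIMEOUT = 10  # illustrative stand-in for the SDK default

    def __init__(self, timeout: Optional[int] = None):
        # Resolved once; every request issued by this manager reuses it.
        self.timeout = timeout or self.ASSUMED_DEFAULT_TIMEOUT

    def send_odp_events(self, api_key: str, api_host: str, events: list) -> bool:
        # A real implementation would call, for example:
        #   requests.post(url, headers=..., data=..., timeout=self.timeout)
        # Returning False here mirrors the "no retry needed" result.
        return False
```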
5 changes: 3 additions & 2 deletions optimizely/odp/odp_segment_api_manager.py
@@ -112,7 +112,8 @@ def __init__(self, logger: Optional[optimizely_logger.Logger] = None):
self.logger = logger or optimizely_logger.NoOpLogger()

def fetch_segments(self, api_key: str, api_host: str, user_key: str,
user_value: str, segments_to_check: list[str]) -> Optional[list[str]]:
user_value: str, segments_to_check: list[str],
Contributor comment: same here. we may keep the timeout constant locally instead of passing timeout for each api call.

fetch_segments_timeout: Optional[int] = None) -> Optional[list[str]]:
Contributor comment (suggested change):
-    fetch_segments_timeout: Optional[int] = None) -> Optional[list[str]]:
+    timeout: Optional[int] = None) -> Optional[list[str]]:

"""
Fetch segments from ODP GraphQL API.

Contributor comment: Missing doc string for the new arg

@@ -151,7 +152,7 @@ def fetch_segments(self, api_key: str, api_host: str, user_key: str,
response = requests.post(url=url,
headers=request_headers,
data=payload_dict,
timeout=OdpSegmentApiConfig.REQUEST_TIMEOUT)
timeout=fetch_segments_timeout or OdpSegmentApiConfig.REQUEST_TIMEOUT)

response.raise_for_status()
response_dict = response.json()
6 changes: 3 additions & 3 deletions optimizely/odp/odp_segment_manager.py
@@ -38,8 +38,8 @@ def __init__(
self.logger = logger or optimizely_logger.NoOpLogger()
self.api_manager = api_manager or OdpSegmentApiManager(self.logger)

def fetch_qualified_segments(self, user_key: str, user_value: str, options: list[str]
) -> Optional[list[str]]:
def fetch_qualified_segments(self, user_key: str, user_value: str, options: list[str],
fetch_segments_timeout: Optional[int]) -> Optional[list[str]]:
Contributor comment: fetch_segments is redundant in this context.
Suggested change:
-    fetch_segments_timeout: Optional[int]) -> Optional[list[str]]:
+    timeout: Optional[int]) -> Optional[list[str]]:

"""
Args:
user_key: The key for identifying the id type.
@@ -80,7 +80,7 @@ def fetch_qualified_segments(self, user_key: str, user_value: str, options: list
self.logger.debug('Making a call to ODP server.')

segments = self.api_manager.fetch_segments(odp_api_key, odp_api_host, user_key, user_value,
odp_segments_to_check)
odp_segments_to_check, fetch_segments_timeout)

if segments and not ignore_cache:
self.segments_cache.save(cache_key, segments)
2 changes: 2 additions & 0 deletions optimizely/optimizely.py
@@ -150,6 +150,8 @@ def __init__(
self.sdk_settings.segments_cache,
self.sdk_settings.odp_segment_manager,
self.sdk_settings.odp_event_manager,
self.sdk_settings.fetch_segments_timeout,
self.sdk_settings.odp_event_timeout,
self.logger
)

28 changes: 14 additions & 14 deletions tests/test_odp_event_manager.py
@@ -71,7 +71,7 @@ class OdpEventManagerTest(BaseTest):
"key-3": 3.0,
"key-4": None,
"key-5": True
}
},
},
{
"type": "t2",
@@ -127,7 +127,7 @@ def test_odp_event_manager_batch(self, *args):
event_manager.send_event(**self.events[1])
event_manager.event_queue.join()

mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events)
mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events, None)
self.assertEqual(len(event_manager._current_batch), 0)
mock_logger.error.assert_not_called()
mock_logger.debug.assert_any_call('ODP event queue: flushing on batch size.')
@@ -151,7 +151,7 @@ def test_odp_event_manager_multiple_batches(self, *args):

self.assertEqual(mock_send.call_count, batch_count)
mock_send.assert_has_calls(
[mock.call(self.api_key, self.api_host, self.processed_events)] * batch_count
[mock.call(self.api_key, self.api_host, self.processed_events, None)] * batch_count
)

self.assertEqual(len(event_manager._current_batch), 0)
@@ -187,7 +187,7 @@ def test_odp_event_manager_backlog(self, *args):

self.assertEqual(mock_send.call_count, batch_count)
mock_send.assert_has_calls(
[mock.call(self.api_key, self.api_host, self.processed_events)] * batch_count
[mock.call(self.api_key, self.api_host, self.processed_events, None)] * batch_count
)

self.assertEqual(len(event_manager._current_batch), 0)
@@ -210,7 +210,7 @@ def test_odp_event_manager_flush(self, *args):
event_manager.flush()
event_manager.event_queue.join()

mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events)
mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events, None)
mock_logger.error.assert_not_called()
self.assertEqual(len(event_manager._current_batch), 0)
mock_logger.debug.assert_any_call('ODP event queue: received flush signal.')
@@ -233,7 +233,7 @@ def test_odp_event_manager_multiple_flushes(self, *args):

self.assertEqual(mock_send.call_count, flush_count)
for call in mock_send.call_args_list:
self.assertEqual(call, mock.call(self.api_key, self.api_host, self.processed_events))
self.assertEqual(call, mock.call(self.api_key, self.api_host, self.processed_events, None))
mock_logger.error.assert_not_called()

self.assertEqual(len(event_manager._current_batch), 0)
@@ -259,7 +259,7 @@ def test_odp_event_manager_retry_failure(self, *args):
event_manager.event_queue.join()

mock_send.assert_has_calls(
[mock.call(self.api_key, self.api_host, self.processed_events)] * number_of_tries
[mock.call(self.api_key, self.api_host, self.processed_events, None)] * number_of_tries
)
self.assertEqual(len(event_manager._current_batch), 0)
mock_logger.debug.assert_any_call('Error dispatching ODP events, scheduled to retry.')
@@ -281,7 +281,7 @@ def test_odp_event_manager_retry_success(self, *args):
event_manager.flush()
event_manager.event_queue.join()

mock_send.assert_has_calls([mock.call(self.api_key, self.api_host, self.processed_events)] * 3)
mock_send.assert_has_calls([mock.call(self.api_key, self.api_host, self.processed_events, None)] * 3)
self.assertEqual(len(event_manager._current_batch), 0)
mock_logger.debug.assert_any_call('Error dispatching ODP events, scheduled to retry.')
mock_logger.error.assert_not_called()
@@ -304,7 +304,7 @@ def test_odp_event_manager_send_failure(self, *args):
event_manager.flush()
event_manager.event_queue.join()

mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events)
mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events, None)
self.assertEqual(len(event_manager._current_batch), 0)
mock_logger.error.assert_any_call(f"ODP event send failed (Error: Unexpected error {self.processed_events}).")
self.assertStrictTrue(event_manager.is_running)
@@ -379,7 +379,7 @@ def test_odp_event_manager_override_default_data(self, *args):
event_manager.flush()
event_manager.event_queue.join()

mock_send.assert_called_once_with(self.api_key, self.api_host, [processed_event])
mock_send.assert_called_once_with(self.api_key, self.api_host, [processed_event], None)
event_manager.stop()

def test_odp_event_manager_flush_timeout(self, *args):
@@ -398,7 +398,7 @@ def test_odp_event_manager_flush_timeout(self, *args):

mock_logger.error.assert_not_called()
mock_logger.debug.assert_any_call('ODP event queue: flushing on interval.')
mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events)
mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events, None)
event_manager.stop()

def test_odp_event_manager_events_before_odp_ready(self, *args):
@@ -433,7 +433,7 @@ def test_odp_event_manager_events_before_odp_ready(self, *args):
mock.call('ODP event queue: received flush signal.'),
mock.call('ODP event queue: flushing batch size 2.')
])
mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events)
mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events, None)
event_manager.stop()

def test_odp_event_manager_events_before_odp_disabled(self, *args):
@@ -495,7 +495,7 @@ def test_odp_event_manager_disabled_after_init(self, *args):
mock.call(Errors.ODP_NOT_INTEGRATED)
])
self.assertEqual(len(event_manager._current_batch), 0)
mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events)
mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events, None)
event_manager.stop()

def test_odp_event_manager_disabled_after_events_in_queue(self, *args):
@@ -524,7 +524,7 @@ def test_odp_event_manager_disabled_after_events_in_queue(self, *args):
self.assertEqual(len(event_manager._current_batch), 0)
mock_logger.debug.assert_any_call(Errors.ODP_NOT_INTEGRATED)
mock_logger.error.assert_not_called()
mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events)
mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events, None)
event_manager.stop()

def test_send_event_before_config_set(self, *args):
8 changes: 4 additions & 4 deletions tests/test_odp_manager.py
@@ -68,14 +68,14 @@ def test_fetch_qualified_segments(self):

mock_logger.debug.assert_not_called()
mock_logger.error.assert_not_called()
mock_fetch_qualif_segments.assert_called_once_with('fs_user_id', 'user1', [])
mock_fetch_qualif_segments.assert_called_once_with('fs_user_id', 'user1', [], None)

with mock.patch.object(segment_manager, 'fetch_qualified_segments') as mock_fetch_qualif_segments:
manager.fetch_qualified_segments('user1', ['IGNORE_CACHE'])

mock_logger.debug.assert_not_called()
mock_logger.error.assert_not_called()
mock_fetch_qualif_segments.assert_called_once_with('fs_user_id', 'user1', ['IGNORE_CACHE'])
mock_fetch_qualif_segments.assert_called_once_with('fs_user_id', 'user1', ['IGNORE_CACHE'], None)

def test_fetch_qualified_segments__disabled(self):
mock_logger = mock.MagicMock()
@@ -102,7 +102,7 @@ def test_fetch_qualified_segments__segment_mgr_is_none(self):
manager.fetch_qualified_segments('user1', [])

mock_logger.error.assert_not_called()
mock_fetch_qualif_segments.assert_called_once_with('fs_user_id', 'user1', [])
mock_fetch_qualif_segments.assert_called_once_with('fs_user_id', 'user1', [], None)

def test_fetch_qualified_segments__seg_cache_and_seg_mgr_are_none(self):
"""
@@ -118,7 +118,7 @@ def test_fetch_qualified_segments__seg_cache_and_seg_mgr_are_none(self):

mock_logger.debug.assert_not_called()
mock_logger.error.assert_not_called()
mock_fetch_qualif_segments.assert_called_once_with('fs_user_id', 'user1', [])
mock_fetch_qualif_segments.assert_called_once_with('fs_user_id', 'user1', [], None)

def test_identify_user_datafile_not_ready(self):
mock_logger = mock.MagicMock()