Skip to content

Commit de849d2

Browse files
feat: add odp event manager (#403)
* add odp event manager
1 parent 967471b commit de849d2

File tree

8 files changed

+843
-13
lines changed

8 files changed

+843
-13
lines changed

optimizely/helpers/enums.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,11 @@ class Errors:
120120
NONE_VARIABLE_KEY_PARAMETER: Final = '"None" is an invalid value for variable key.'
121121
UNSUPPORTED_DATAFILE_VERSION: Final = (
122122
'This version of the Python SDK does not support the given datafile version: "{}".')
123-
INVALID_SEGMENT_IDENTIFIER = 'Audience segments fetch failed (invalid identifier).'
124-
FETCH_SEGMENTS_FAILED = 'Audience segments fetch failed ({}).'
125-
ODP_EVENT_FAILED = 'ODP event send failed ({}).'
126-
ODP_NOT_ENABLED = 'ODP is not enabled. '
123+
INVALID_SEGMENT_IDENTIFIER: Final = 'Audience segments fetch failed (invalid identifier).'
124+
FETCH_SEGMENTS_FAILED: Final = 'Audience segments fetch failed ({}).'
125+
ODP_EVENT_FAILED: Final = 'ODP event send failed ({}).'
126+
ODP_NOT_ENABLED: Final = 'ODP is not enabled.'
127+
ODP_NOT_INTEGRATED: Final = 'ODP is not integrated.'
127128

128129

129130
class ForcedDecisionLogs:
@@ -205,3 +206,11 @@ class OdpRestApiConfig:
205206
class OdpGraphQLApiConfig:
206207
"""ODP GraphQL API configs."""
207208
REQUEST_TIMEOUT: Final = 10
209+
210+
211+
class OdpEventManagerConfig:
212+
"""ODP Event Manager configs."""
213+
DEFAULT_QUEUE_CAPACITY: Final = 1000
214+
DEFAULT_BATCH_SIZE: Final = 10
215+
DEFAULT_FLUSH_INTERVAL: Final = 1
216+
DEFAULT_RETRY_COUNT: Final = 3

optimizely/helpers/validator.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from optimizely.event.event_processor import BaseEventProcessor
3232
from optimizely.helpers.event_tag_utils import EventTags
3333
from optimizely.optimizely_user_context import UserAttributes
34+
from optimizely.odp.odp_event import OdpDataDict
3435

3536

3637
def is_datafile_valid(datafile: Optional[str | bytes]) -> bool:
@@ -306,3 +307,8 @@ def are_values_same_type(first_val: Any, second_val: Any) -> bool:
306307
return True
307308

308309
return False
310+
311+
312+
def are_odp_data_types_valid(data: OdpDataDict) -> bool:
313+
valid_types = (str, int, float, bool, type(None))
314+
return all(isinstance(v, valid_types) for v in data.values())

optimizely/odp/odp_config.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,19 @@
1212
# limitations under the License.
1313

1414
from __future__ import annotations
15+
from enum import Enum
1516

1617
from typing import Optional
1718
from threading import Lock
1819

1920

21+
class OdpConfigState(Enum):
22+
"""State of the ODP integration."""
23+
UNDETERMINED = 1
24+
INTEGRATED = 2
25+
NOT_INTEGRATED = 3
26+
27+
2028
class OdpConfig:
2129
"""
2230
Contains configuration used for ODP integration.
@@ -37,6 +45,9 @@ def __init__(
3745
self._api_host = api_host
3846
self._segments_to_check = segments_to_check or []
3947
self.lock = Lock()
48+
self._odp_state = OdpConfigState.UNDETERMINED
49+
if self._api_host and self._api_key:
50+
self._odp_state = OdpConfigState.INTEGRATED
4051

4152
def update(self, api_key: Optional[str], api_host: Optional[str], segments_to_check: list[str]) -> bool:
4253
"""
@@ -51,8 +62,14 @@ def update(self, api_key: Optional[str], api_host: Optional[str], segments_to_ch
5162
Returns:
5263
True if the provided values were different than the existing values.
5364
"""
65+
5466
updated = False
5567
with self.lock:
68+
if api_key and api_host:
69+
self._odp_state = OdpConfigState.INTEGRATED
70+
else:
71+
self._odp_state = OdpConfigState.NOT_INTEGRATED
72+
5673
if self._api_key != api_key or self._api_host != api_host or self._segments_to_check != segments_to_check:
5774
self._api_key = api_key
5875
self._api_host = api_host
@@ -73,7 +90,7 @@ def get_segments_to_check(self) -> list[str]:
7390
with self.lock:
7491
return self._segments_to_check.copy()
7592

76-
def odp_integrated(self) -> bool:
77-
"""Returns True if ODP is integrated."""
93+
def odp_state(self) -> OdpConfigState:
94+
"""Returns the state of ODP integration (UNDETERMINED, INTEGRATED, or NOT_INTEGRATED)."""
7895
with self.lock:
79-
return self._api_key is not None and self._api_host is not None
96+
return self._odp_state

optimizely/odp/odp_event.py

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,47 @@
1313

1414
from __future__ import annotations
1515

16-
from typing import Any
16+
from typing import Any, Union, Dict
17+
import uuid
18+
import json
19+
from optimizely import version
20+
21+
OdpDataDict = Dict[str, Union[str, int, float, bool, None]]
1722

1823

1924
class OdpEvent:
2025
""" Representation of an odp event which can be sent to the Optimizely odp platform. """
2126

22-
def __init__(self, type: str, action: str,
23-
identifiers: dict[str, str], data: dict[str, Any]) -> None:
27+
def __init__(self, type: str, action: str, identifiers: dict[str, str], data: OdpDataDict) -> None:
2428
self.type = type
2529
self.action = action
2630
self.identifiers = identifiers
27-
self.data = data
31+
self.data = self._add_common_event_data(data)
32+
33+
def __repr__(self) -> str:
34+
return str(self.__dict__)
35+
36+
def __eq__(self, other: object) -> bool:
37+
if isinstance(other, OdpEvent):
38+
return self.__dict__ == other.__dict__
39+
elif isinstance(other, dict):
40+
return self.__dict__ == other
41+
else:
42+
return False
43+
44+
def _add_common_event_data(self, custom_data: OdpDataDict) -> OdpDataDict:
45+
data: OdpDataDict = {
46+
'idempotence_id': str(uuid.uuid4()),
47+
'data_source_type': 'sdk',
48+
'data_source': 'python-sdk',
49+
'data_source_version': version.__version__
50+
}
51+
data.update(custom_data)
52+
return data
53+
54+
55+
class OdpEventEncoder(json.JSONEncoder):
56+
def default(self, obj: object) -> Any:
57+
if isinstance(obj, OdpEvent):
58+
return obj.__dict__
59+
return json.JSONEncoder.default(self, obj)

optimizely/odp/odp_event_manager.py

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
# Copyright 2022, Optimizely
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
from __future__ import annotations
15+
from enum import Enum
16+
from threading import Thread
17+
from typing import Optional
18+
import time
19+
from queue import Empty, Queue, Full
20+
21+
from optimizely import logger as _logging
22+
from .odp_event import OdpEvent, OdpDataDict
23+
from .odp_config import OdpConfig, OdpConfigState
24+
from .zaius_rest_api_manager import ZaiusRestApiManager
25+
from optimizely.helpers.enums import OdpEventManagerConfig, Errors
26+
27+
28+
class Signal(Enum):
29+
"""Enum for sending signals to the event queue."""
30+
SHUTDOWN = 1
31+
FLUSH = 2
32+
33+
34+
class OdpEventManager:
35+
"""
36+
Class that sends batches of ODP events.
37+
38+
The OdpEventManager maintains a single consumer thread that pulls events off of
39+
the queue and buffers them before events are sent to ODP.
40+
Sends events when the batch size is met or when the flush timeout has elapsed.
41+
"""
42+
43+
def __init__(
44+
self,
45+
odp_config: OdpConfig,
46+
logger: Optional[_logging.Logger] = None,
47+
api_manager: Optional[ZaiusRestApiManager] = None
48+
):
49+
"""OdpEventManager init method to configure event batching.
50+
51+
Args:
52+
odp_config: ODP integration config.
53+
logger: Optional component which provides a log method to log messages. By default nothing would be logged.
54+
api_manager: Optional component which sends events to ODP.
55+
"""
56+
self.logger = logger or _logging.NoOpLogger()
57+
self.zaius_manager = api_manager or ZaiusRestApiManager(self.logger)
58+
self.odp_config = odp_config
59+
self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY)
60+
self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE
61+
self.flush_interval = OdpEventManagerConfig.DEFAULT_FLUSH_INTERVAL
62+
self._flush_deadline: float = 0
63+
self.retry_count = OdpEventManagerConfig.DEFAULT_RETRY_COUNT
64+
self._current_batch: list[OdpEvent] = []
65+
"""_current_batch should only be modified by the processing thread, as it is not thread safe"""
66+
self.thread = Thread(target=self._run, daemon=True)
67+
self.thread_exception = False
68+
"""thread_exception will be True if the processing thread did not exit cleanly"""
69+
70+
@property
71+
def is_running(self) -> bool:
72+
"""Property to check if consumer thread is alive or not."""
73+
return self.thread.is_alive()
74+
75+
def start(self) -> None:
76+
"""Starts the batch processing thread to batch events."""
77+
if self.is_running:
78+
self.logger.warning('ODP event queue already started.')
79+
return
80+
81+
self.thread.start()
82+
83+
def _run(self) -> None:
84+
"""Processes the event queue from a child thread. Events are batched until
85+
the batch size is met or until the flush timeout has elapsed.
86+
"""
87+
try:
88+
while True:
89+
timeout = self._get_queue_timeout()
90+
91+
try:
92+
item = self.event_queue.get(True, timeout)
93+
except Empty:
94+
item = None
95+
96+
if item == Signal.SHUTDOWN:
97+
self.logger.debug('ODP event queue: received shutdown signal.')
98+
break
99+
100+
elif item == Signal.FLUSH:
101+
self.logger.debug('ODP event queue: received flush signal.')
102+
self._flush_batch()
103+
self.event_queue.task_done()
104+
continue
105+
106+
elif isinstance(item, OdpEvent):
107+
self._add_to_batch(item)
108+
self.event_queue.task_done()
109+
110+
elif len(self._current_batch) > 0:
111+
self.logger.debug('ODP event queue: flushing on interval.')
112+
self._flush_batch()
113+
114+
except Exception as exception:
115+
self.thread_exception = True
116+
self.logger.error(f'Uncaught exception processing ODP events. Error: {exception}')
117+
118+
finally:
119+
self.logger.info('Exiting ODP event processing loop. Attempting to flush pending events.')
120+
self._flush_batch()
121+
if item == Signal.SHUTDOWN:
122+
self.event_queue.task_done()
123+
124+
def flush(self) -> None:
125+
"""Adds flush signal to event_queue."""
126+
try:
127+
self.event_queue.put_nowait(Signal.FLUSH)
128+
except Full:
129+
self.logger.error("Error flushing ODP event queue")
130+
131+
def _flush_batch(self) -> None:
132+
"""Flushes current batch by dispatching event.
133+
Should only be called by the processing thread."""
134+
batch_len = len(self._current_batch)
135+
if batch_len == 0:
136+
self.logger.debug('ODP event queue: nothing to flush.')
137+
return
138+
139+
api_key = self.odp_config.get_api_key()
140+
api_host = self.odp_config.get_api_host()
141+
142+
if not api_key or not api_host:
143+
self.logger.debug(Errors.ODP_NOT_INTEGRATED)
144+
self._current_batch.clear()
145+
return
146+
147+
self.logger.debug(f'ODP event queue: flushing batch size {batch_len}.')
148+
should_retry = False
149+
150+
for i in range(1 + self.retry_count):
151+
try:
152+
should_retry = self.zaius_manager.send_odp_events(api_key, api_host, self._current_batch)
153+
except Exception as error:
154+
should_retry = False
155+
self.logger.error(Errors.ODP_EVENT_FAILED.format(f'Error: {error} {self._current_batch}'))
156+
157+
if not should_retry:
158+
break
159+
if i < self.retry_count:
160+
self.logger.debug('Error dispatching ODP events, scheduled to retry.')
161+
162+
if should_retry:
163+
self.logger.error(Errors.ODP_EVENT_FAILED.format(f'Failed after {i} retries: {self._current_batch}'))
164+
165+
self._current_batch.clear()
166+
167+
def _add_to_batch(self, odp_event: OdpEvent) -> None:
168+
"""Appends received ODP event to current batch, flushing if batch is greater than batch size.
169+
Should only be called by the processing thread."""
170+
if not self._current_batch:
171+
self._set_flush_deadline()
172+
173+
self._current_batch.append(odp_event)
174+
if len(self._current_batch) >= self.batch_size:
175+
self.logger.debug('ODP event queue: flushing on batch size.')
176+
self._flush_batch()
177+
178+
def _set_flush_deadline(self) -> None:
179+
"""Sets time that next flush will occur."""
180+
self._flush_deadline = time.time() + self.flush_interval
181+
182+
def _get_time_till_flush(self) -> float:
183+
"""Returns seconds until next flush; no less than 0."""
184+
return max(0, self._flush_deadline - time.time())
185+
186+
def _get_queue_timeout(self) -> Optional[float]:
187+
"""Returns seconds until next flush or None if current batch is empty."""
188+
if len(self._current_batch) == 0:
189+
return None
190+
return self._get_time_till_flush()
191+
192+
def stop(self) -> None:
193+
"""Flushes and then stops ODP event queue."""
194+
try:
195+
self.event_queue.put_nowait(Signal.SHUTDOWN)
196+
except Full:
197+
self.logger.error('Error stopping ODP event queue.')
198+
return
199+
200+
self.logger.warning('Stopping ODP event queue.')
201+
202+
if self.is_running:
203+
self.thread.join()
204+
205+
if len(self._current_batch) > 0:
206+
self.logger.error(Errors.ODP_EVENT_FAILED.format(self._current_batch))
207+
208+
if self.is_running:
209+
self.logger.error('Error stopping ODP event queue.')
210+
211+
def send_event(self, type: str, action: str, identifiers: dict[str, str], data: OdpDataDict) -> None:
212+
"""Create OdpEvent and add it to the event queue."""
213+
odp_state = self.odp_config.odp_state()
214+
if odp_state == OdpConfigState.UNDETERMINED:
215+
self.logger.debug('ODP event queue: cannot send before the datafile has loaded.')
216+
return
217+
218+
if odp_state == OdpConfigState.NOT_INTEGRATED:
219+
self.logger.debug(Errors.ODP_NOT_INTEGRATED)
220+
return
221+
222+
self.dispatch(OdpEvent(type, action, identifiers, data))
223+
224+
def dispatch(self, event: OdpEvent) -> None:
225+
"""Add OdpEvent to the event queue."""
226+
if self.thread_exception:
227+
self.logger.error(Errors.ODP_EVENT_FAILED.format('Queue is down'))
228+
return
229+
230+
if not self.is_running:
231+
self.logger.warning('ODP event queue is shutdown, not accepting events.')
232+
return
233+
234+
try:
235+
self.logger.debug('ODP event queue: adding event.')
236+
self.event_queue.put_nowait(event)
237+
except Full:
238+
self.logger.warning(Errors.ODP_EVENT_FAILED.format("Queue is full"))

0 commit comments

Comments
 (0)