Skip to content

feat: support env variable fo influxdb v3 #138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 81 additions & 3 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import importlib.util
import os
import urllib.parse
from typing import Any

import pyarrow as pa

Expand All @@ -8,11 +10,18 @@
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
from influxdb_client_3.write_client.client.exceptions import InfluxDBError
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
PointSettings
PointSettings, DefaultWriteOptions, WriteType
from influxdb_client_3.write_client.domain.write_precision import WritePrecision

polars = importlib.util.find_spec("polars") is not None

INFLUX_HOST = "INFLUX_HOST"
INFLUX_TOKEN = "INFLUX_TOKEN"
INFLUX_DATABASE = "INFLUX_DATABASE"
INFLUX_ORG = "INFLUX_ORG"
INFLUX_PRECISION = "INFLUX_PRECISION"
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"


def write_client_options(**kwargs):
"""
Expand Down Expand Up @@ -84,6 +93,27 @@ def _merge_options(defaults, exclude_keys=None, custom=None):
return _deep_merge(defaults, {key: value for key, value in custom.items() if key not in exclude_keys})


def _parse_precision(precision):
"""
Parses the precision value and ensures it is valid.

This function checks that the given `precision` is one of the allowed
values defined in `WritePrecision`. If the precision is invalid, it
raises a `ValueError`. The function returns the valid precision value
if it passes validation.

:param precision: The precision value to be validated.
Must be one of WritePrecision.NS, WritePrecision.MS,
WritePrecision.S, or WritePrecision.US.
:return: The valid precision value.
:rtype: WritePrecision
:raises ValueError: If the provided precision is not valid.
"""
if precision not in [WritePrecision.NS, WritePrecision.MS, WritePrecision.S, WritePrecision.US]:
raise ValueError(f"Invalid precision value: {precision}")
return precision


class InfluxDBClient3:
def __init__(
self,
Expand Down Expand Up @@ -137,8 +167,23 @@ def __init__(
self._org = org if org is not None else "default"
self._database = database
self._token = token
self._write_client_options = write_client_options if write_client_options is not None \
else default_client_options(write_options=SYNCHRONOUS)

write_type = DefaultWriteOptions.write_type.value
write_precision = DefaultWriteOptions.write_precision.value
if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None:
write_opts = write_client_options['write_options']
write_type = getattr(write_opts, 'write_type', write_type)
write_precision = getattr(write_opts, 'write_precision', write_precision)

write_options = WriteOptions(
write_type=write_type,
write_precision=write_precision,
)

self._write_client_options = {
"write_options": write_options,
**(write_client_options or {})
}

# Parse the host input
parsed_url = urllib.parse.urlparse(host)
Expand Down Expand Up @@ -179,6 +224,39 @@ def __init__(
flight_client_options=flight_client_options,
proxy=kwargs.get("proxy", None), options=q_opts_builder.build())

@classmethod
def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':

required_vars = {
INFLUX_HOST: os.getenv(INFLUX_HOST),
INFLUX_TOKEN: os.getenv(INFLUX_TOKEN),
INFLUX_DATABASE: os.getenv(INFLUX_DATABASE)
}
missing_vars = [var for var, value in required_vars.items() if value is None or value == ""]
if missing_vars:
raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

write_options = WriteOptions(write_type=WriteType.synchronous)

precision = os.getenv(INFLUX_PRECISION)
if precision is not None:
write_options.write_precision = _parse_precision(precision)

write_client_option = {'write_options': write_options}

if os.getenv(INFLUX_AUTH_SCHEME) is not None:
kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME)

org = os.getenv(INFLUX_ORG, "default")
return InfluxDBClient3(
host=required_vars[INFLUX_HOST],
token=required_vars[INFLUX_TOKEN],
database=required_vars[INFLUX_DATABASE],
write_client_options=write_client_option,
org=org,
**kwargs
)

def write(self, record=None, database=None, **kwargs):
"""
Write data to InfluxDB.
Expand Down
2 changes: 1 addition & 1 deletion influxdb_client_3/write_client/client/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def _has_section(key: str):
profilers=profilers, proxy=proxy, **kwargs)

@classmethod
@deprecated('Use _from_env() instead.')
@deprecated('Use InfluxDBClient3.from_env() instead.')
def _from_env_properties(cls, debug=None, enable_gzip=False, **kwargs):
url = os.getenv('INFLUXDB_V2_URL', "http://localhost:8086")
token = os.getenv('INFLUXDB_V2_TOKEN', "my-token")
Expand Down
18 changes: 1 addition & 17 deletions influxdb_client_3/write_client/client/influxdb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gz
return InfluxDBClient._from_config_file(config_file=config_file, debug=debug, enable_gzip=enable_gzip, **kwargs)

@classmethod
@deprecated('Use InfluxDBClient.from_env() instead.')
@deprecated('Use InfluxDBClient3.from_env() instead.')
def from_env_properties(cls, debug=None, enable_gzip=False, **kwargs):
"""
Configure client via environment properties.
Expand Down Expand Up @@ -203,22 +203,6 @@ def from_env_properties(cls, debug=None, enable_gzip=False, **kwargs):
"""
return InfluxDBClient._from_env_properties(debug=debug, enable_gzip=enable_gzip, **kwargs)

@classmethod
def from_env(cls, debug=None, enable_gzip=False, **kwargs):
"""
Creates an instance of the class using environment configuration variables.

This class method retrieves configuration variables from the system environment
and uses them to configure and initialize an instance of the class. This allows
for dynamic configuration of the client without the need for hardcoding values
or explicitly passing them during instantiation.

:param debug: Enable verbose logging of http requests
:param enable_gzip: Enable Gzip compression for http requests. Currently, only the "Write" and "Query" endpoints
supports the Gzip compression.
"""
return InfluxDBClient._from_env(debug=debug, enable_gzip=enable_gzip, **kwargs)

def write_api(self, write_options=WriteOptions(), point_settings=PointSettings(), **kwargs) -> WriteApi:
"""
Create Write API instance.
Expand Down
19 changes: 16 additions & 3 deletions influxdb_client_3/write_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
from reactivex.scheduler import ThreadPoolScheduler
from reactivex.subject import Subject

from influxdb_client_3.write_client.domain import WritePrecision
from influxdb_client_3.write_client.client._base import _BaseWriteApi, _HAS_DATACLASS
from influxdb_client_3.write_client.client.util.helpers import get_org_query_param
from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer
from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION
from influxdb_client_3.write_client.client.write.retry import WritesRetry
from influxdb_client_3.write_client.domain import WritePrecision
from influxdb_client_3.write_client.rest import _UTF_8_encoding

logger = logging.getLogger('influxdb_client_3.write_client.client.write_api')
Expand All @@ -39,6 +39,11 @@ class WriteType(Enum):
synchronous = 3


class DefaultWriteOptions(Enum):
write_type = WriteType.synchronous
write_precision = WritePrecision.NS


class WriteOptions(object):
"""Write configuration."""

Expand All @@ -51,6 +56,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
max_retry_time=180_000,
exponential_base=2,
max_close_wait=300_000,
write_precision=DEFAULT_WRITE_PRECISION,
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
"""
Create write api configuration.
Expand Down Expand Up @@ -80,6 +86,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
self.exponential_base = exponential_base
self.write_scheduler = write_scheduler
self.max_close_wait = max_close_wait
self.write_precision = write_precision

def to_retry_strategy(self, **kwargs):
"""
Expand Down Expand Up @@ -290,7 +297,7 @@ def write(self, bucket: str, org: str = None,
str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'],
Observable, NamedTuple, Iterable['NamedTuple'], 'dataclass', Iterable['dataclass']
] = None,
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> Any:
write_precision: WritePrecision = None, **kwargs) -> Any:
"""
Write time-series data into InfluxDB.

Expand Down Expand Up @@ -361,6 +368,9 @@ def write(self, bucket: str, org: str = None,

self._append_default_tags(record)

if write_precision is None:
write_precision = self._write_options.write_precision

if self._write_options.write_type is WriteType.batching:
return self._write_batching(bucket, org, record,
write_precision, **kwargs)
Expand Down Expand Up @@ -443,8 +453,11 @@ def __del__(self):
pass

def _write_batching(self, bucket, org, data,
precision=DEFAULT_WRITE_PRECISION,
precision=None,
**kwargs):
if precision is None:
precision = self._write_options.write_precision

if isinstance(data, bytes):
_key = _BatchItemKey(bucket, org, precision)
self._subject.on_next(_BatchItem(key=_key, data=data))
Expand Down
34 changes: 0 additions & 34 deletions tests/test_influxdb_client.py

This file was deleted.

68 changes: 67 additions & 1 deletion tests/test_influxdb_client_3.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest
from unittest.mock import patch

from influxdb_client_3 import InfluxDBClient3
from influxdb_client_3 import InfluxDBClient3, WritePrecision, DefaultWriteOptions, Point, WriteOptions, WriteType
from tests.util import asyncio_run
from tests.util.mocks import ConstantFlightServer, ConstantData

Expand Down Expand Up @@ -74,6 +74,72 @@ async def test_query_async(self):
assert {'data': 'sql_query', 'reference': query, 'value': -1.0} in result_list
assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list

def test_write_api_custom_options_no_error(self):
write_options = WriteOptions(write_type=WriteType.batching)
write_client_option = {'write_options': write_options}
client = InfluxDBClient3(write_client_options=write_client_option)
try:
client._write_api._write_batching("bucket", "org", Point.measurement("test"), None)
self.assertTrue(True)
except Exception as e:
self.fail(f"Write API with default options raised an exception: {str(e)}")

def test_default_client(self):
expected_precision = DefaultWriteOptions.write_precision.value
expected_write_type = DefaultWriteOptions.write_type.value

def verify_client_write_options(c):
write_options = c._write_client_options.get('write_options')
self.assertEqual(write_options.write_precision, expected_precision)
self.assertEqual(write_options.write_type, expected_write_type)

self.assertEqual(c._write_api._write_options.write_precision, expected_precision)
self.assertEqual(c._write_api._write_options.write_type, expected_write_type)

env_client = InfluxDBClient3.from_env()
verify_client_write_options(env_client)

default_client = InfluxDBClient3()
verify_client_write_options(default_client)

@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org',
'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_AUTH_SCHEME': 'custom_scheme'})
def test_from_env_all_env_vars_set(self):
client = InfluxDBClient3.from_env()
self.assertIsInstance(client, InfluxDBClient3)
self.assertEqual(client._token, "test_token")
self.assertEqual(client._client.url, "https://localhost:443")
self.assertEqual(client._client.auth_header_value, f"custom_scheme {client._token}")
self.assertEqual(client._database, "test_db")
self.assertEqual(client._org, "test_org")

write_options = client._write_client_options.get("write_options")
self.assertEqual(write_options.write_precision, WritePrecision.MS)

client._write_api._point_settings = {}

@patch.dict('os.environ', {'INFLUX_HOST': "", 'INFLUX_TOKEN': "",
'INFLUX_DATABASE': "", 'INFLUX_ORG': ""})
def test_from_env_missing_variables(self):
with self.assertRaises(ValueError) as context:
InfluxDBClient3.from_env()
self.assertIn("Missing required environment variables", str(context.exception))

@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
'INFLUX_DATABASE': 'test_db', 'INFLUX_PRECISION': WritePrecision.MS})
def test_parse_valid_write_precision(self):
client = InfluxDBClient3.from_env()
self.assertIsInstance(client, InfluxDBClient3)
self.assertEqual(client._write_client_options.get('write_options').write_precision, WritePrecision.MS)

@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
'INFLUX_DATABASE': 'test_db', 'INFLUX_PRECISION': 'invalid_value'})
def test_parse_invalid_write_precision(self):
with self.assertRaises(ValueError) as context:
InfluxDBClient3.from_env()
self.assertIn("Invalid precision value: invalid_value", str(context.exception))


if __name__ == '__main__':
unittest.main()
Loading