Skip to content

Commit 4d5c009

Browse files
committed
python 2.7
1 parent b7adeb5 commit 4d5c009

File tree

6 files changed

+352
-68
lines changed

6 files changed

+352
-68
lines changed
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
from __future__ import absolute_import
2+
3+
from sentry_sdk import Hub
4+
from sentry_sdk.consts import OP
5+
from sentry_sdk.utils import capture_internal_exceptions, logger
6+
from sentry_sdk.integrations import Integration, DidNotEnable
7+
8+
from sentry_sdk._types import MYPY
9+
10+
if MYPY:
11+
from typing import Any, Sequence
12+
from sentry_sdk.tracing import Span
13+
14+
_SINGLE_KEY_COMMANDS = frozenset(
15+
["decr", "decrby", "get", "incr", "incrby", "pttl", "set", "setex", "setnx", "ttl"]
16+
)
17+
_MULTI_KEY_COMMANDS = frozenset(["del", "touch", "unlink"])
18+
19+
#: Trim argument lists to this many values
20+
_MAX_NUM_ARGS = 10
21+
22+
23+
def _set_pipeline_data(
24+
span, is_cluster, get_command_args_fn, is_transaction, command_stack
25+
):
26+
# type: (Span, bool, Any, bool, Sequence[Any]) -> None
27+
span.set_tag("redis.is_cluster", is_cluster)
28+
transaction = is_transaction if not is_cluster else False
29+
span.set_tag("redis.transaction", transaction)
30+
31+
commands = []
32+
for i, arg in enumerate(command_stack):
33+
if i > _MAX_NUM_ARGS:
34+
break
35+
command_args = []
36+
for j, command_arg in enumerate(get_command_args_fn(arg)):
37+
if j > 0:
38+
command_arg = repr(command_arg)
39+
command_args.append(command_arg)
40+
commands.append(" ".join(command_args))
41+
42+
span.set_data(
43+
"redis.commands",
44+
{"count": len(command_stack), "first_ten": commands},
45+
)
46+
47+
48+
def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn):
49+
# type: (Any, bool, Any) -> None
50+
old_execute = pipeline_cls.execute
51+
52+
def sentry_patched_execute(self, *args, **kwargs):
53+
# type: (Any, *Any, **Any) -> Any
54+
hub = Hub.current
55+
56+
if hub.get_integration(RedisIntegration) is None:
57+
return old_execute(self, *args, **kwargs)
58+
59+
with hub.start_span(
60+
op=OP.DB_REDIS, description="redis.pipeline.execute"
61+
) as span:
62+
with capture_internal_exceptions():
63+
_set_pipeline_data(
64+
span,
65+
is_cluster,
66+
get_command_args_fn,
67+
self.transaction,
68+
self.command_stack,
69+
)
70+
71+
return old_execute(self, *args, **kwargs)
72+
73+
pipeline_cls.execute = sentry_patched_execute
74+
75+
76+
def _get_redis_command_args(command):
77+
# type: (Any) -> Sequence[Any]
78+
return command[0]
79+
80+
81+
def _parse_rediscluster_command(command):
82+
# type: (Any) -> Sequence[Any]
83+
return command.args
84+
85+
86+
def _patch_redis(redis):
87+
# type: (Any) -> None
88+
patch_redis_client(redis.StrictRedis, is_cluster=False)
89+
patch_redis_pipeline(redis.client.Pipeline, False, _get_redis_command_args)
90+
try:
91+
strict_pipeline = redis.client.StrictPipeline
92+
except AttributeError:
93+
pass
94+
else:
95+
patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)
96+
try:
97+
import redis.asyncio # type: ignore
98+
except ImportError:
99+
pass
100+
else:
101+
from sentry_sdk.integrations.redis.asyncio import (
102+
patch_redis_async_client,
103+
patch_redis_async_pipeline,
104+
)
105+
106+
patch_redis_async_client(redis.asyncio.client.StrictRedis)
107+
patch_redis_async_pipeline(redis.asyncio.client.Pipeline)
108+
109+
110+
def _patch_rb():
111+
# type: () -> None
112+
try:
113+
import rb.clients # type: ignore
114+
except ImportError:
115+
pass
116+
else:
117+
patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
118+
patch_redis_client(rb.clients.MappingClient, is_cluster=False)
119+
patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
120+
121+
122+
def _patch_rediscluster():
123+
# type: () -> None
124+
try:
125+
import rediscluster # type: ignore
126+
except ImportError:
127+
return
128+
129+
patch_redis_client(rediscluster.RedisCluster, is_cluster=True)
130+
131+
# up to v1.3.6, __version__ attribute is a tuple
132+
# from v2.0.0, __version__ is a string and VERSION a tuple
133+
version = getattr(rediscluster, "VERSION", rediscluster.__version__)
134+
135+
# StrictRedisCluster was introduced in v0.2.0 and removed in v2.0.0
136+
# https://github.com/Grokzen/redis-py-cluster/blob/master/docs/release-notes.rst
137+
if (0, 2, 0) < version < (2, 0, 0):
138+
pipeline_cls = rediscluster.pipeline.StrictClusterPipeline
139+
patch_redis_client(rediscluster.StrictRedisCluster, is_cluster=True)
140+
else:
141+
pipeline_cls = rediscluster.pipeline.ClusterPipeline
142+
143+
patch_redis_pipeline(pipeline_cls, True, _parse_rediscluster_command)
144+
145+
146+
class RedisIntegration(Integration):
147+
identifier = "redis"
148+
149+
@staticmethod
150+
def setup_once():
151+
# type: () -> None
152+
try:
153+
import redis
154+
except ImportError:
155+
raise DidNotEnable("Redis client not installed")
156+
157+
_patch_redis(redis)
158+
_patch_rb()
159+
160+
try:
161+
_patch_rediscluster()
162+
except Exception:
163+
logger.exception("Error occurred while patching `rediscluster` library")
164+
165+
166+
def _get_span_description(name, *args):
167+
# type: (str, *Any) -> str
168+
description = name
169+
170+
with capture_internal_exceptions():
171+
description_parts = [name]
172+
for i, arg in enumerate(args):
173+
if i > _MAX_NUM_ARGS:
174+
break
175+
176+
description_parts.append(repr(arg))
177+
178+
description = " ".join(description_parts)
179+
180+
return description
181+
182+
183+
def _set_client_data(span, is_cluster, name, *args):
184+
# type: (Span, bool, str, *Any) -> None
185+
span.set_tag("redis.is_cluster", is_cluster)
186+
if name:
187+
span.set_tag("redis.command", name)
188+
189+
if name and args:
190+
name_low = name.lower()
191+
if (name_low in _SINGLE_KEY_COMMANDS) or (
192+
name_low in _MULTI_KEY_COMMANDS and len(args) == 1
193+
):
194+
span.set_tag("redis.key", args[0])
195+
196+
197+
def patch_redis_client(cls, is_cluster):
198+
# type: (Any, bool) -> None
199+
"""
200+
This function can be used to instrument custom redis client classes or
201+
subclasses.
202+
"""
203+
old_execute_command = cls.execute_command
204+
205+
def sentry_patched_execute_command(self, name, *args, **kwargs):
206+
# type: (Any, str, *Any, **Any) -> Any
207+
hub = Hub.current
208+
209+
if hub.get_integration(RedisIntegration) is None:
210+
return old_execute_command(self, name, *args, **kwargs)
211+
212+
description = _get_span_description(name, *args)
213+
214+
with hub.start_span(op=OP.DB_REDIS, description=description) as span:
215+
_set_client_data(span, is_cluster, name, *args)
216+
217+
return old_execute_command(self, name, *args, **kwargs)
218+
219+
cls.execute_command = sentry_patched_execute_command
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from __future__ import absolute_import
2+
3+
from sentry_sdk import Hub
4+
from sentry_sdk.consts import OP
5+
from sentry_sdk.utils import capture_internal_exceptions
6+
from sentry_sdk.integrations.redis import (
7+
RedisIntegration,
8+
_get_redis_command_args,
9+
_get_span_description,
10+
_set_client_data,
11+
_set_pipeline_data,
12+
)
13+
14+
15+
from sentry_sdk._types import MYPY
16+
17+
if MYPY:
18+
from typing import Any
19+
20+
21+
def patch_redis_async_pipeline(pipeline_cls):
22+
# type: (Any) -> None
23+
old_execute = pipeline_cls.execute
24+
25+
async def _sentry_execute(self, *args, **kwargs):
26+
# type: (Any, *Any, **Any) -> Any
27+
hub = Hub.current
28+
29+
if hub.get_integration(RedisIntegration) is None:
30+
return await old_execute(self, *args, **kwargs)
31+
32+
with hub.start_span(
33+
op=OP.DB_REDIS, description="redis.pipeline.execute"
34+
) as span:
35+
with capture_internal_exceptions():
36+
_set_pipeline_data(
37+
span,
38+
False,
39+
_get_redis_command_args,
40+
self.is_transaction,
41+
self.command_stack,
42+
)
43+
44+
return await old_execute(self, *args, **kwargs)
45+
46+
pipeline_cls.execute = _sentry_execute
47+
48+
49+
def patch_redis_async_client(cls):
50+
# type: (Any) -> None
51+
old_execute_command = cls.execute_command
52+
53+
async def _sentry_execute_command(self, name, *args, **kwargs):
54+
# type: (Any, str, *Any, **Any) -> Any
55+
hub = Hub.current
56+
57+
if hub.get_integration(RedisIntegration) is None:
58+
return await old_execute_command(self, name, *args, **kwargs)
59+
60+
description = _get_span_description(name, *args)
61+
62+
with hub.start_span(op=OP.DB_REDIS, description=description) as span:
63+
_set_client_data(span, False, name, *args)
64+
65+
return await old_execute_command(self, name, *args, **kwargs)
66+
67+
cls.execute_command = _sentry_execute_command
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import pytest
2+
3+
pytest.importorskip("fakeredis.aioredis")
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import pytest
2+
3+
from sentry_sdk import capture_message, start_transaction
4+
from sentry_sdk.integrations.redis import RedisIntegration
5+
6+
from fakeredis.aioredis import FakeRedis
7+
8+
9+
@pytest.mark.asyncio
10+
async def test_async_basic(sentry_init, capture_events):
11+
sentry_init(integrations=[RedisIntegration()])
12+
events = capture_events()
13+
14+
connection = FakeRedis()
15+
16+
await connection.get("foobar")
17+
capture_message("hi")
18+
19+
(event,) = events
20+
(crumb,) = event["breadcrumbs"]["values"]
21+
22+
assert crumb == {
23+
"category": "redis",
24+
"message": "GET 'foobar'",
25+
"data": {
26+
"redis.key": "foobar",
27+
"redis.command": "GET",
28+
"redis.is_cluster": False,
29+
},
30+
"timestamp": crumb["timestamp"],
31+
"type": "redis",
32+
}
33+
34+
35+
@pytest.mark.parametrize("is_transaction", [False, True])
36+
@pytest.mark.asyncio
37+
async def test_async_redis_pipeline(sentry_init, capture_events, is_transaction):
38+
sentry_init(integrations=[RedisIntegration()], traces_sample_rate=1.0)
39+
events = capture_events()
40+
41+
connection = FakeRedis()
42+
with start_transaction():
43+
44+
pipeline = connection.pipeline(transaction=is_transaction)
45+
pipeline.get("foo")
46+
pipeline.set("bar", 1)
47+
pipeline.set("baz", 2)
48+
await pipeline.execute()
49+
50+
(event,) = events
51+
(span,) = event["spans"]
52+
assert span["op"] == "db.redis"
53+
assert span["description"] == "redis.pipeline.execute"
54+
assert span["data"] == {
55+
"redis.commands": {
56+
"count": 3,
57+
"first_ten": ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"],
58+
}
59+
}
60+
assert span["tags"] == {
61+
"redis.transaction": is_transaction,
62+
"redis.is_cluster": False,
63+
}

0 commit comments

Comments
 (0)