Skip to content

Commit 0a66168

Browse files
committed
refactor(redis): switch to redis-py
We have to vendor redis-py typing because types-redis has an incomplete redis.asyncio module typing. I proposed my typing here: python/typeshed#7820 The change also makes mypy discovers some un-catched typing issue.
1 parent a5a5008 commit 0a66168

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+4750
-146
lines changed

mergify_engine/context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,7 @@ async def clear_user_permission_cache_for_user(
620620
) -> None:
621621
await redis.hdel(
622622
cls._users_permission_cache_key_for_repo(owner["id"], repo["id"]),
623-
user["id"],
623+
str(user["id"]),
624624
)
625625

626626
@classmethod
@@ -655,7 +655,7 @@ async def get_user_permission(
655655
key = self._users_permission_cache_key
656656
cached_permission = typing.cast(
657657
typing.Optional[github_types.GitHubRepositoryPermission],
658-
await self.installation.redis.cache.hget(key, user["id"]),
658+
await self.installation.redis.cache.hget(key, str(user["id"])),
659659
)
660660
if cached_permission is None:
661661
permission = typing.cast(

mergify_engine/count_seats.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ def _add_user(user: github_types.GitHubAccount) -> None:
181181
transaction = await redis_cache.pipeline()
182182
for user_id, user_login in users.items():
183183
user_key = f"{user_id}~{user_login}"
184-
await transaction.zadd(repo_key, **{user_key: time.time()})
184+
await transaction.zadd(repo_key, {user_key: time.time()})
185185

186186
await transaction.execute()
187187

@@ -425,7 +425,7 @@ async def count_and_send(redis_cache: redis_utils.RedisCache) -> None:
425425

426426

427427
async def report(args: argparse.Namespace) -> None:
428-
redis_links = redis_utils.RedisLinks()
428+
redis_links = redis_utils.RedisLinks(name="report")
429429
if args.daemon:
430430
service.setup("count-seats")
431431
await count_and_send(redis_links.cache)

mergify_engine/debug.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,13 @@ async def report_dashboard_synchro(
7777
)
7878

7979

80-
async def report_worker_status(owner: github_types.GitHubLogin) -> None:
80+
async def report_worker_status(
81+
redis_links: redis_utils.RedisLinks, owner: github_types.GitHubLogin
82+
) -> None:
8183
stream_name = f"stream~{owner}".encode()
82-
redis_links = redis_utils.RedisLinks()
83-
streams = await redis_links.stream.zrangebyscore(
84+
streams: typing.List[
85+
typing.Tuple[bytes, float]
86+
] = await redis_links.stream.zrangebyscore(
8487
"streams", min=0, max="+inf", withscores=True
8588
)
8689

@@ -91,9 +94,13 @@ async def report_worker_status(owner: github_types.GitHubLogin) -> None:
9194
print("* WORKER: Installation not queued to process")
9295
return
9396

94-
planned = datetime.datetime.utcfromtimestamp(streams[pos]).isoformat()
97+
planned = datetime.datetime.utcfromtimestamp(streams[pos][1]).isoformat()
9598

96-
attempts = await redis_links.stream.hget("attempts", stream_name) or 0
99+
attempts_raw = await redis_links.stream.hget("attempts", stream_name)
100+
if attempts_raw is None:
101+
attempts = 0
102+
else:
103+
attempts = int(attempts)
97104
print(
98105
"* WORKER: Installation queued, "
99106
f" pos: {pos}/{len(streams)},"
@@ -170,7 +177,7 @@ def _url_parser(
170177
async def report(
171178
url: str,
172179
) -> typing.Union[context.Context, github.AsyncGithubInstallationClient, None]:
173-
redis_links = redis_utils.RedisLinks(max_idle_time=0)
180+
redis_links = redis_utils.RedisLinks(name="debug")
174181

175182
try:
176183
owner_login, repo, pull_number = _url_parser(url)
@@ -234,7 +241,7 @@ async def report(
234241
installation.installation["id"], db_sub, db_tokens, "DASHBOARD", slug
235242
)
236243

237-
await report_worker_status(owner_login)
244+
await report_worker_status(redis_links, owner_login)
238245

239246
if repo is not None:
240247
repository = await installation.get_repository_by_name(repo)
@@ -281,6 +288,7 @@ async def report(
281288
)
282289
except http.HTTPNotFound:
283290
print(f"Pull request `{url}` does not exist")
291+
await redis_links.shutdown_all()
284292
return client
285293

286294
# FIXME queues could also be printed if no pull number given
@@ -319,9 +327,10 @@ async def report(
319327
)
320328
print(f"[Summary]: success | {summary_title}")
321329
print("> " + "\n> ".join(summary.strip().split("\n")))
322-
330+
await redis_links.shutdown_all()
323331
return ctxt
324332

333+
await redis_links.shutdown_all()
325334
return client
326335

327336

mergify_engine/delayed_refresh.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ async def _set_current_refresh_datetime(
6161
) -> None:
6262
await repository.installation.redis.cache.zadd(
6363
DELAYED_REFRESH_KEY,
64-
**{_redis_key(repository, pull_number): at.timestamp()},
64+
{_redis_key(repository, pull_number): at.timestamp()},
6565
)
6666

6767

@@ -129,16 +129,18 @@ async def send(
129129
for subkey in keys:
130130
(
131131
owner_id_str,
132-
owner_login,
132+
owner_login_str,
133133
repository_id_str,
134-
repository_name,
134+
repository_name_str,
135135
pull_request_number_str,
136136
) = subkey.split("~")
137137
owner_id = github_types.GitHubAccountIdType(int(owner_id_str))
138138
repository_id = github_types.GitHubRepositoryIdType(int(repository_id_str))
139139
pull_request_number = github_types.GitHubPullRequestNumber(
140140
int(pull_request_number_str)
141141
)
142+
repository_name = github_types.GitHubRepositoryName(repository_name_str)
143+
owner_login = github_types.GitHubLogin(owner_login_str)
142144

143145
LOG.info(
144146
"sending delayed pull request refresh",

mergify_engine/exceptions.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import datetime
1616
import typing
1717

18-
import yaaredis
18+
from redis import exceptions as redis_exceptions
1919

2020
from mergify_engine.clients import http
2121

@@ -118,11 +118,11 @@ def need_retry(
118118
elif exception.response.status_code == 403:
119119
return datetime.timedelta(minutes=3)
120120

121-
elif isinstance(exception, yaaredis.exceptions.ResponseError):
121+
elif isinstance(exception, redis_exceptions.ResponseError):
122122
# Redis script bug or OOM
123123
return datetime.timedelta(minutes=1)
124124

125-
elif isinstance(exception, yaaredis.exceptions.ConnectionError):
125+
elif isinstance(exception, redis_exceptions.ConnectionError):
126126
# Redis down
127127
return datetime.timedelta(minutes=1)
128128

mergify_engine/migrations/__init__.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,12 @@ async def run(redis_cache: redis_utils.RedisCache) -> None:
3333
await _run_scripts("cache", redis_cache)
3434

3535

36-
async def _run_scripts(
37-
dirname: str, redis: typing.Union[redis_utils.RedisCache, redis_utils.RedisStream]
38-
) -> None:
39-
current_version = await redis.get(MIGRATION_STAMPS_KEY)
40-
if current_version is None:
36+
async def _run_scripts(dirname: str, redis: redis_utils.RedisCache) -> None:
37+
current_version_raw: typing.Optional[str] = await redis.get(MIGRATION_STAMPS_KEY)
38+
if current_version_raw is None:
4139
current_version = 0
4240
else:
43-
current_version = int(current_version)
41+
current_version = int(current_version_raw)
4442

4543
files = pkg_resources.resource_listdir(__name__, dirname)
4644
for script in sorted(files):

mergify_engine/queue/merge_train.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1354,7 +1354,7 @@ async def iter_trains(
13541354
await train.load(train_raw)
13551355
yield train
13561356

1357-
async def load(self, train_raw: typing.Optional[bytes] = None) -> None:
1357+
async def load(self, train_raw: typing.Optional[str] = None) -> None:
13581358
if train_raw is None:
13591359
train_raw = await self.repository.installation.redis.cache.hget(
13601360
self._get_redis_key(), self._get_redis_hash_key()

0 commit comments

Comments
 (0)