Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
container_name: redis-standalone
environment:
- TLS_ENABLED=yes
- TLS_CLIENT_CNS=test_user
- REDIS_CLUSTER=no
- PORT=6379
- TLS_PORT=6666
Expand Down Expand Up @@ -56,6 +57,7 @@ services:
- NODES=6
- REPLICAS=1
- TLS_ENABLED=yes
- TLS_CLIENT_CNS=test_user
- PORT=16379
- TLS_PORT=27379
command: ${REDIS_EXTRA_ARGS:---enable-debug-command yes --enable-module-command yes --tls-auth-clients optional --save "" --tls-cluster yes}
Expand Down
10 changes: 10 additions & 0 deletions tests/ssl_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
import os
from collections import namedtuple

CN_USERNAME = "test_user"
CLIENT_CERT_NAME = "client.crt"
CLIENT_CN_CERT_NAME = f"{CN_USERNAME}.crt"
CLIENT_KEY_NAME = "client.key"
CLIENT_CN_KEY_NAME = f"{CN_USERNAME}.key"
SERVER_CERT_NAME = "redis.crt"
SERVER_KEY_NAME = "redis.key"
CA_CERT_NAME = "ca.crt"
Expand All @@ -12,6 +15,7 @@
class CertificateType(str, enum.Enum):
client = "client"
server = "server"
client_cn = "client-cn"


TLSFiles = namedtuple("TLSFiles", ["certfile", "keyfile", "ca_certfile"])
Expand Down Expand Up @@ -41,3 +45,9 @@ def get_tls_certificates(
os.path.join(cert_dir, SERVER_KEY_NAME),
os.path.join(cert_dir, CA_CERT_NAME),
)
elif cert_type == CertificateType.client_cn:
return TLSFiles(
os.path.join(cert_dir, CLIENT_CN_CERT_NAME),
os.path.join(cert_dir, CLIENT_CN_KEY_NAME),
os.path.join(cert_dir, CA_CERT_NAME),
)
40 changes: 40 additions & 0 deletions tests/test_asyncio/test_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import pytest
import pytest_asyncio
import redis.asyncio as redis
from tests.conftest import skip_if_server_version_lt
from tests.ssl_utils import get_tls_certificates, CertificateType, CN_USERNAME

# Skip test or not based on cryptography installation
try:
Expand Down Expand Up @@ -193,3 +195,41 @@ async def test_ssl_password_parameter(self, request):
assert conn.ssl_context.password == test_password
finally:
await r.aclose()

@skip_if_server_version_lt("8.5.0")
async def test_ssl_authenticate_with_client_cert(self, request, r):
"""Test that when client certificate is used for authentication,
the connection is created successfully"""

try:
# Non SSL client, to setup ACL
assert await r.acl_setuser(
CN_USERNAME,
enabled=True,
reset=True,
passwords=["+clientpass"],
keys=["*"],
commands=["+acl"],
)
finally:
await r.close()

ssl_url = request.config.option.redis_ssl_url
p = urlparse(ssl_url)[1].split(":")
client_cn_cert, client_cn_key, ca_cert = get_tls_certificates(
request.session.config.REDIS_INFO["tls_cert_subdir"],
CertificateType.client_cn,
)
r = redis.Redis(
host=p[0],
port=p[1],
ssl=True,
ssl_certfile=client_cn_cert,
ssl_keyfile=client_cn_key,
ssl_cert_reqs="required",
ssl_ca_certs=ca_cert,
)
try:
assert await r.acl_whoami() == CN_USERNAME
finally:
await r.close()
51 changes: 46 additions & 5 deletions tests/test_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
import redis
from redis.exceptions import ConnectionError, RedisError

from .conftest import skip_if_cryptography, skip_if_nocryptography
from .ssl_utils import CertificateType, get_tls_certificates
from .conftest import (
skip_if_cryptography,
skip_if_nocryptography,
skip_if_server_version_lt,
)
from .ssl_utils import CertificateType, get_tls_certificates, CN_USERNAME


@pytest.mark.ssl
Expand All @@ -20,10 +24,10 @@ class TestSSL:

@pytest.fixture(autouse=True)
def _set_ssl_certs(self, request):
tls_cert_subdir = request.session.config.REDIS_INFO["tls_cert_subdir"]
self.client_certs = get_tls_certificates(tls_cert_subdir)
self.tls_cert_subdir = request.session.config.REDIS_INFO["tls_cert_subdir"]
self.client_certs = get_tls_certificates(self.tls_cert_subdir)
self.server_certs = get_tls_certificates(
tls_cert_subdir, cert_type=CertificateType.server
self.tls_cert_subdir, cert_type=CertificateType.server
)

def test_ssl_with_invalid_cert(self, request):
Expand Down Expand Up @@ -425,3 +429,40 @@ def capture_context_wrap_socket(context_self, sock, **_kwargs):

finally:
r.close()

@skip_if_server_version_lt("8.5.0")
def test_ssl_authenticate_with_client_cert(self, request, r):
"""Test that when client certificate is used for authentication,
the connection is created successfully"""

try:
# Non SSL client, to setup ACL
assert r.acl_setuser(
CN_USERNAME,
enabled=True,
reset=True,
passwords=["+clientpass"],
keys=["*"],
commands=["+acl"],
)
finally:
r.close()

ssl_url = request.config.option.redis_ssl_url
p = urlparse(ssl_url)[1].split(":")
client_cn_cert, client_cn_key, ca_cert = get_tls_certificates(
self.tls_cert_subdir, CertificateType.client_cn
)
r = redis.Redis(
host=p[0],
port=p[1],
ssl=True,
ssl_certfile=client_cn_cert,
ssl_keyfile=client_cn_key,
ssl_cert_reqs="required",
ssl_ca_certs=ca_cert,
)
try:
assert r.acl_whoami() == CN_USERNAME
finally:
r.close()