Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,122 @@ def wrap(cls, msg_id: _pulsar.MessageId):
self._msg_id = msg_id
return self


class EncryptionKey:
"""
The key used for encryption.
"""

def __init__(self, key: _pulsar.EncryptionKey):
"""
Create EncryptionKey instance.

Parameters
----------
key: _pulsar.EncryptionKey
The underlying EncryptionKey instance from the C extension.
"""
self._key = key

@property
def key(self) -> str:
"""
Returns the key, which is usually the key file's name.
"""
return self._key.key

@property
def value(self) -> bytes:
"""
Returns the value, which is usually the key bytes used for encryption.
"""
return self._key.value()

@property
def metadata(self) -> dict:
"""
Returns the metadata associated with the key.
"""
return self._key.metadata

def __str__(self) -> str:
return f"EncryptionKey(key={self.key}, value_len={len(self.value)}, metadata={self.metadata})"

def __repr__(self) -> str:
return self.__str__()


class EncryptionContext:
"""
It contains encryption and compression information in it using which application can decrypt
consumed message with encrypted-payload.
"""

def __init__(self, context: _pulsar.EncryptionContext):
"""
Create EncryptionContext instance.

Parameters
----------
key: _pulsar.EncryptionContext
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter name in the docstring is incorrect. It says "key: _pulsar.EncryptionContext" but should be "context: _pulsar.EncryptionContext" to match the actual parameter name.

Suggested change
key: _pulsar.EncryptionContext
context: _pulsar.EncryptionContext

Copilot uses AI. Check for mistakes.
The underlying EncryptionContext instance from the C extension.
"""
self._context = context

def keys(self) -> List[EncryptionKey]:
"""
Returns all EncryptionKey instances when performing encryption.
"""
keys = self._context.keys()
return [EncryptionKey(key) for key in keys]

def param(self) -> bytes:
"""
Returns the encryption param bytes.
"""
return self._context.param()

def algorithm(self) -> str:
"""
Returns the encryption algorithm.
"""
return self._context.algorithm()

def compression_type(self) -> CompressionType:
"""
Returns the compression type of the message.
"""
return self._context.compression_type()

def uncompressed_message_size(self) -> int:
"""
Returns the uncompressed message size or 0 if the compression type is NONE.
"""
return self._context.uncompressed_message_size()

def batch_size(self) -> int:
"""
Returns the number of messages in the batch or -1 if the message is not batched.
"""
return self._context.batch_size()

def is_decryption_failed(self) -> bool:
"""
Returns whether decryption has failed for this message.
"""
return self._context.is_decryption_failed()

def __str__(self) -> str:
return f"EncryptionContext(algorithm={self.algorithm()}, " \
f"compression_type={self.compression_type().name}, " \
f"uncompressed_message_size={self.uncompressed_message_size()}, " \
f"is_decryption_failed={self.is_decryption_failed()}, " \
f"keys=[{', '.join(str(key) for key in self.keys())}])"

def __repr__(self) -> str:
return self.__str__()


class Message:
"""
Message objects are returned by a consumer, either by calling `receive` or
Expand Down Expand Up @@ -250,6 +366,15 @@ def producer_name(self) -> str:
"""
return self._message.producer_name()

def encryption_context(self) -> EncryptionContext:
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return type annotation should be Optional[EncryptionContext] instead of EncryptionContext since the method can return None when the message is not encrypted, as documented in the docstring and shown in the implementation.

Suggested change
def encryption_context(self) -> EncryptionContext:
def encryption_context(self) -> Optional[EncryptionContext]:

Copilot uses AI. Check for mistakes.
"""
Get the encryption context for this message or None if it's not encrypted.

It should be noted that the result should not be accessed after the current Message instance is deleted.
"""
context = self._message.encryption_context()
return None if context is None else EncryptionContext(context)

@staticmethod
def _wrap(_message):
self = Message()
Expand Down
17 changes: 16 additions & 1 deletion src/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ void export_message(py::module_& m) {
})
.def_static("deserialize", &MessageId::deserialize);

class_<EncryptionKey>(m, "EncryptionKey")
.def_readonly("key", &EncryptionKey::key)
.def("value", [](const EncryptionKey& key) { return bytes(key.value); })
.def_readonly("metadata", &EncryptionKey::metadata);

class_<EncryptionContext>(m, "EncryptionContext")
.def("keys", &EncryptionContext::keys)
.def("param", [](const EncryptionContext& context) { return bytes(context.param()); })
.def("algorithm", &EncryptionContext::algorithm, return_value_policy::copy)
.def("compression_type", &EncryptionContext::compressionType)
.def("uncompressed_message_size", &EncryptionContext::uncompressedMessageSize)
.def("batch_size", &EncryptionContext::batchSize)
.def("is_decryption_failed", &EncryptionContext::isDecryptionFailed);

class_<Message>(m, "Message")
.def(init<>())
.def("properties", &Message::getProperties)
Expand All @@ -106,7 +120,8 @@ void export_message(py::module_& m) {
.def("redelivery_count", &Message::getRedeliveryCount)
.def("int_schema_version", &Message::getLongSchemaVersion)
.def("schema_version", &Message::getSchemaVersion, return_value_policy::copy)
.def("producer_name", &Message::getProducerName, return_value_policy::copy);
.def("producer_name", &Message::getProducerName, return_value_policy::copy)
.def("encryption_context", &Message::getEncryptionContext, return_value_policy::reference);

MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload,
uint32_t batchSize) = &MessageBatch::parseFrom;
Expand Down
39 changes: 34 additions & 5 deletions tests/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def test_producer_send(self):
consumer.acknowledge(msg)
print("receive from {}".format(msg.message_id()))
self.assertEqual(msg_id, msg.message_id())
self.assertIsNone(msg.encryption_context())
client.close()

def test_producer_access_mode_exclusive(self):
Expand Down Expand Up @@ -489,15 +490,37 @@ def test_encryption_failure(self):
client = Client(self.serviceUrl)
topic = "my-python-test-end-to-end-encryption-failure-" + str(time.time())
producer = client.create_producer(
topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader
topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader,
compression_type=CompressionType.LZ4
)
producer.send(b"msg-0")

enc_key = None
def verify_encryption_context(context: pulsar.EncryptionContext, failed: bool, batch_size: int):
nonlocal enc_key
keys = context.keys()
self.assertEqual(len(keys), 1)
key = keys[0]
self.assertEqual(key.key, "client-rsa.pem")
self.assertTrue(len(key.value) > 0)
if enc_key is None:
enc_key = key.value
else:
self.assertEqual(key.value, enc_key)
self.assertEqual(key.metadata, {})
self.assertTrue(len(context.param()) > 0)
Comment on lines +505 to +511
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertTrue(a > b) cannot provide an informative message. Using assertGreater(a, b) instead will give more informative messages.

Suggested change
self.assertTrue(len(key.value) > 0)
if enc_key is None:
enc_key = key.value
else:
self.assertEqual(key.value, enc_key)
self.assertEqual(key.metadata, {})
self.assertTrue(len(context.param()) > 0)
self.assertGreater(len(key.value), 0)
if enc_key is None:
enc_key = key.value
else:
self.assertEqual(key.value, enc_key)
self.assertEqual(key.metadata, {})
self.assertGreater(len(context.param()), 0)

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertTrue(a > b) cannot provide an informative message. Using assertGreater(a, b) instead will give more informative messages.

Suggested change
self.assertTrue(len(context.param()) > 0)
self.assertGreater(len(context.param()), 0)

Copilot uses AI. Check for mistakes.
self.assertEqual(context.algorithm(), "")
self.assertEqual(context.compression_type(), CompressionType.LZ4)
self.assertEqual(context.uncompressed_message_size(), len(b"msg-0"))
self.assertEqual(context.batch_size(), batch_size)
self.assertEqual(context.is_decryption_failed(), failed)

def verify_next_message(value: bytes):
consumer = client.subscribe(topic, subscription,
crypto_key_reader=crypto_key_reader)
msg = consumer.receive(3000)
self.assertEqual(msg.data(), value)
verify_encryption_context(msg.encryption_context(), False, -1)
consumer.acknowledge(msg)
consumer.close()

Expand All @@ -520,22 +543,28 @@ def verify_next_message(value: bytes):

producer.send(b"msg-2")
verify_next_message(b"msg-2") # msg-1 is skipped since the crypto failure action is DISCARD
producer.send_async(b"msg-3", None)
producer.send_async(b"msg-4", None)
producer.flush()

def verify_undecrypted_message(msg: pulsar.Message, i: int):
self.assertNotEqual(msg.data(), f"msg-{i}".encode())
self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
verify_encryption_context(msg.encryption_context(), True, 2 if i >= 3 else -1)

# Encrypted messages will be consumed since the crypto failure action is CONSUME
consumer = client.subscribe(topic, 'another-sub',
initial_position=InitialPosition.Earliest,
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
for i in range(3):
msg = consumer.receive(3000)
self.assertNotEqual(msg.data(), f"msg-{i}".encode())
self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
verify_undecrypted_message(msg, i)
Comment on lines 559 to +561
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop iterates only 3 times but there are 5 messages sent (msg-0, msg-1, msg-2, msg-3, msg-4). This should iterate 5 times using range(5) to verify all messages including the batched messages msg-3 and msg-4.

Copilot uses AI. Check for mistakes.

reader = client.create_reader(topic, MessageId.earliest,
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
for i in range(3):
msg = reader.read_next(3000)
self.assertNotEqual(msg.data(), f"msg-{i}".encode())
self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
verify_undecrypted_message(msg, i)
Comment on lines 565 to +567
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop iterates only 3 times but there are 5 messages sent (msg-0, msg-1, msg-2, msg-3, msg-4). This should iterate 5 times using range(5) to verify all messages including the batched messages msg-3 and msg-4.

Copilot uses AI. Check for mistakes.

client.close()

Expand Down
Loading