@@ -2032,6 +2032,171 @@ def test_auto_prefix_trimming(self, with_failures, source_cluster_spec):
20322032 with self .producer_consumer (topic = topic .name , msg_size = 128 , msg_cnt = 100000 ):
20332033 self ._perform_auto_prefix_trimming (topic .name , partition_count )
20342034
2035+ @cluster (num_nodes = 7 )
2036+ @ignore (
2037+ with_failures = True ,
2038+ source_cluster_spec = SecondaryClusterSpec (
2039+ ServiceType .KAFKA , kafka_version = "3.8.0" , kafka_quorum = "COMBINED_KRAFT"
2040+ ),
2041+ )
2042+ @matrix (
2043+ with_failures = [True , False ],
2044+ source_cluster_spec = [
2045+ SecondaryClusterSpec (ServiceType .REDPANDA ),
2046+ SecondaryClusterSpec (
2047+ ServiceType .KAFKA , kafka_version = "3.8.0" , kafka_quorum = "COMBINED_KRAFT"
2048+ ),
2049+ ],
2050+ )
2051+ def test_start_offset_catch_up (self , with_failures , source_cluster_spec ):
2052+ """
2053+ Test that verifies shadow link can catch up to a source topic that has been
2054+ prefix-trimmed to its HWM (i.e., all data has been trimmed).
2055+
2056+ 1. Create a source topic with 5 partitions
2057+ 2. Write data to the topic across all partitions
2058+ 3. Trim the prefix of each partition of the source topic to the partition's HWM
2059+ 4. Create a new Shadow Link on the Shadow Cluster
2060+ 5. Wait for the shadow topic to be created on the Shadow Cluster
2061+ 6. Verify that the start offset and HWM of all shadow partitions match the source partitions
2062+ 7. Write data to the source partitions
2063+ 8. Verify that the shadow partitions replicate that data
2064+ """
2065+ partition_count = 5
2066+ topic = TopicSpec (
2067+ name = "source-topic" , partition_count = partition_count , replication_factor = 3
2068+ )
2069+ self .source_default_client ().create_topic (topic )
2070+
2071+ # Step 2: Write data to the topic across all partitions
2072+ initial_msg_count = 1000
2073+ KgoVerifierProducer .oneshot (
2074+ self .test_context ,
2075+ self .source_cluster .service ,
2076+ topic = topic .name ,
2077+ msg_size = 128 ,
2078+ msg_count = initial_msg_count ,
2079+ custom_node = self .preallocated_nodes ,
2080+ )
2081+
2082+ # Wait for all messages to be written (sum of HWMs across all partitions should equal msg_count)
2083+ def all_messages_written ():
2084+ total_hwm = 0
2085+ for part in self .source_cluster_rpk .describe_topic (topic .name ):
2086+ total_hwm += part .high_watermark or 0
2087+ return total_hwm >= initial_msg_count
2088+
2089+ self .source_cluster .service .wait_until (
2090+ all_messages_written ,
2091+ timeout_sec = 30 ,
2092+ backoff_sec = 1 ,
2093+ err_msg = f"Timed out waiting for { initial_msg_count } messages to be written" ,
2094+ )
2095+
2096+ # Step 3: Trim the prefix of each partition to its HWM
2097+ # First, collect the HWM for each partition
2098+ source_hwms : dict [int , int ] = {}
2099+ for part in self .source_cluster_rpk .describe_topic (topic .name ):
2100+ source_hwms [part .id ] = part .high_watermark
2101+ self .logger .info (f"Source partition { part .id } : HWM={ part .high_watermark } " )
2102+
2103+ # Trim each partition to its HWM
2104+ for part_id , hwm in source_hwms .items ():
2105+ self .logger .info (f"Trimming partition { part_id } to offset { hwm } " )
2106+ self .source_cluster_rpk .trim_prefix (
2107+ topic = topic .name , offset = hwm , partitions = [part_id ]
2108+ )
2109+
2110+ # Wait for the trim to take effect on all partitions
2111+ def all_partitions_trimmed ():
2112+ for part in self .source_cluster_rpk .describe_topic (topic .name ):
2113+ expected_offset = source_hwms [part .id ]
2114+ if (part .start_offset or 0 ) != expected_offset :
2115+ self .logger .debug (
2116+ f"Partition { part .id } : start_offset={ part .start_offset } , expected={ expected_offset } "
2117+ )
2118+ return False
2119+ return True
2120+
2121+ self .source_cluster .service .wait_until (
2122+ all_partitions_trimmed ,
2123+ timeout_sec = 30 ,
2124+ backoff_sec = 1 ,
2125+ err_msg = "Timed out waiting for prefix trim to take effect" ,
2126+ )
2127+
2128+ # Step 4: Create a new Shadow Link on the Shadow Cluster
2129+ with self ._maybe_failure_injector (with_failures ):
2130+ self .create_link ("test-link" )
2131+
2132+ # Step 5: Wait for the shadow topic to be created on the Shadow Cluster
2133+ self .target_cluster .service .wait_until (
2134+ lambda : self .topic_partitions_exists_in_target (topic ),
2135+ timeout_sec = 30 ,
2136+ backoff_sec = 1 ,
2137+ err_msg = f"Topic { topic .name } not found in target cluster" ,
2138+ )
2139+
2140+ # Step 6: Verify that the start offset and HWM of all shadow partitions match the source partitions
2141+ def shadow_partitions_match_source ():
2142+ target_parts = {
2143+ p .id : p for p in self .target_cluster_rpk .describe_topic (topic .name )
2144+ }
2145+ source_parts = {
2146+ p .id : p for p in self .source_cluster_rpk .describe_topic (topic .name )
2147+ }
2148+
2149+ if len (target_parts ) != partition_count :
2150+ self .logger .debug (
2151+ f"Target partition count mismatch: { len (target_parts )} != { partition_count } "
2152+ )
2153+ return False
2154+
2155+ for part_id in range (partition_count ):
2156+ if part_id not in target_parts or part_id not in source_parts :
2157+ return False
2158+
2159+ target_part = target_parts [part_id ]
2160+ source_part = source_parts [part_id ]
2161+
2162+ # Start offset should match
2163+ if target_part .start_offset != source_part .start_offset :
2164+ self .logger .debug (
2165+ f"Partition { part_id } : target start_offset={ target_part .start_offset } , "
2166+ f"source start_offset={ source_part .start_offset } "
2167+ )
2168+ return False
2169+
2170+ # HWM should match (both should be equal to start_offset since topic was trimmed to HWM)
2171+ if target_part .high_watermark != source_part .high_watermark :
2172+ self .logger .debug (
2173+ f"Partition { part_id } : target HWM={ target_part .high_watermark } , "
2174+ f"source HWM={ source_part .high_watermark } "
2175+ )
2176+ return False
2177+
2178+ return True
2179+
2180+ self .target_cluster .service .wait_until (
2181+ shadow_partitions_match_source ,
2182+ timeout_sec = 60 ,
2183+ backoff_sec = 1 ,
2184+ err_msg = "Shadow partitions do not match source partitions after prefix trim" ,
2185+ )
2186+
2187+ # Log the final state after matching
2188+ self .logger .info (
2189+ "Shadow partitions match source partitions after prefix trim:"
2190+ )
2191+ for part in self .target_cluster_rpk .describe_topic (topic .name ):
2192+ self .logger .info (
2193+ f" Partition { part .id } : start_offset={ part .start_offset } , HWM={ part .high_watermark } "
2194+ )
2195+
2196+ # Step 7 & 8: Write data to the source partitions and verify replication
2197+ with self .producer_consumer (topic = topic .name , msg_size = 128 , msg_cnt = 10000 ):
2198+ self .verify ()
2199+
20352200 @cluster (num_nodes = 7 )
20362201 @matrix (
20372202 timestamp_type = [
0 commit comments