Skip to content

[To dev/1.3] Pipe: Add upper bound check for Pipe request decompression buffer (#15699) #15806

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: dev/1.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ public class CommonConfig {
private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
private double pipeReceiverActualToEstimatedMemoryRatio = 3;

private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB

private int pipeMaxAllowedHistoricalTsFilePerDataRegion = Integer.MAX_VALUE; // Deprecated
private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = Integer.MAX_VALUE; // Deprecated
private int pipeMaxAllowedPinnedMemTableCount = Integer.MAX_VALUE; // per data region
Expand Down Expand Up @@ -1477,6 +1479,22 @@ public double getPipeReceiverActualToEstimatedMemoryRatio() {
return pipeReceiverActualToEstimatedMemoryRatio;
}

public void setPipeReceiverReqDecompressedMaxLengthInBytes(
int pipeReceiverReqDecompressedMaxLengthInBytes) {
if (this.pipeReceiverReqDecompressedMaxLengthInBytes
== pipeReceiverReqDecompressedMaxLengthInBytes) {
return;
}
this.pipeReceiverReqDecompressedMaxLengthInBytes = pipeReceiverReqDecompressedMaxLengthInBytes;
logger.info(
"pipeReceiverReqDecompressedMaxLengthInBytes is set to {}.",
pipeReceiverReqDecompressedMaxLengthInBytes);
}

public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
return pipeReceiverReqDecompressedMaxLengthInBytes;
}

public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
return pipeMaxAllowedHistoricalTsFilePerDataRegion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ public double getPipeReceiverActualToEstimatedMemoryRatio() {
return COMMON_CONFIG.getPipeReceiverActualToEstimatedMemoryRatio();
}

public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes();
}

/////////////////////////////// Hybrid Mode ///////////////////////////////

public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
Expand Down Expand Up @@ -614,6 +618,9 @@ public void printAllConfigs() {
LOGGER.info(
"PipeReceiverActualToEstimatedMemoryRatio: {}",
getPipeReceiverActualToEstimatedMemoryRatio());
LOGGER.info(
"PipeReceiverReqDecompressedMaxLengthInBytes: {}",
getPipeReceiverReqDecompressedMaxLengthInBytes());

LOGGER.info(
"PipeMaxAllowedHistoricalTsFilePerDataRegion: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
properties.getProperty(
"pipe_receiver_actual_to_estimated_memory_ratio",
Double.toString(config.getPipeReceiverActualToEstimatedMemoryRatio()))));
config.setPipeReceiverReqDecompressedMaxLengthInBytes(
Integer.parseInt(
properties.getProperty(
"pipe_receiver_req_decompressed_max_length_in_bytes",
String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes()))));

config.setPipeMaxAllowedHistoricalTsFilePerDataRegion(
Integer.parseInt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;

import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor;
import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
Expand Down Expand Up @@ -91,6 +92,7 @@ public static TPipeTransferReq fromTPipeTransferReq(final TPipeTransferReq trans
compressors.add(
PipeCompressorFactory.getCompressor(ReadWriteIOUtils.readByte(compressedBuffer)));
uncompressedLengths.add(ReadWriteIOUtils.readInt(compressedBuffer));
checkDecompressedLength(uncompressedLengths.get(i));
}

byte[] body = new byte[compressedBuffer.remaining()];
Expand All @@ -110,6 +112,19 @@ public static TPipeTransferReq fromTPipeTransferReq(final TPipeTransferReq trans
return decompressedReq;
}

/** This method is used to prevent decompression bomb attacks. */
private static void checkDecompressedLength(final int decompressedLength)
throws IllegalArgumentException {
final int maxDecompressedLength =
PipeConfig.getInstance().getPipeReceiverReqDecompressedMaxLengthInBytes();
if (decompressedLength < 0 || decompressedLength > maxDecompressedLength) {
throw new IllegalArgumentException(
String.format(
"Decompressed length should be between 0 and %d, but got %d.",
maxDecompressedLength, decompressedLength));
}
}

/**
* For air-gap connectors. Generate the bytes of a compressed req from the bytes of original req.
*/
Expand Down
Loading