Skip to content

Commit 73057ab

Browse files
authored
feat(aws-s3) Refactor the Download task to add multiple files download capability (#599)
related #296
1 parent 6955762 commit 73057ab

3 files changed

Lines changed: 377 additions & 22 deletions

File tree

src/main/java/io/kestra/plugin/aws/s3/Download.java

Lines changed: 160 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,21 @@
22

33
import io.kestra.core.models.annotations.Example;
44
import io.kestra.core.models.annotations.Plugin;
5-
import io.kestra.core.models.annotations.PluginProperty;
65
import io.kestra.core.models.property.Property;
76
import io.kestra.core.models.tasks.RunnableTask;
87
import io.kestra.core.runners.RunContext;
8+
import io.kestra.plugin.aws.s3.models.FileInfo;
9+
import io.kestra.plugin.aws.s3.models.S3Object;
910
import io.swagger.v3.oas.annotations.media.Schema;
10-
import jakarta.validation.constraints.NotNull;
1111
import lombok.*;
1212
import lombok.experimental.SuperBuilder;
1313
import org.apache.commons.lang3.tuple.Pair;
1414
import software.amazon.awssdk.services.s3.S3AsyncClient;
15-
import software.amazon.awssdk.services.s3.S3Client;
1615
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
1716
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
1817

1918
import java.net.URI;
19+
import java.util.HashMap;
2020
import java.util.Map;
2121

2222
@SuperBuilder
@@ -45,17 +45,25 @@
4545
}
4646
)
4747
@Schema(
48-
title = "Download a file from an S3 bucket."
48+
title = "Download a file(s) from an S3 bucket.",
49+
description = """
50+
This task can operate in two modes:
51+
1. Single file mode: When providing only the 'key' parameter, it downloads a specific file from S3
52+
2. Multiple files mode: When using filtering parameters (prefix, delimiter, regexp), it downloads multiple files matching the criteria
53+
54+
In single file mode, the output contains properties of a single file (uri, contentLength, etc.)
55+
In multiple files mode, the output contains maps that associate each file key with its properties (uris, contentLengths, etc.)"""
4956
)
5057
public class Download extends AbstractS3Object implements RunnableTask<Download.Output> {
5158
@Schema(
52-
title = "The key of a file to download."
59+
title = "The key of a file to download.",
60+
description = "When specified without filtering options (prefix, delimiter, regexp), the task will download a single file."
5361
)
54-
@NotNull
5562
private Property<String> key;
5663

5764
@Schema(
58-
title = "The specific version of the object."
65+
title = "The specific version of the object.",
66+
description = "Only applicable when downloading a single file with the key parameter."
5967
)
6068
protected Property<String> versionId;
6169

@@ -65,28 +73,73 @@ public class Download extends AbstractS3Object implements RunnableTask<Download.
6573
@Builder.Default
6674
private Property<Boolean> compatibilityMode = Property.of(false);
6775

76+
@Schema(
77+
title = "The prefix of files to download.",
78+
description = "When specified, the task switches to multiple files mode and downloads all files with keys starting with this prefix."
79+
)
80+
private Property<String> prefix;
81+
82+
@Schema(
83+
title = "A character used to group keys.",
84+
description = "When specified, the task switches to multiple files mode. The API returns all keys that share a common prefix up to the delimiter."
85+
)
86+
private Property<String> delimiter;
87+
88+
@Schema(
89+
title = "Used for pagination in multiple files mode.",
90+
description = "This is the key at which a previous listing ended."
91+
)
92+
private Property<String> marker;
93+
94+
@Schema(
95+
title = "The maximum number of keys to include in the response in multiple files mode."
96+
)
97+
@Builder.Default
98+
private Property<Integer> maxKeys = Property.of(1000);
99+
100+
@Schema(
101+
title = "A regular expression to filter the keys of the objects to download.",
102+
description = "When specified, the task switches to multiple files mode and only downloads files matching the pattern."
103+
)
104+
protected Property<String> regexp;
105+
106+
@Schema(
107+
title = "The account ID of the expected bucket owner.",
108+
description = "Requests will fail with a Forbidden error (access denied) if the bucket is owned by a different account."
109+
)
110+
private Property<String> expectedBucketOwner;
111+
112+
68113
@Override
69114
public Output run(RunContext runContext) throws Exception {
70115
String bucket = runContext.render(this.bucket).as(String.class).orElseThrow();
71-
String key = runContext.render(this.key).as(String.class).orElseThrow();
72116

73-
try (S3AsyncClient client = this.asyncClient(runContext)) {
74-
GetObjectRequest.Builder builder = GetObjectRequest.builder()
75-
.bucket(bucket)
76-
.key(key);
117+
if (isSingleFileMode()) {
118+
return downloadSingleFile(runContext, bucket);
119+
} else if (isValidMultipleFilesMode()) {
120+
return downloadMultipleFiles(runContext, bucket);
121+
} else {
122+
throw new IllegalArgumentException("Invalid configuration: either specify 'key' for single file download or at least one filtering parameter (prefix, delimiter, regexp) for multiple files download");
123+
}
124+
}
77125

78-
if (this.versionId != null) {
79-
builder.versionId(runContext.render(this.versionId).as(String.class).orElseThrow());
80-
}
126+
private boolean isValidMultipleFilesMode() {
127+
return this.prefix != null || this.delimiter != null || this.regexp != null;
128+
}
81129

82-
if (this.requestPayer != null) {
83-
builder.requestPayer(runContext.render(this.requestPayer).as(String.class).orElseThrow());
84-
}
130+
private boolean isSingleFileMode() {
131+
return this.key != null &&
132+
(this.prefix == null && this.delimiter == null && this.regexp == null);
133+
}
134+
135+
private Output downloadSingleFile(RunContext runContext, String bucket) throws Exception {
136+
String key = runContext.render(this.key).as(String.class).orElseThrow();
85137

86-
Pair<GetObjectResponse, URI> download = S3Service.download(runContext, client, builder.build());
138+
try (S3AsyncClient client = this.asyncClient(runContext)) {
139+
GetObjectRequest request = buildGetObjectRequest(runContext, bucket, key);
140+
Pair<GetObjectResponse, URI> download = S3Service.download(runContext, client, request);
87141

88-
return Output
89-
.builder()
142+
return Output.builder()
90143
.uri(download.getRight())
91144
.eTag(download.getLeft().eTag())
92145
.contentLength(download.getLeft().contentLength())
@@ -97,6 +150,85 @@ public Output run(RunContext runContext) throws Exception {
97150
}
98151
}
99152

153+
private GetObjectRequest buildGetObjectRequest(RunContext runContext, String bucket, String key) throws Exception {
154+
GetObjectRequest.Builder builder = GetObjectRequest.builder()
155+
.bucket(bucket)
156+
.key(key);
157+
158+
if (this.versionId != null) {
159+
builder.versionId(runContext.render(this.versionId).as(String.class).orElseThrow());
160+
}
161+
162+
if (this.requestPayer != null) {
163+
builder.requestPayer(runContext.render(this.requestPayer).as(String.class).orElseThrow());
164+
}
165+
166+
if (this.expectedBucketOwner != null) {
167+
builder.expectedBucketOwner(runContext.render(this.expectedBucketOwner).as(String.class).orElseThrow());
168+
}
169+
170+
return builder.build();
171+
}
172+
173+
private Output downloadMultipleFiles(RunContext runContext, String bucket) throws Exception {
174+
List.Output listResult = getObjectsList(runContext);
175+
176+
if (listResult.getObjects().isEmpty()) {
177+
runContext.logger().warn("No objects found matching the filter criteria");
178+
}
179+
180+
Map<String, FileInfo> files = new HashMap<>();
181+
182+
try (S3AsyncClient client = this.asyncClient(runContext)) {
183+
for (S3Object object : listResult.getObjects()) {
184+
GetObjectRequest request = buildGetObjectRequest(runContext, bucket, object.getKey());
185+
Pair<GetObjectResponse, URI> download = S3Service.download(runContext, client, request);
186+
187+
String key = object.getKey();
188+
files.put(key, FileInfo.builder()
189+
.uri(download.getRight())
190+
.contentLength(download.getLeft().contentLength())
191+
.contentType(download.getLeft().contentType())
192+
.metadata(download.getLeft().metadata())
193+
.eTag(download.getLeft().eTag())
194+
.versionId(download.getLeft().versionId())
195+
.build());
196+
}
197+
198+
return Output.builder()
199+
.files(files)
200+
.build();
201+
}
202+
}
203+
204+
private List.Output getObjectsList(RunContext runContext) throws Exception {
205+
List listTask = List.builder()
206+
.id(this.id)
207+
.type(List.class.getName())
208+
.region(this.region)
209+
.endpointOverride(this.endpointOverride)
210+
.accessKeyId(this.accessKeyId)
211+
.secretKeyId(this.secretKeyId)
212+
.sessionToken(this.sessionToken)
213+
.requestPayer(this.requestPayer)
214+
.bucket(this.bucket)
215+
.prefix(this.prefix)
216+
.delimiter(this.delimiter)
217+
.marker(this.marker)
218+
.maxKeys(this.maxKeys)
219+
.expectedBucketOwner(this.expectedBucketOwner)
220+
.regexp(this.regexp)
221+
.filter(Property.of(ListInterface.Filter.FILES))
222+
.stsRoleArn(this.stsRoleArn)
223+
.stsRoleSessionName(this.stsRoleSessionName)
224+
.stsRoleExternalId(this.stsRoleExternalId)
225+
.stsRoleSessionDuration(this.stsRoleSessionDuration)
226+
.stsEndpointOverride(this.stsEndpointOverride)
227+
.build();
228+
229+
return listTask.run(runContext);
230+
}
231+
100232
@SuperBuilder
101233
@Getter
102234
public static class Output extends ObjectOutput implements io.kestra.core.models.tasks.Output {
@@ -116,5 +248,11 @@ public static class Output extends ObjectOutput implements io.kestra.core.models
116248
title = "A map of metadata to store with the object in S3."
117249
)
118250
private final Map<String, String> metadata;
251+
252+
@Schema(
253+
title = "Map of object keys to their complete file information (multiple files mode only)"
254+
)
255+
private final Map<String, FileInfo> files;
256+
119257
}
120-
}
258+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.kestra.plugin.aws.s3.models;
2+
3+
import io.swagger.v3.oas.annotations.media.Schema;
4+
import lombok.Builder;
5+
import lombok.Getter;
6+
7+
import java.net.URI;
8+
import java.util.Map;
9+
10+
@Builder
11+
@Getter
12+
public class FileInfo {
13+
@Schema(
14+
title = "The URI of the downloaded file in Kestra's storage"
15+
)
16+
private URI uri;
17+
18+
@Schema(
19+
title = "The size of the file in bytes"
20+
)
21+
private Long contentLength;
22+
23+
@Schema(
24+
title = "The MIME type of the file"
25+
)
26+
private String contentType;
27+
28+
@Schema(
29+
title = "The metadata of the file"
30+
)
31+
private Map<String, String> metadata;
32+
33+
@Schema(
34+
title = "The version ID of the file"
35+
)
36+
private String versionId;
37+
38+
@Schema(
39+
title = "An ETag is an opaque identifier assigned by a web server to a specific version of a resource found at a URL."
40+
)
41+
private String eTag;
42+
}

0 commit comments

Comments
 (0)