Skip to content

Commit 8fa7d83

Browse files
arunkumarchackocnauroth
authored andcommitted
HADOOP-19343. Add support for append(), compose(), concat()
Closes #7773 Signed-off-by: Chris Nauroth <[email protected]>
1 parent f5633ea commit 8fa7d83

12 files changed

+396
-184
lines changed

hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Constants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.hadoop.fs.gs;
2020

2121
final class Constants {
22+
static final int MAX_COMPOSE_OBJECTS = 32;
23+
2224
private Constants() {}
2325

2426
// URI scheme for GCS.

hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
* Options that can be specified when creating a file in the {@link GoogleCloudStorageFileSystem}.
3030
*/
3131
final class CreateFileOptions {
32+
static final CreateFileOptions DEFAULT = CreateFileOptions.builder().build();
3233
private final ImmutableMap<String, byte[]> attributes;
3334
private final String contentType;
3435
private final long overwriteGenerationId;
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.gs;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
24+
import com.google.cloud.storage.Blob;
25+
import com.google.cloud.storage.Storage;
26+
27+
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
28+
29+
final class GcsListOperation {
30+
private static final int ALL = 0;
31+
private final Storage.BlobListOption[] listOptions;
32+
private final String bucketName;
33+
private final Storage storage;
34+
private final int limit;
35+
36+
private GcsListOperation(Builder builder) {
37+
this.listOptions = builder.blobListOptions
38+
.toArray(new Storage.BlobListOption[builder.blobListOptions.size()]);
39+
this.bucketName = builder.bucket;
40+
this.storage = builder.storage;
41+
this.limit = builder.limit;
42+
}
43+
44+
public List<Blob> execute() {
45+
List<Blob> result = new ArrayList<>();
46+
for (Blob blob : storage.list(bucketName, listOptions).iterateAll()) {
47+
result.add(blob);
48+
49+
if (limit != ALL && result.size() >= limit) {
50+
break;
51+
}
52+
}
53+
54+
return result;
55+
}
56+
57+
static class Builder {
58+
private final ArrayList<Storage.BlobListOption> blobListOptions = new ArrayList<>();
59+
private String prefix;
60+
private final String bucket;
61+
private final Storage storage;
62+
private int limit = GcsListOperation.ALL;
63+
64+
Builder(final String bucketName, final String thePrefix, Storage storage) {
65+
this.storage = storage;
66+
this.bucket = bucketName;
67+
this.prefix = thePrefix;
68+
}
69+
70+
Builder forRecursiveListing() {
71+
return this;
72+
}
73+
74+
GcsListOperation build() {
75+
blobListOptions.add(Storage.BlobListOption.prefix(prefix));
76+
return new GcsListOperation(this);
77+
}
78+
79+
Builder forCurrentDirectoryListing() {
80+
blobListOptions.add(Storage.BlobListOption.currentDirectory());
81+
blobListOptions.add(Storage.BlobListOption.includeTrailingDelimiter());
82+
return this;
83+
}
84+
85+
Builder forCurrentDirectoryListingWithLimit(int theLimit) {
86+
checkArgument(
87+
theLimit > 0,
88+
"limit should be greater than 0. found %d; prefix=%s", theLimit, prefix);
89+
90+
this.limit = theLimit;
91+
prefix = StringPaths.toDirectoryPath(prefix);
92+
93+
blobListOptions.add(Storage.BlobListOption.pageSize(1));
94+
forCurrentDirectoryListing();
95+
return this;
96+
}
97+
}
98+
}

hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java

Lines changed: 107 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -288,49 +288,6 @@ private static GoogleCloudStorageItemInfo createItemInfoForBucket(StorageResourc
288288
bucket.getStorageClass() == null ? null : bucket.getStorageClass().name());
289289
}
290290

291-
List<GoogleCloudStorageItemInfo> listObjectInfo(
292-
String bucketName,
293-
String objectNamePrefix,
294-
ListObjectOptions listOptions) throws IOException {
295-
try {
296-
long maxResults = listOptions.getMaxResults() > 0 ?
297-
listOptions.getMaxResults() + (listOptions.isIncludePrefix() ? 0 : 1) :
298-
listOptions.getMaxResults();
299-
300-
Storage.BlobListOption[] blobListOptions =
301-
getBlobListOptions(objectNamePrefix, listOptions, maxResults);
302-
Page<Blob> blobs = storage.list(bucketName, blobListOptions);
303-
ListOperationResult result = new ListOperationResult(maxResults);
304-
for (Blob blob : blobs.iterateAll()) {
305-
result.add(blob);
306-
}
307-
308-
return result.getItems();
309-
} catch (StorageException e) {
310-
throw new IOException(
311-
String.format("listing object '%s' failed.", BlobId.of(bucketName, objectNamePrefix)),
312-
e);
313-
}
314-
}
315-
316-
private Storage.BlobListOption[] getBlobListOptions(
317-
String objectNamePrefix, ListObjectOptions listOptions, long maxResults) {
318-
List<Storage.BlobListOption> options = new ArrayList<>();
319-
320-
options.add(Storage.BlobListOption.fields(BLOB_FIELDS.toArray(new Storage.BlobField[0])));
321-
options.add(Storage.BlobListOption.prefix(objectNamePrefix));
322-
// TODO: set max results as a BlobListOption
323-
if ("/".equals(listOptions.getDelimiter())) {
324-
options.add(Storage.BlobListOption.currentDirectory());
325-
}
326-
327-
if (listOptions.getDelimiter() != null) {
328-
options.add(Storage.BlobListOption.includeTrailingDelimiter());
329-
}
330-
331-
return options.toArray(new Storage.BlobListOption[0]);
332-
}
333-
334291
private GoogleCloudStorageItemInfo createItemInfoForBlob(Blob blob) {
335292
long generationId = blob.getGeneration() == null ? 0L : blob.getGeneration();
336293
StorageResourceId resourceId =
@@ -403,8 +360,7 @@ void createEmptyObject(StorageResourceId resourceId, CreateObjectOptions options
403360
}
404361
}
405362

406-
407-
public GoogleCloudStorageItemInfo composeObjects(
363+
GoogleCloudStorageItemInfo composeObjects(
408364
List<StorageResourceId> sources, StorageResourceId destination, CreateObjectOptions options)
409365
throws IOException {
410366
LOG.trace("composeObjects({}, {}, {})", sources, destination, options);
@@ -538,13 +494,14 @@ List<GoogleCloudStorageItemInfo> listDirectoryRecursive(String bucketName, Strin
538494
// TODO: Take delimiter from config
539495
// TODO: Set specific fields
540496

497+
checkArgument(objectName.endsWith("/"), String.format("%s should end with /", objectName));
541498
try {
542-
Page<Blob> blobs = storage.list(
543-
bucketName,
544-
Storage.BlobListOption.prefix(objectName));
499+
List<Blob> blobs = new GcsListOperation.Builder(bucketName, objectName, storage)
500+
.forRecursiveListing().build()
501+
.execute();
545502

546503
List<GoogleCloudStorageItemInfo> result = new ArrayList<>();
547-
for (Blob blob : blobs.iterateAll()) {
504+
for (Blob blob : blobs) {
548505
result.add(createItemInfoForBlob(blob));
549506
}
550507

@@ -624,7 +581,7 @@ private List<Bucket> listBucketsInternal() throws IOException {
624581
return allBuckets;
625582
}
626583

627-
public SeekableByteChannel open(GoogleCloudStorageItemInfo itemInfo,
584+
SeekableByteChannel open(GoogleCloudStorageItemInfo itemInfo,
628585
GoogleHadoopFileSystemConfiguration config) throws IOException {
629586
LOG.trace("open({})", itemInfo);
630587
checkNotNull(itemInfo, "itemInfo should not be null");
@@ -647,7 +604,7 @@ private SeekableByteChannel open(
647604
config);
648605
}
649606

650-
public void move(Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap)
607+
void move(Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap)
651608
throws IOException {
652609
validateMoveArguments(sourceToDestinationObjectsMap);
653610

@@ -739,7 +696,7 @@ private Storage.MoveBlobRequest.Builder createMoveRequestBuilder(
739696
* Validates basic argument constraints like non-null, non-empty Strings, using {@code
740697
* Preconditions} in addition to checking for src/dst bucket equality.
741698
*/
742-
public static void validateMoveArguments(
699+
static void validateMoveArguments(
743700
Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap) throws IOException {
744701
checkNotNull(sourceToDestinationObjectsMap, "srcObjects must not be null");
745702

@@ -837,7 +794,7 @@ private void copyInternal(
837794
}
838795
}
839796

840-
public static void validateCopyArguments(
797+
static void validateCopyArguments(
841798
Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap,
842799
GoogleCloudStorage gcsImpl)
843800
throws IOException {
@@ -927,19 +884,110 @@ List<GoogleCloudStorageItemInfo> getItemInfos(List<StorageResourceId> resourceId
927884
return result;
928885
}
929886

887+
List<GoogleCloudStorageItemInfo> listDirectory(String bucketName, String objectNamePrefix)
888+
throws IOException {
889+
checkArgument(
890+
objectNamePrefix.endsWith("/"),
891+
String.format("%s should end with /", objectNamePrefix));
892+
893+
try {
894+
List<Blob> blobs = new GcsListOperation.Builder(bucketName, objectNamePrefix, storage)
895+
.forCurrentDirectoryListing().build()
896+
.execute();
897+
898+
ListOperationResult result = new ListOperationResult();
899+
for (Blob blob : blobs) {
900+
result.add(blob);
901+
}
902+
903+
return result.getItems();
904+
} catch (StorageException e) {
905+
throw new IOException(
906+
String.format("listing object '%s' failed.", BlobId.of(bucketName, objectNamePrefix)),
907+
e);
908+
}
909+
}
910+
911+
void compose(
912+
String bucketName, List<String> sources, String destination, String contentType)
913+
throws IOException {
914+
LOG.trace("compose({}, {}, {}, {})", bucketName, sources, destination, contentType);
915+
List<StorageResourceId> sourceIds =
916+
sources.stream()
917+
.map(objectName -> new StorageResourceId(bucketName, objectName))
918+
.collect(Collectors.toList());
919+
StorageResourceId destinationId = new StorageResourceId(bucketName, destination);
920+
CreateObjectOptions options =
921+
CreateObjectOptions.DEFAULT_OVERWRITE.toBuilder()
922+
.setContentType(contentType)
923+
.setEnsureEmptyObjectsMetadataMatch(false)
924+
.build();
925+
composeObjects(sourceIds, destinationId, options);
926+
}
927+
928+
/**
929+
* Get metadata for the given resourceId. The resourceId can be a file or a directory.
930+
*
931+
* For a resourceId gs://b/foo/a, it can be a file or a directory (gs:/b/foo/a/).
932+
* This method checks for both and return the one that is found. "NotFound" is returned
933+
* if not found.
934+
*/
935+
GoogleCloudStorageItemInfo getFileOrDirectoryInfo(StorageResourceId resourceId) {
936+
BlobId blobId = resourceId.toBlobId();
937+
if (resourceId.isDirectory()) {
938+
// Do not check for "file" for directory paths.
939+
Blob blob = storage.get(blobId);
940+
if (blob != null) {
941+
return createItemInfoForBlob(blob);
942+
}
943+
} else {
944+
BlobId dirId = resourceId.toDirectoryId().toBlobId();
945+
946+
// Check for both file and directory.
947+
List<Blob> blobs = storage.get(blobId, dirId);
948+
for (Blob blob : blobs) {
949+
if (blob != null) {
950+
return createItemInfoForBlob(blob);
951+
}
952+
}
953+
}
954+
955+
return GoogleCloudStorageItemInfo.createNotFound(resourceId);
956+
}
957+
958+
/**
959+
* Check if any "implicit" directory exists for the given resourceId.
960+
*
961+
* Note that GCS object store does not have a concept of directories for non-HNS buckets.
962+
* For e.g. one could create an object gs://bucket/foo/bar/a.txt, without creating the
963+
* parent directories (i.e. placeholder emtpy files ending with a /). In this case we might
964+
* want to treat gs://bucket/foo/ and gs://bucket/foo/bar/ as directories.
965+
*
966+
* This method helps check if a given resourceId (e.g. gs://bucket/foo/bar/) is an "implicit"
967+
* directory.
968+
*
969+
* Note that this will result in a list operation and is more expensive than "get metadata".
970+
*/
971+
GoogleCloudStorageItemInfo getImplicitDirectory(StorageResourceId resourceId) {
972+
List<Blob> blobs = new GcsListOperation
973+
.Builder(resourceId.getBucketName(), resourceId.getObjectName(), storage)
974+
.forCurrentDirectoryListingWithLimit(1).build()
975+
.execute();
976+
977+
if (blobs.isEmpty()) {
978+
return GoogleCloudStorageItemInfo.createNotFound(resourceId);
979+
}
980+
981+
return GoogleCloudStorageItemInfo.createInferredDirectory(resourceId.toDirectoryId());
982+
}
983+
930984
// Helper class to capture the results of list operation.
931985
private class ListOperationResult {
932986
private final Map<String, Blob> prefixes = new HashMap<>();
933987
private final List<Blob> objects = new ArrayList<>();
934988

935989
private final Set<String> objectsSet = new HashSet<>();
936990

937-
private final long maxResults;
938-
939-
ListOperationResult(long maxResults) {
940-
this.maxResults = maxResults;
941-
}
942-
943991
void add(Blob blob) {
944992
String path = blob.getBlobId().toGsUtilUri();
945993
if (blob.getGeneration() != null) {
@@ -957,17 +1005,9 @@ List<GoogleCloudStorageItemInfo> getItems() {
9571005

9581006
for (Blob blob : objects) {
9591007
result.add(createItemInfoForBlob(blob));
960-
961-
if (result.size() == maxResults) {
962-
return result;
963-
}
9641008
}
9651009

9661010
for (Blob blob : prefixes.values()) {
967-
if (result.size() == maxResults) {
968-
return result;
969-
}
970-
9711011
result.add(createItemInfoForBlob(blob));
9721012
}
9731013

0 commit comments

Comments
 (0)