Skip to content

Commit c293424

Browse files
HADOOP-19497: [ABFS] Enable rename and create recovery from client transaction id over DFS endpoint (#7509) (#7612)
Contributed by Manish Bhatt. Reviewed by Anmol Asrani, Anuj Modi, Manika Joshi. Signed off by: Anuj Modi<[email protected]>
1 parent 8d353d5 commit c293424

File tree

9 files changed

+332
-171
lines changed

9 files changed

+332
-171
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,13 +199,10 @@ public String toString() {
199199
}
200200

201201
public static ApiVersion getCurrentVersion() {
202-
return DEC_12_2019;
202+
return NOV_04_2024;
203203
}
204204
}
205205

206-
@Deprecated
207-
public static final String DECEMBER_2019_API_VERSION = ApiVersion.DEC_12_2019.toString();
208-
209206
/**
210207
* List of Constants Used by Blob Endpoint Rest APIs.
211208
*/

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ public final class FileSystemConfigurations {
198198

199199
public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;
200200

201-
public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = false;
201+
public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = true;
202202

203203
private FileSystemConfigurations() {}
204204
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,6 @@ private AbfsClient(final URL baseUrl,
218218
this.encryptionContextProvider = encryptionContextProvider;
219219
// Version update needed to support x-ms-encryption-context header
220220
// @link https://learn.microsoft.com/en-us/rest/api/storageservices/put-block?tabs=microsoft-entra-id}
221-
xMsVersion = ApiVersion.AUG_03_2023; // will be default once server change deployed
222221
encryptionType = EncryptionType.ENCRYPTION_CONTEXT;
223222
} else if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) {
224223
clientProvidedEncryptionKey =
@@ -228,11 +227,6 @@ private AbfsClient(final URL baseUrl,
228227
encryptionType = EncryptionType.GLOBAL_KEY;
229228
}
230229

231-
// Version update needed to support x-ms-client-transaction-id header
232-
if (abfsConfiguration.getIsClientTransactionIdEnabled()) {
233-
xMsVersion = ApiVersion.NOV_04_2024;
234-
}
235-
236230
String sslProviderName = null;
237231

238232
if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java

Lines changed: 301 additions & 144 deletions
Large diffs are not rendered by default.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public final class AbfsErrors {
7272
public static final String ERR_CREATE_RECOVERY =
7373
"Error while recovering from create failure.";
7474
public static final String ERR_RENAME_RECOVERY =
75-
"Error while recovering from rename failure.";
75+
"Error while recovering from rename failure for path: ";
7676
public static final String ERR_BLOB_LIST_PARSING = "Parsing of XML List Response Failed in BlobClient.";
7777
public static final String ERR_DFS_LIST_PARSING = "Parsing of Json List Response Failed in DfsClient.";
7878
public static final String INCORRECT_INGRESS_TYPE = "Ingress Type Cannot be DFS for Blob endpoint configured filesystem.";

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2289,6 +2289,6 @@ public void answer(final AbfsRestOperation mockedObj,
22892289
null, op);
22902290
}
22912291
}
2292-
});
2292+
}, 0);
22932293
}
22942294
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2673,12 +2673,13 @@ public void testFailureInGetPathStatusDuringRenameRecovery() throws Exception {
26732673
final String[] clientTransactionId = new String[1];
26742674
mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
26752675
mockRetriedRequest(abfsDfsClient, new ArrayList<>());
2676-
boolean[] flag = new boolean[1];
2676+
int[] flag = new int[1];
26772677
Mockito.doAnswer(getPathStatus -> {
2678-
if (!flag[0]) {
2679-
flag[0] = true;
2678+
if (flag[0] == 1) {
2679+
flag[0] += 1;
26802680
throw new AbfsRestOperationException(HTTP_CLIENT_TIMEOUT, "", "", new Exception());
26812681
}
2682+
flag[0] += 1;
26822683
return getPathStatus.callRealMethod();
26832684
}).when(abfsDfsClient).getPathStatus(
26842685
Mockito.nullable(String.class), Mockito.nullable(Boolean.class),
@@ -2737,6 +2738,6 @@ public void answer(final AbfsRestOperation mockedObj,
27372738
SOURCE_PATH_NOT_FOUND.getErrorCode(), EMPTY_STRING, null, op);
27382739
}
27392740
}
2740-
});
2741+
}, 1);
27412742
}
27422743
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,11 @@ private boolean isThreadRunning(String threadName) {
152152
* @throws Exception if an error occurs while mocking the operation creation
153153
*/
154154
public static void mockAbfsOperationCreation(final AbfsClient abfsClient,
155-
final MockIntercept mockIntercept) throws Exception {
156-
boolean[] flag = new boolean[1];
155+
final MockIntercept mockIntercept, int failedCall) throws Exception {
156+
int[] flag = new int[1];
157157
Mockito.doAnswer(answer -> {
158-
if (!flag[0]) {
159-
flag[0] = true;
158+
if (flag[0] == failedCall) {
159+
flag[0] += 1;
160160
AbfsRestOperation op = Mockito.spy(
161161
new AbfsRestOperation(
162162
answer.getArgument(0),
@@ -174,6 +174,7 @@ public static void mockAbfsOperationCreation(final AbfsClient abfsClient,
174174
Mockito.doReturn(true).when(op).isARetriedRequest();
175175
return op;
176176
}
177+
flag[0] += 1;
177178
return answer.callRealMethod();
178179
}).when(abfsClient)
179180
.getAbfsRestOperation(any(), any(), any(), any());

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
4545
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
4646
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
47+
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
4748
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
4849
import org.apache.hadoop.fs.statistics.IOStatistics;
4950

@@ -58,7 +59,9 @@
5859
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
5960
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic;
6061
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
62+
import static org.mockito.ArgumentMatchers.anyBoolean;
6163
import static org.mockito.ArgumentMatchers.anyList;
64+
import static org.mockito.ArgumentMatchers.anyString;
6265
import static org.mockito.Mockito.doReturn;
6366
import static org.mockito.Mockito.mock;
6467
import static org.mockito.Mockito.times;
@@ -180,6 +183,12 @@ AbfsClient getMockAbfsClient() throws IOException {
180183
.when(spyClient)
181184
.createRenameRestOperation(Mockito.any(URL.class), anyList());
182185

186+
Mockito.doCallRealMethod()
187+
.when(spyClient)
188+
.getPathStatus(anyString(), anyBoolean(),
189+
Mockito.any(TracingContext.class),
190+
Mockito.any(ContextEncryptionAdapter.class));
191+
183192
return spyClient;
184193

185194
}
@@ -275,9 +284,14 @@ public void testRenameRecoveryEtagMatchFsLevel() throws IOException {
275284
// 4 calls should have happened in total for rename
276285
// 1 -> original rename rest call, 2 -> first retry,
277286
// +2 for getPathStatus calls
287+
int totalConnections = 4;
288+
if (!getConfiguration().getIsClientTransactionIdEnabled()) {
289+
// 1 additional getPathStatus call to get dest etag
290+
totalConnections++;
291+
}
278292
assertThatStatisticCounter(ioStats,
279293
CONNECTIONS_MADE.getStatName())
280-
.isEqualTo(5 + connMadeBeforeRename);
294+
.isEqualTo(totalConnections + connMadeBeforeRename);
281295
// the RENAME_PATH_ATTEMPTS stat should be incremented by 1
282296
// retries happen internally within AbfsRestOperation execute()
283297
// the stat for RENAME_PATH_ATTEMPTS is updated only once before execute() is called
@@ -350,21 +364,18 @@ public void testRenameRecoveryFailsForDirFsLevel() throws Exception {
350364
if (getConfiguration().getIsClientTransactionIdEnabled()) {
351365
// Recovery based on client transaction id should be successful
352366
assertTrue(renameResult);
353-
// One extra getPathStatus call should have happened
354-
newConnections = 5;
355367
} else {
356368
assertFalse(renameResult);
357-
newConnections = 4;
358369
}
359370

360371
// validating stat counters after rename
361-
// 3 calls should have happened in total for rename
372+
// 4 calls should have happened in total for rename
362373
// 1 -> original rename rest call, 2 -> first retry,
363374
// +1 for getPathStatus calls
364375
// last getPathStatus call should be skipped
365376
assertThatStatisticCounter(ioStats,
366377
CONNECTIONS_MADE.getStatName())
367-
.isEqualTo(newConnections + connMadeBeforeRename);
378+
.isEqualTo(4 + connMadeBeforeRename);
368379

369380
// the RENAME_PATH_ATTEMPTS stat should be incremented by 1
370381
// retries happen internally within AbfsRestOperation execute()

0 commit comments

Comments
 (0)