|
25 | 25 | import java.util.ArrayList;
|
26 | 26 | import java.util.EnumSet;
|
27 | 27 | import java.util.List;
|
| 28 | +import java.util.Random; |
28 | 29 | import java.util.UUID;
|
29 | 30 | import java.util.concurrent.CompletableFuture;
|
30 | 31 | import java.util.concurrent.CompletionException;
|
31 | 32 | import java.util.concurrent.ExecutorService;
|
32 | 33 | import java.util.concurrent.Executors;
|
| 34 | +import java.util.concurrent.Future; |
33 | 35 |
|
34 | 36 | import org.assertj.core.api.Assertions;
|
35 | 37 | import org.junit.Assume;
|
|
72 | 74 | import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
|
73 | 75 | import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
|
74 | 76 | import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
|
| 77 | +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE; |
75 | 78 | import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_MKDIR_OVERWRITE;
|
76 | 79 | import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
|
77 | 80 | import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
|
@@ -925,6 +928,96 @@ public void testCreateSubPath() throws Exception {
|
925 | 928 | intercept(IOException.class, () -> fs.create(new Path("a/b")));
|
926 | 929 | }
|
927 | 930 |
|
| 931 | + /** |
| 932 | + * Test create path in parallel with overwrite false. |
| 933 | + **/ |
| 934 | + @Test |
| 935 | + public void testParallelCreateOverwriteFalse() |
| 936 | + throws Exception { |
| 937 | + Configuration configuration = getRawConfiguration(); |
| 938 | + configuration.set(FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, "false"); |
| 939 | + try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( |
| 940 | + configuration)) { |
| 941 | + ExecutorService executorService = Executors.newFixedThreadPool(5); |
| 942 | + List<Future<?>> futures = new ArrayList<>(); |
| 943 | + |
| 944 | + final byte[] b = new byte[8 * ONE_MB]; |
| 945 | + new Random().nextBytes(b); |
| 946 | + final Path filePath = path("/testPath"); |
| 947 | + |
| 948 | + futures.add(executorService.submit(() -> { |
| 949 | + try { |
| 950 | + fs.create(filePath, false); |
| 951 | + } catch (IOException e) { |
| 952 | + throw new RuntimeException(e); |
| 953 | + } |
| 954 | + })); |
| 955 | + |
| 956 | + futures.add(executorService.submit(() -> { |
| 957 | + try { |
| 958 | + fs.create(filePath, false); |
| 959 | + } catch (IOException e) { |
| 960 | + throw new RuntimeException(e); |
| 961 | + } |
| 962 | + })); |
| 963 | + |
| 964 | + futures.add(executorService.submit(() -> { |
| 965 | + try { |
| 966 | + fs.create(filePath, false); |
| 967 | + } catch (IOException e) { |
| 968 | + throw new RuntimeException(e); |
| 969 | + } |
| 970 | + })); |
| 971 | + |
| 972 | + checkFuturesForExceptions(futures, 2); |
| 973 | + } |
| 974 | + } |
| 975 | + |
| 976 | + /** |
| 977 | + * Test create path in parallel with overwrite true. |
| 978 | + **/ |
| 979 | + @Test |
| 980 | + public void testParallelCreateOverwriteTrue() |
| 981 | + throws Exception { |
| 982 | + Configuration configuration = getRawConfiguration(); |
| 983 | + configuration.set(FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, "false"); |
| 984 | + try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( |
| 985 | + configuration)) { |
| 986 | + ExecutorService executorService = Executors.newFixedThreadPool(5); |
| 987 | + List<Future<?>> futures = new ArrayList<>(); |
| 988 | + |
| 989 | + final byte[] b = new byte[8 * ONE_MB]; |
| 990 | + new Random().nextBytes(b); |
| 991 | + final Path filePath = path("/testPath"); |
| 992 | + |
| 993 | + futures.add(executorService.submit(() -> { |
| 994 | + try { |
| 995 | + fs.create(filePath); |
| 996 | + } catch (IOException e) { |
| 997 | + throw new RuntimeException(e); |
| 998 | + } |
| 999 | + })); |
| 1000 | + |
| 1001 | + futures.add(executorService.submit(() -> { |
| 1002 | + try { |
| 1003 | + fs.create(filePath); |
| 1004 | + } catch (IOException e) { |
| 1005 | + throw new RuntimeException(e); |
| 1006 | + } |
| 1007 | + })); |
| 1008 | + |
| 1009 | + futures.add(executorService.submit(() -> { |
| 1010 | + try { |
| 1011 | + fs.create(filePath); |
| 1012 | + } catch (IOException e) { |
| 1013 | + throw new RuntimeException(e); |
| 1014 | + } |
| 1015 | + })); |
| 1016 | + |
| 1017 | + checkFuturesForExceptions(futures, 0); |
| 1018 | + } |
| 1019 | + } |
| 1020 | + |
928 | 1021 | /**
|
929 | 1022 | * Creating path with parent explicit.
|
930 | 1023 | */
|
|
0 commit comments