@@ -25,19 +25,20 @@ function make_dummy_object_sdk(nsfs_config, uid, gid) {
25
25
} ;
26
26
}
27
27
28
// Backing directory for the versioned bucket used throughout this suite.
const tmp_fs_path = path.join(TMP_PATH, 'test_versioning_concurrency');

// Shared NamespaceFS instance under test — versioning is enabled so that
// concurrent puts/deletes create versions and delete markers.
const nsfs_options = {
    bucket_path: tmp_fs_path,
    bucket_id: '1',
    namespace_resource_id: undefined,
    access_mode: undefined,
    versioning: 'ENABLED',
    force_md5_etag: false,
    stats: endpoint_stats_collector.instance(),
};
const nsfs = new NamespaceFS(nsfs_options);
28
40
const DUMMY_OBJECT_SDK = make_dummy_object_sdk ( true ) ;
29
41
describe ( 'test versioning concurrency' , ( ) => {
30
- const tmp_fs_path = path . join ( TMP_PATH , 'test_versioning_concurrency' ) ;
31
-
32
- const nsfs = new NamespaceFS ( {
33
- bucket_path : tmp_fs_path ,
34
- bucket_id : '1' ,
35
- namespace_resource_id : undefined ,
36
- access_mode : undefined ,
37
- versioning : 'ENABLED' ,
38
- force_md5_etag : false ,
39
- stats : endpoint_stats_collector . instance ( ) ,
40
- } ) ;
41
42
42
43
beforeEach ( async ( ) => {
43
44
await fs_utils . create_fresh_path ( tmp_fs_path ) ;
@@ -63,14 +64,9 @@ describe('test versioning concurrency', () => {
63
64
it ( 'multiple delete version id and key' , async ( ) => {
64
65
const bucket = 'bucket1' ;
65
66
const key = 'key2' ;
66
- const versions_arr = [ ] ;
67
- // upload 5 versions of key2
68
- for ( let i = 0 ; i < 5 ; i ++ ) {
69
- const random_data = Buffer . from ( String ( i ) ) ;
70
- const body = buffer_utils . buffer_to_read_stream ( random_data ) ;
71
- const res = await nsfs . upload_object ( { bucket : bucket , key : key , source_stream : body } , DUMMY_OBJECT_SDK ) . catch ( err => console . log ( 'put error - ' , err ) ) ;
72
- versions_arr . push ( res . etag ) ;
73
- }
67
+ const number_of_versions = 5 ;
68
+ const versions_arr = await _upload_versions ( bucket , key , number_of_versions ) ;
69
+
74
70
const mid_version_id = versions_arr [ 3 ] ;
75
71
const number_of_successful_operations = [ ] ;
76
72
for ( let i = 0 ; i < 15 ; i ++ ) {
@@ -81,4 +77,68 @@ describe('test versioning concurrency', () => {
81
77
await P . delay ( 1000 ) ;
82
78
expect ( number_of_successful_operations . length ) . toBe ( 15 ) ;
83
79
} ) ;
80
+
81
+ it ( 'concurrent delete of latest version' , async ( ) => {
82
+ const bucket = 'bucket1' ;
83
+ const key = 'key3' ;
84
+ const number_of_versions = 5 ;
85
+ const versions_arr = await _upload_versions ( bucket , key , number_of_versions ) ;
86
+ expect ( versions_arr . length ) . toBe ( number_of_versions ) ;
87
+
88
+ const successful_operations = [ ] ;
89
+ for ( let i = 0 ; i < 3 ; i ++ ) {
90
+ nsfs . delete_object ( { bucket : bucket , key : key } , DUMMY_OBJECT_SDK )
91
+ . then ( res => successful_operations . push ( res ) )
92
+ . catch ( err => console . log ( 'delete latest version error - ' , err ) ) ;
93
+ }
94
+
95
+ await P . delay ( 1000 ) ;
96
+ expect ( successful_operations . length ) . toBe ( 3 ) ;
97
+ const versions = await nsfs . list_object_versions ( { bucket : bucket } , DUMMY_OBJECT_SDK ) ;
98
+ expect ( versions . objects . length ) . toBe ( 8 ) ; // 5 versions before + 3 delete markers concurrent
99
+ const delete_marker_arr = versions . objects . filter ( object => object . delete_marker === true ) ;
100
+ expect ( delete_marker_arr . length ) . toBe ( 3 ) ;
101
+ } ) ;
102
+
103
+ it ( 'concurrent put object and head object latest version' , async ( ) => {
104
+ const bucket = 'bucket1' ;
105
+ const key = 'key4' ;
106
+ await _upload_versions ( bucket , key , 1 ) ;
107
+
108
+ const successful_operations = [ ] ;
109
+ const number_of_iterations = 5 ; // by changing it to 10 it sometimes fails
110
+ for ( let i = 0 ; i < number_of_iterations ; i ++ ) {
111
+ const random_data = Buffer . from ( String ( i ) ) ;
112
+ const body = buffer_utils . buffer_to_read_stream ( random_data ) ;
113
+ nsfs . upload_object ( { bucket : bucket , key : key , source_stream : body } , DUMMY_OBJECT_SDK )
114
+ . then ( res => successful_operations . push ( res ) )
115
+ . catch ( err => console . log ( 'multiple puts of the same key error - ' , err ) ) ;
116
+ nsfs . read_object_md ( { bucket : bucket , key : key } , DUMMY_OBJECT_SDK )
117
+ . then ( res => successful_operations . push ( res ) )
118
+ . catch ( err => console . log ( 'multiple heads of the same key error - ' , err ) ) ;
119
+ }
120
+ await P . delay ( 1000 ) ;
121
+ const expected_number_of_successful_operations = number_of_iterations * 2 ;
122
+ expect ( successful_operations . length ) . toBe ( expected_number_of_successful_operations ) ;
123
+ const versions = await nsfs . list_object_versions ( { bucket : bucket } , DUMMY_OBJECT_SDK ) ;
124
+ expect ( versions . objects . length ) . toBe ( number_of_iterations + 1 ) ; // 1 version before + 10 versions concurrent
125
+ } ) ;
84
126
} ) ;
127
/**
 * _upload_versions uploads number_of_versions of key in bucket with a body of random data
 * note: this function is not concurrent, it's a helper function for preparing a bucket with a couple of versions
 * @param {string} bucket
 * @param {string} key
 * @param {number} number_of_versions
 * @returns {Promise<string[]>} etags of the uploaded versions, in upload order
 */
async function _upload_versions(bucket, key, number_of_versions) {
    const versions_arr = [];
    for (let i = 0; i < number_of_versions; i++) {
        const random_data = Buffer.from(String(i));
        const body = buffer_utils.buffer_to_read_stream(random_data);
        // let an upload failure propagate: the previous catch-and-log left
        // `res` undefined and the next line crashed with a TypeError on
        // `res.etag`, hiding the real error from the failing test
        const res = await nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK);
        versions_arr.push(res.etag);
    }
    return versions_arr;
}
0 commit comments