@@ -483,7 +483,7 @@ class NamespaceFS {
483
483
}
484
484
485
485
/**
486
- * @param {nb.ObjectSDK } object_sdk
486
+ * @param {nb.ObjectSDK } object_sdk
487
487
* @returns {nb.NativeFSContext }
488
488
*/
489
489
prepare_fs_context ( object_sdk ) {
@@ -1090,7 +1090,7 @@ class NamespaceFS {
1090
1090
// end the stream
1091
1091
res . end ( ) ;
1092
1092
1093
- await stream_utils . wait_finished ( res , { signal : object_sdk . abort_controller . signal } ) ;
1093
+ await stream_utils . wait_finished ( res , { readable : false , signal : object_sdk . abort_controller . signal } ) ;
1094
1094
object_sdk . throw_if_aborted ( ) ;
1095
1095
1096
1096
dbg . log0 ( 'NamespaceFS: read_object_stream completed file' , file_path , {
@@ -1209,9 +1209,7 @@ class NamespaceFS {
1209
1209
}
1210
1210
1211
1211
if ( copy_res ) {
1212
- if ( copy_res === copy_status_enum . FALLBACK ) {
1213
- params . copy_source . nsfs_copy_fallback ( ) ;
1214
- } else {
1212
+ if ( copy_res !== copy_status_enum . FALLBACK ) {
1215
1213
// open file after copy link/same inode should use read open mode
1216
1214
open_mode = config . NSFS_OPEN_READ_MODE ;
1217
1215
if ( copy_res === copy_status_enum . SAME_INODE ) open_path = file_path ;
@@ -1294,10 +1292,8 @@ class NamespaceFS {
1294
1292
let stat = await target_file . stat ( fs_context ) ;
1295
1293
this . _verify_encryption ( params . encryption , this . _get_encryption_info ( stat ) ) ;
1296
1294
1297
- // handle xattr
1298
- // assign user xattr on non copy / copy with xattr_copy header provided
1299
1295
const copy_xattr = params . copy_source && params . xattr_copy ;
1300
- let fs_xattr = copy_xattr ? undefined : to_fs_xattr ( params . xattr ) ;
1296
+ let fs_xattr = to_fs_xattr ( params . xattr ) ;
1301
1297
1302
1298
// assign noobaa internal xattr - content type, md5, versioning xattr
1303
1299
if ( params . content_type ) {
@@ -1510,7 +1506,7 @@ class NamespaceFS {
1510
1506
// Can be finetuned further on if needed and inserting the Semaphore logic inside
1511
1507
// Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream)
1512
1508
async _upload_stream ( { fs_context, params, target_file, object_sdk, offset } ) {
1513
- const { source_stream } = params ;
1509
+ const { source_stream, copy_source } = params ;
1514
1510
try {
1515
1511
// Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy
1516
1512
const md5_enabled = this . _is_force_md5_enabled ( object_sdk ) ;
@@ -1525,8 +1521,12 @@ class NamespaceFS {
1525
1521
large_buf_size : multi_buffer_pool . get_buffers_pool ( undefined ) . buf_size
1526
1522
} ) ;
1527
1523
chunk_fs . on ( 'error' , err1 => dbg . error ( 'namespace_fs._upload_stream: error occured on stream ChunkFS: ' , err1 ) ) ;
1528
- await stream_utils . pipeline ( [ source_stream , chunk_fs ] ) ;
1529
- await stream_utils . wait_finished ( chunk_fs ) ;
1524
+ if ( copy_source ) {
1525
+ await this . read_object_stream ( copy_source , object_sdk , chunk_fs ) ;
1526
+ } else {
1527
+ await stream_utils . pipeline ( [ source_stream , chunk_fs ] ) ;
1528
+ await stream_utils . wait_finished ( chunk_fs ) ;
1529
+ }
1530
1530
return { digest : chunk_fs . digest , total_bytes : chunk_fs . total_bytes } ;
1531
1531
} catch ( error ) {
1532
1532
dbg . error ( '_upload_stream had error: ' , error ) ;
@@ -1812,6 +1812,7 @@ class NamespaceFS {
1812
1812
upload_params . params . xattr = create_params_parsed . xattr ;
1813
1813
upload_params . params . storage_class = create_params_parsed . storage_class ;
1814
1814
upload_params . digest = MD5Async && ( ( ( await MD5Async . digest ( ) ) . toString ( 'hex' ) ) + '-' + multiparts . length ) ;
1815
+ upload_params . params . content_type = create_params_parsed . content_type ;
1815
1816
1816
1817
const upload_info = await this . _finish_upload ( upload_params ) ;
1817
1818
0 commit comments