@@ -483,7 +483,7 @@ class NamespaceFS {
483
483
}
484
484
485
485
/**
486
- * @param {nb.ObjectSDK } object_sdk
486
+ * @param {nb.ObjectSDK } object_sdk
487
487
* @returns {nb.NativeFSContext }
488
488
*/
489
489
prepare_fs_context ( object_sdk ) {
@@ -1090,7 +1090,7 @@ class NamespaceFS {
1090
1090
// end the stream
1091
1091
res . end ( ) ;
1092
1092
1093
- await stream_utils . wait_finished ( res , { signal : object_sdk . abort_controller . signal } ) ;
1093
+ await stream_utils . wait_finished ( res , { readable : false , signal : object_sdk . abort_controller . signal } ) ;
1094
1094
object_sdk . throw_if_aborted ( ) ;
1095
1095
1096
1096
dbg . log0 ( 'NamespaceFS: read_object_stream completed file' , file_path , {
@@ -1209,9 +1209,7 @@ class NamespaceFS {
1209
1209
}
1210
1210
1211
1211
if ( copy_res ) {
1212
- if ( copy_res === copy_status_enum . FALLBACK ) {
1213
- params . copy_source . nsfs_copy_fallback ( ) ;
1214
- } else {
1212
+ if ( copy_res !== copy_status_enum . FALLBACK ) {
1215
1213
// open file after copy link/same inode should use read open mode
1216
1214
open_mode = config . NSFS_OPEN_READ_MODE ;
1217
1215
if ( copy_res === copy_status_enum . SAME_INODE ) open_path = file_path ;
@@ -1294,10 +1292,8 @@ class NamespaceFS {
1294
1292
let stat = await target_file . stat ( fs_context ) ;
1295
1293
this . _verify_encryption ( params . encryption , this . _get_encryption_info ( stat ) ) ;
1296
1294
1297
- // handle xattr
1298
- // assign user xattr on non copy / copy with xattr_copy header provided
1299
1295
const copy_xattr = params . copy_source && params . xattr_copy ;
1300
- let fs_xattr = copy_xattr ? undefined : to_fs_xattr ( params . xattr ) ;
1296
+ let fs_xattr = to_fs_xattr ( params . xattr ) ;
1301
1297
1302
1298
// assign noobaa internal xattr - content type, md5, versioning xattr
1303
1299
if ( params . content_type ) {
@@ -1511,7 +1507,7 @@ class NamespaceFS {
1511
1507
// Can be finetuned further on if needed and inserting the Semaphore logic inside
1512
1508
// Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream)
1513
1509
async _upload_stream ( { fs_context, params, target_file, object_sdk, offset } ) {
1514
- const { source_stream } = params ;
1510
+ const { source_stream, copy_source } = params ;
1515
1511
try {
1516
1512
// Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy
1517
1513
const md5_enabled = this . _is_force_md5_enabled ( object_sdk ) ;
@@ -1526,8 +1522,12 @@ class NamespaceFS {
1526
1522
large_buf_size : multi_buffer_pool . get_buffers_pool ( undefined ) . buf_size
1527
1523
} ) ;
1528
1524
chunk_fs . on ( 'error' , err1 => dbg . error ( 'namespace_fs._upload_stream: error occured on stream ChunkFS: ' , err1 ) ) ;
1529
- await stream_utils . pipeline ( [ source_stream , chunk_fs ] ) ;
1530
- await stream_utils . wait_finished ( chunk_fs ) ;
1525
+ if ( copy_source ) {
1526
+ await this . read_object_stream ( copy_source , object_sdk , chunk_fs ) ;
1527
+ } else {
1528
+ await stream_utils . pipeline ( [ source_stream , chunk_fs ] ) ;
1529
+ await stream_utils . wait_finished ( chunk_fs ) ;
1530
+ }
1531
1531
return { digest : chunk_fs . digest , total_bytes : chunk_fs . total_bytes } ;
1532
1532
} catch ( error ) {
1533
1533
dbg . error ( '_upload_stream had error: ' , error ) ;
@@ -1813,6 +1813,7 @@ class NamespaceFS {
1813
1813
upload_params . params . xattr = create_params_parsed . xattr ;
1814
1814
upload_params . params . storage_class = create_params_parsed . storage_class ;
1815
1815
upload_params . digest = MD5Async && ( ( ( await MD5Async . digest ( ) ) . toString ( 'hex' ) ) + '-' + multiparts . length ) ;
1816
+ upload_params . params . content_type = create_params_parsed . content_type ;
1816
1817
1817
1818
const upload_info = await this . _finish_upload ( upload_params ) ;
1818
1819
0 commit comments