@@ -488,7 +488,7 @@ const (
488
488
)
489
489
490
490
func (js * jetStream ) CreateObjectStore (ctx context.Context , cfg ObjectStoreConfig ) (ObjectStore , error ) {
491
- scfg , err := js .prepareObjectStoreConfig (ctx , cfg )
491
+ scfg , err := js .prepareObjectStoreConfig (cfg )
492
492
if err != nil {
493
493
return nil , err
494
494
}
@@ -511,7 +511,7 @@ func (js *jetStream) CreateObjectStore(ctx context.Context, cfg ObjectStoreConfi
511
511
}
512
512
513
513
func (js * jetStream ) UpdateObjectStore (ctx context.Context , cfg ObjectStoreConfig ) (ObjectStore , error ) {
514
- scfg , err := js .prepareObjectStoreConfig (ctx , cfg )
514
+ scfg , err := js .prepareObjectStoreConfig (cfg )
515
515
if err != nil {
516
516
return nil , err
517
517
}
@@ -533,7 +533,7 @@ func (js *jetStream) UpdateObjectStore(ctx context.Context, cfg ObjectStoreConfi
533
533
}
534
534
535
535
func (js * jetStream ) CreateOrUpdateObjectStore (ctx context.Context , cfg ObjectStoreConfig ) (ObjectStore , error ) {
536
- scfg , err := js .prepareObjectStoreConfig (ctx , cfg )
536
+ scfg , err := js .prepareObjectStoreConfig (cfg )
537
537
if err != nil {
538
538
return nil , err
539
539
}
@@ -550,7 +550,7 @@ func (js *jetStream) CreateOrUpdateObjectStore(ctx context.Context, cfg ObjectSt
550
550
return mapStreamToObjectStore (js , pushJS , cfg .Bucket , stream ), nil
551
551
}
552
552
553
- func (js * jetStream ) prepareObjectStoreConfig (ctx context. Context , cfg ObjectStoreConfig ) (StreamConfig , error ) {
553
+ func (js * jetStream ) prepareObjectStoreConfig (cfg ObjectStoreConfig ) (StreamConfig , error ) {
554
554
if ! validBucketRe .MatchString (cfg .Bucket ) {
555
555
return StreamConfig {}, ErrInvalidStoreName
556
556
}
@@ -824,6 +824,7 @@ func (info *ObjectInfo) isLink() bool {
824
824
825
825
// Get will pull the object from the underlying stream.
826
826
func (obs * obs ) Get (ctx context.Context , name string , opts ... GetObjectOpt ) (ObjectResult , error ) {
827
+ ctx , cancel := obs .js .wrapContextWithoutDeadline (ctx )
827
828
var o getObjectOpts
828
829
for _ , opt := range opts {
829
830
if opt != nil {
@@ -933,10 +934,15 @@ func (obs *obs) Get(ctx context.Context, name string, opts ...GetObjectOpt) (Obj
933
934
nats .Context (ctx ),
934
935
nats .BindStream (streamName ),
935
936
}
936
- _ , err = obs .pushJS .Subscribe (chunkSubj , processChunk , subscribeOpts ... )
937
+ sub , err : = obs .pushJS .Subscribe (chunkSubj , processChunk , subscribeOpts ... )
937
938
if err != nil {
938
939
return nil , err
939
940
}
941
+ sub .SetClosedHandler (func (subject string ) {
942
+ if cancel != nil {
943
+ cancel ()
944
+ }
945
+ })
940
946
941
947
return result , nil
942
948
}
0 commit comments