Skip to content

Commit 6deb28f

Browse files
committed
Simplified gRPC streams helpers
1 parent 4a4d1fd commit 6deb28f

File tree

3 files changed

+38
-23
lines changed

3 files changed

+38
-23
lines changed

arduino/utils/stream.go

+29-5
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,39 @@
1616
package utils
1717

1818
import (
19-
"context"
2019
"io"
20+
"sync"
2121
"time"
2222

2323
"github.com/djherbis/buffer"
2424
"github.com/djherbis/nio/v3"
2525
)
2626

27+
// implWriteCloser is an helper struct to implement an anonymous io.WriteCloser
28+
type implWriteCloser struct {
29+
write func(buff []byte) (int, error)
30+
close func() error
31+
}
32+
33+
func (w *implWriteCloser) Write(buff []byte) (int, error) {
34+
return w.write(buff)
35+
}
36+
37+
func (w *implWriteCloser) Close() error {
38+
return w.close()
39+
}
40+
2741
// FeedStreamTo creates a pipe to pass data to the writer function.
2842
// FeedStreamTo returns the io.WriteCloser side of the pipe, on which the user can write data.
2943
// The user must call Close() on the returned io.WriteCloser to release all the resources.
3044
// If needed, the context can be used to detect when all the data has been processed after
3145
// closing the writer.
32-
func FeedStreamTo(writer func(data []byte)) (io.WriteCloser, context.Context) {
33-
ctx, cancel := context.WithCancel(context.Background())
46+
func FeedStreamTo(writer func(data []byte)) io.WriteCloser {
3447
r, w := nio.Pipe(buffer.New(32 * 1024))
48+
var wg sync.WaitGroup
49+
wg.Add(1)
3550
go func() {
36-
defer cancel()
51+
defer wg.Done()
3752
data := make([]byte, 16384)
3853
for {
3954
if n, err := r.Read(data); err == nil {
@@ -50,7 +65,16 @@ func FeedStreamTo(writer func(data []byte)) (io.WriteCloser, context.Context) {
5065
}
5166
}
5267
}()
53-
return w, ctx
68+
return &implWriteCloser{
69+
write: w.Write,
70+
close: func() error {
71+
if err := w.Close(); err != nil {
72+
return err
73+
}
74+
wg.Wait()
75+
return nil
76+
},
77+
}
5478
}
5579

5680
// ConsumeStreamFrom creates a pipe to consume data from the reader function.

commands/daemon/daemon.go

+8-16
Original file line numberDiff line numberDiff line change
@@ -259,16 +259,14 @@ func (s *ArduinoCoreServerImpl) LoadSketch(ctx context.Context, req *rpc.LoadSke
259259

260260
// Compile FIXMEDOC
261261
func (s *ArduinoCoreServerImpl) Compile(req *rpc.CompileRequest, stream rpc.ArduinoCoreService_CompileServer) error {
262-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{OutStream: data}) })
263-
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{ErrStream: data}) })
262+
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{OutStream: data}) })
263+
errStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{ErrStream: data}) })
264264
compileResp, compileErr := compile.Compile(
265265
stream.Context(), req, outStream, errStream,
266266
func(p *rpc.TaskProgress) { stream.Send(&rpc.CompileResponse{Progress: p}) },
267267
false) // Set debug to false
268268
outStream.Close()
269269
errStream.Close()
270-
<-outCtx.Done()
271-
<-errCtx.Done()
272270
var compileRespSendErr error
273271
if compileResp != nil {
274272
compileRespSendErr = stream.Send(compileResp)
@@ -346,31 +344,27 @@ func (s *ArduinoCoreServerImpl) PlatformList(ctx context.Context, req *rpc.Platf
346344

347345
// Upload FIXMEDOC
348346
func (s *ArduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.ArduinoCoreService_UploadServer) error {
349-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{OutStream: data}) })
350-
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{ErrStream: data}) })
347+
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{OutStream: data}) })
348+
errStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{ErrStream: data}) })
351349
resp, err := upload.Upload(stream.Context(), req, outStream, errStream)
352350
outStream.Close()
353351
errStream.Close()
354352
if err != nil {
355353
return convertErrorToRPCStatus(err)
356354
}
357-
<-outCtx.Done()
358-
<-errCtx.Done()
359355
return stream.Send(resp)
360356
}
361357

362358
// UploadUsingProgrammer FIXMEDOC
363359
func (s *ArduinoCoreServerImpl) UploadUsingProgrammer(req *rpc.UploadUsingProgrammerRequest, stream rpc.ArduinoCoreService_UploadUsingProgrammerServer) error {
364-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
365-
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
360+
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
361+
errStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
366362
resp, err := upload.UsingProgrammer(stream.Context(), req, outStream, errStream)
367363
outStream.Close()
368364
errStream.Close()
369365
if err != nil {
370366
return convertErrorToRPCStatus(err)
371367
}
372-
<-outCtx.Done()
373-
<-errCtx.Done()
374368
return stream.Send(resp)
375369
}
376370

@@ -382,16 +376,14 @@ func (s *ArduinoCoreServerImpl) SupportedUserFields(ctx context.Context, req *rp
382376

383377
// BurnBootloader FIXMEDOC
384378
func (s *ArduinoCoreServerImpl) BurnBootloader(req *rpc.BurnBootloaderRequest, stream rpc.ArduinoCoreService_BurnBootloaderServer) error {
385-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
386-
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
379+
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
380+
errStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
387381
resp, err := upload.BurnBootloader(stream.Context(), req, outStream, errStream)
388382
outStream.Close()
389383
errStream.Close()
390384
if err != nil {
391385
return convertErrorToRPCStatus(err)
392386
}
393-
<-outCtx.Done()
394-
<-errCtx.Done()
395387
return stream.Send(resp)
396388
}
397389

commands/daemon/debug.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (s *DebugService) Debug(stream dbg.DebugService_DebugServer) error {
5050
// Launch debug recipe attaching stdin and out to grpc streaming
5151
signalChan := make(chan os.Signal)
5252
defer close(signalChan)
53-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&dbg.DebugResponse{Data: data}) })
53+
outStream := utils.FeedStreamTo(func(data []byte) { stream.Send(&dbg.DebugResponse{Data: data}) })
5454
resp, debugErr := cmd.Debug(stream.Context(), req,
5555
utils.ConsumeStreamFrom(func() ([]byte, error) {
5656
command, err := stream.Recv()
@@ -65,7 +65,6 @@ func (s *DebugService) Debug(stream dbg.DebugService_DebugServer) error {
6565
if debugErr != nil {
6666
return debugErr
6767
}
68-
<-outCtx.Done()
6968
return stream.Send(resp)
7069
}
7170

0 commit comments

Comments
 (0)