@@ -19,6 +19,7 @@ package rpc
19
19
import (
20
20
"context"
21
21
"encoding/json"
22
+ "fmt"
22
23
"reflect"
23
24
"strconv"
24
25
"strings"
@@ -34,21 +35,20 @@ import (
34
35
//
35
36
// The entry points for incoming messages are:
36
37
//
37
- // h.handleMsg(message)
38
- // h.handleBatch(message)
38
+ // h.handleMsg(message)
39
+ // h.handleBatch(message)
39
40
//
40
41
// Outgoing calls use the requestOp struct. Register the request before sending it
41
42
// on the connection:
42
43
//
43
- // op := &requestOp{ids: ...}
44
- // h.addRequestOp(op)
44
+ // op := &requestOp{ids: ...}
45
+ // h.addRequestOp(op)
45
46
//
46
47
// Now send the request, then wait for the reply to be delivered through handleMsg:
47
48
//
48
- // if err := op.wait(...); err != nil {
49
- // h.removeRequestOp(op) // timeout, etc.
50
- // }
51
- //
49
+ // if err := op.wait(...); err != nil {
50
+ // h.removeRequestOp(op) // timeout, etc.
51
+ // }
52
52
type handler struct {
53
53
reg * serviceRegistry
54
54
unsubscribeCb * callback
@@ -92,6 +92,8 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *
92
92
return h
93
93
}
94
94
95
+ const maxBatchResponseSize int = 10_000_000 // 10MB
96
+
95
97
// handleBatch executes all messages in a batch and returns the responses.
96
98
func (h * handler ) handleBatch (msgs []* jsonrpcMessage ) {
97
99
// Emit error response for empty batches:
@@ -114,10 +116,21 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
114
116
}
115
117
// Process calls on a goroutine because they may block indefinitely:
116
118
h .startCallProc (func (cp * callProc ) {
117
- answers := make ([]* jsonrpcMessage , 0 , len (msgs ))
119
+ answers := make ([]json.RawMessage , 0 , len (msgs ))
120
+ var totalSize int
118
121
for _ , msg := range calls {
119
122
if answer := h .handleCallMsg (cp , msg ); answer != nil {
120
- answers = append (answers , answer )
123
+ serialized , err := json .Marshal (answer )
124
+ if err != nil {
125
+ h .conn .writeJSON (cp .ctx , errorMessage (& parseError {"error serializing response: " + err .Error ()}))
126
+ return
127
+ }
128
+ totalSize += len (serialized )
129
+ if totalSize > maxBatchResponseSize {
130
+ h .conn .writeJSON (cp .ctx , errorMessage (& invalidRequestError {fmt .Sprintf ("batch response exceeded limit of %v bytes" , maxBatchResponseSize )}))
131
+ return
132
+ }
133
+ answers = append (answers , serialized )
121
134
}
122
135
}
123
136
h .addSubscriptions (cp .notifiers )
0 commit comments