Skip to content

Commit 0c05f9f

Browse files
authored
Add temporary SendTransactionAsync to PublicTransactionPoolAPI (ethereum#32)
1 parent a5d3e90 commit 0c05f9f

File tree

1 file changed

+111
-0
lines changed

1 file changed

+111
-0
lines changed

internal/ethapi/api.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ import (
2222
"encoding/json"
2323
"fmt"
2424
"math/big"
25+
"net/http"
2526
"strings"
27+
"sync"
2628
"time"
2729

2830
"github.com/ethereum/ethash"
@@ -296,6 +298,115 @@ func (s *PrivateAccountAPI) SendTransaction(ctx context.Context, args SendTxArgs
296298
return submitTransaction(ctx, s.b, tx, signature, isPrivate)
297299
}
298300

301+
// Please note: This is a temporary integration to improve performance in low-latency
302+
// environments when sending many private transactions. It will be removed at a later
303+
// date when account management is handled outside Ethereum.
304+
305+
type AsyncSendTxArgs struct {
306+
SendTxArgs
307+
CallbackUrl string `json:"callbackUrl"`
308+
}
309+
310+
type AsyncResult struct {
311+
TxHash common.Hash `json:"txHash"`
312+
Error string `json:"error"`
313+
}
314+
315+
type argsAndPayload struct {
316+
args AsyncSendTxArgs
317+
b []byte
318+
}
319+
320+
type Async struct {
321+
sync.Mutex
322+
sem chan struct{}
323+
pool []*argsAndPayload
324+
}
325+
326+
func (a *Async) send(ctx context.Context, s *PublicTransactionPoolAPI, asyncArgs AsyncSendTxArgs) {
327+
res := new(AsyncResult)
328+
defer func() {
329+
buf := new(bytes.Buffer)
330+
err := json.NewEncoder(buf).Encode(res)
331+
if err != nil {
332+
glog.V(logger.Info).Infof("Error encoding callback JSON: %v", err)
333+
return
334+
}
335+
_, err = http.Post(asyncArgs.CallbackUrl, "application/json", buf)
336+
if err != nil {
337+
glog.V(logger.Info).Infof("Error sending callback: %v", err)
338+
return
339+
}
340+
}()
341+
args, err := prepareSendTxArgs(ctx, asyncArgs.SendTxArgs, s.b)
342+
if err != nil {
343+
glog.V(logger.Info).Infof("Async.send: Error doing prepareSendTxArgs: %v", err)
344+
res.Error = err.Error()
345+
return
346+
}
347+
b, err := private.P.Send(common.FromHex(args.Data), args.PrivateFrom, args.PrivateFor)
348+
if err != nil {
349+
glog.V(logger.Info).Infof("Error running Private.P.Send: %v", err)
350+
res.Error = err.Error()
351+
return
352+
}
353+
res.TxHash, err = a.save(ctx, s, args, b)
354+
if err != nil {
355+
res.Error = err.Error()
356+
}
357+
}
358+
359+
func (a *Async) save(ctx context.Context, s *PublicTransactionPoolAPI, args SendTxArgs, data []byte) (common.Hash, error) {
360+
a.Lock()
361+
defer a.Unlock()
362+
if args.Nonce == nil {
363+
nonce, err := s.b.GetPoolNonce(ctx, args.From)
364+
if err != nil {
365+
return common.Hash{}, err
366+
}
367+
args.Nonce = rpc.NewHexNumber(nonce)
368+
}
369+
var tx *types.Transaction
370+
if args.To == nil {
371+
tx = types.NewContractCreation(args.Nonce.Uint64(), args.Value.BigInt(), args.Gas.BigInt(), args.GasPrice.BigInt(), data)
372+
} else {
373+
tx = types.NewTransaction(args.Nonce.Uint64(), *args.To, args.Value.BigInt(), args.Gas.BigInt(), args.GasPrice.BigInt(), data)
374+
}
375+
signature, err := s.b.AccountManager().SignEthereum(args.From, tx.SigHash().Bytes())
376+
if err != nil {
377+
return common.Hash{}, err
378+
}
379+
return submitTransaction(ctx, s.b, tx, signature, len(args.PrivateFor) > 0)
380+
}
381+
382+
func newAsync(n int) *Async {
383+
a := &Async{
384+
sem: make(chan struct{}, n),
385+
}
386+
return a
387+
}
388+
389+
var async = newAsync(100)
390+
391+
// SendTransactionAsync creates a transaction for the given argument, signs it, and
392+
// submits it to the transaction pool. This call returns immediately to allow sending
393+
// many private transactions/bursts of transactions without waiting for the recipient
394+
// parties to confirm receipt of the encrypted payloads. An optional callbackUrl may
395+
// be specified--when a transaction is submitted to the transaction pool, it will be
396+
// called with a POST request containing either {"error": "error message"} or
397+
// {"txHash": "0x..."}.
398+
//
399+
// Please note: This is a temporary integration to improve performance in low-latency
400+
// environments when sending many private transactions. It will be removed at a later
401+
// date when account management is handled outside Ethereum.
402+
func (s *PublicTransactionPoolAPI) SendTransactionAsync(ctx context.Context, args AsyncSendTxArgs) {
403+
async.sem <- struct{}{}
404+
go func() {
405+
async.send(ctx, s, args)
406+
<-async.sem
407+
}()
408+
}
409+
299410
// signHash is a helper function that calculates a hash for the given message that can be
300411
// safely used to calculate a signature from. The hash is calulcated with:
301412
// keccak256("\x19Ethereum Signed Message:\n"${message length}${message}).

0 commit comments

Comments
 (0)