diff --git a/exp/lighthorizon/actions/apidocs.go b/exp/lighthorizon/actions/apidocs.go new file mode 100644 index 0000000000..d30e74f588 --- /dev/null +++ b/exp/lighthorizon/actions/apidocs.go @@ -0,0 +1,25 @@ +package actions + +import ( + "net/http" +) + +func ApiDocs() func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + r.URL.Scheme = "http" + r.URL.Host = "localhost:8080" + + if r.Method != "GET" { + sendErrorResponse(w, http.StatusMethodNotAllowed, "") + return + } + + p, err := staticFiles.ReadFile("static/api_docs.yml") + if err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + w.Header().Set("Content-Type", "application/openapi+yaml") + w.Write(p) + } +} diff --git a/exp/lighthorizon/actions/main.go b/exp/lighthorizon/actions/main.go new file mode 100644 index 0000000000..8a5b1f9b4c --- /dev/null +++ b/exp/lighthorizon/actions/main.go @@ -0,0 +1,94 @@ +package actions + +import ( + "embed" + "encoding/json" + "net/http" + "net/url" + "strconv" + + "github.com/stellar/go/support/log" + "github.com/stellar/go/support/render/hal" +) + +var ( + //go:embed static + staticFiles embed.FS +) + +type Order string +type ErrorMessage string + +const ( + OrderAsc Order = "asc" + OrderDesc Order = "desc" +) + +const ( + ServerError ErrorMessage = "Error: A problem occurred on the server while processing request" + InvalidPagingParameters ErrorMessage = "Error: Invalid paging parameters" +) + +type Pagination struct { + Limit int64 + Cursor int64 + Order +} + +func sendPageResponse(w http.ResponseWriter, page hal.Page) { + encoder := json.NewEncoder(w) + encoder.SetIndent("", " ") + err := encoder.Encode(page) + if err != nil { + log.Error(err) + sendErrorResponse(w, http.StatusInternalServerError, "") + } +} + +func sendErrorResponse(w http.ResponseWriter, errorCode int, errorMsg string) { + if errorMsg != "" { + http.Error(w, errorMsg, errorCode) + } else { + http.Error(w, 
string(ServerError), errorCode) + } +} + +func RequestUnaryParam(r *http.Request, paramName string) (string, error) { + query, err := url.ParseQuery(r.URL.RawQuery) + if err != nil { + return "", err + } + return query.Get(paramName), nil +} + +func Paging(r *http.Request) (Pagination, error) { + paginate := Pagination{ + Order: OrderAsc, + } + + if cursorRequested, err := RequestUnaryParam(r, "cursor"); err != nil { + return Pagination{}, err + } else if cursorRequested != "" { + paginate.Cursor, err = strconv.ParseInt(cursorRequested, 10, 64) + if err != nil { + return Pagination{}, err + } + } + + if limitRequested, err := RequestUnaryParam(r, "limit"); err != nil { + return Pagination{}, err + } else if limitRequested != "" { + paginate.Limit, err = strconv.ParseInt(limitRequested, 10, 64) + if err != nil { + return Pagination{}, err + } + } + + if orderRequested, err := RequestUnaryParam(r, "order"); err != nil { + return Pagination{}, err + } else if orderRequested != "" && orderRequested == string(OrderDesc) { + paginate.Order = OrderDesc + } + + return paginate, nil +} diff --git a/exp/lighthorizon/actions/operation.go b/exp/lighthorizon/actions/operation.go index d95cfda0b8..6c64b89d3f 100644 --- a/exp/lighthorizon/actions/operation.go +++ b/exp/lighthorizon/actions/operation.go @@ -1,11 +1,9 @@ package actions import ( - "encoding/json" - "fmt" + "github.com/stellar/go/support/log" "io" "net/http" - "net/url" "strconv" "github.com/stellar/go/exp/lighthorizon/adapters" @@ -26,80 +24,66 @@ func Operations(archiveWrapper archive.Wrapper, indexStore index.Store) func(htt return } - query, err := url.ParseQuery(r.URL.RawQuery) + paginate, err := Paging(r) if err != nil { - fmt.Fprintf(w, "Error: %v", err) + sendErrorResponse(w, http.StatusBadRequest, string(InvalidPagingParameters)) return } - var cursor int64 - if query.Get("cursor") == "" { - cursor = toid.New(1, 1, 1).ToInt64() - } else { - cursor, err = 
strconv.ParseInt(query.Get("cursor"), 10, 64) - if err != nil { - fmt.Fprintf(w, "Error: %v", err) - return - } + if paginate.Cursor < 1 { + paginate.Cursor = toid.New(1, 1, 1).ToInt64() } - var limit int64 - if query.Get("limit") == "" { - limit = 10 - } else { - limit, err = strconv.ParseInt(query.Get("limit"), 10, 64) - if err != nil { - fmt.Fprintf(w, "Error: %v", err) - return - } - } - - if limit == 0 || limit > 200 { - limit = 10 + if paginate.Limit < 1 || paginate.Limit > 200 { + paginate.Limit = 10 } page := hal.Page{ - Cursor: query.Get("cursor"), - Order: "asc", - Limit: uint64(limit), + Cursor: strconv.FormatInt(paginate.Cursor, 10), + Order: string(paginate.Order), + Limit: uint64(paginate.Limit), } page.Init() page.FullURL = r.URL // For now, use a query param for now to avoid dragging in chi-router. Not // really the point of the experiment yet. - account := query.Get("account") + account, err := RequestUnaryParam(r, "account") + if err != nil { + log.Error(err) + sendErrorResponse(w, http.StatusInternalServerError, "") + return + } + if account != "" { // Skip the cursor ahead to the next active checkpoint for this account - checkpoint, err := indexStore.NextActive(account, "all_all", uint32(toid.Parse(cursor).LedgerSequence/64)) + var checkpoint uint32 + checkpoint, err = indexStore.NextActive(account, "all/all", uint32(toid.Parse(paginate.Cursor).LedgerSequence/64)) if err == io.EOF { // never active. No results. 
page.PopulateLinks() - - encoder := json.NewEncoder(w) - encoder.SetIndent("", " ") - err = encoder.Encode(page) - if err != nil { - fmt.Fprintf(w, "Error: %v", err) - return - } + sendPageResponse(w, page) return } else if err != nil { - fmt.Fprintf(w, "Error: %v", err) + log.Error(err) + sendErrorResponse(w, http.StatusInternalServerError, "") return } ledger := int32(checkpoint * 64) if ledger < 0 { // Check we don't overflow going from uint32 -> int32 - fmt.Fprintf(w, "Error: Ledger overflow") + log.Error(err) + sendErrorResponse(w, http.StatusInternalServerError, "") return } - cursor = toid.New(ledger, 1, 1).ToInt64() + paginate.Cursor = toid.New(ledger, 1, 1).ToInt64() } - ops, err := archiveWrapper.GetOperations(cursor, limit) + //TODO - implement paginate.Order(asc/desc) + ops, err := archiveWrapper.GetOperations(r.Context(), paginate.Cursor, paginate.Limit) if err != nil { - fmt.Fprintf(w, "Error: %v", err) + log.Error(err) + sendErrorResponse(w, http.StatusInternalServerError, "") return } @@ -107,7 +91,8 @@ func Operations(archiveWrapper archive.Wrapper, indexStore index.Store) func(htt var response operations.Operation response, err = adapters.PopulateOperation(r, &op) if err != nil { - fmt.Fprintf(w, "Error: %v", err) + log.Error(err) + sendErrorResponse(w, http.StatusInternalServerError, "") return } @@ -115,13 +100,6 @@ func Operations(archiveWrapper archive.Wrapper, indexStore index.Store) func(htt } page.PopulateLinks() - - encoder := json.NewEncoder(w) - encoder.SetIndent("", " ") - err = encoder.Encode(page) - if err != nil { - fmt.Fprintf(w, "Error: %v", err) - return - } + sendPageResponse(w, page) } } diff --git a/exp/lighthorizon/actions/static/api_docs.yml b/exp/lighthorizon/actions/static/api_docs.yml new file mode 100644 index 0000000000..6bc7eb5e9c --- /dev/null +++ b/exp/lighthorizon/actions/static/api_docs.yml @@ -0,0 +1,243 @@ +openapi: 3.0.3 +info: + title: Horizon Light API + version: 0.0.1 + description: |- + The Light API is a 
published web service on port parameter specified when running Horizon Light `--port=`. +servers: + - url: http://localhost:8080/ +paths: + /transactions: + get: + responses: + '200': + description: OK + headers: {} + content: + application/json: + schema: + $ref: '#/components/schemas/CollectionModel_Tx' + example: + _links: + self: + href: http://localhost:8080/transactions?cursor=&limit=0&order= + next: + href: http://localhost:8080/transactions?cursor=6606621773930497&limit=0&order= + prev: + href: http://localhost:8080/transactions?cursor=6606621773930497&limit=0&order=asc + _embedded: + records: + - memo: xdr.MemoText("psp:1405") + _links: + self: + href: http://localhost:8080/transactions/5fef21d5ef75ecf18d65a160cfab17dca8dbf6dbc4e2fd66a510719ad8dddb09 + account: + href: '' + ledger: + href: '' + operations: + href: '' + effects: + href: http://localhost:8080/transactions/5fef21d5ef75ecf18d65a160cfab17dca8dbf6dbc4e2fd66a510719ad8dddb09/effects + precedes: + href: http://localhost:8080/effects?order=asc&cursor=6606621773930497 + succeeds: + href: http://localhost:8080/effects?order=desc&cursor=6606621773930497 + transaction: + href: '' + id: 5fef21d5ef75ecf18d65a160cfab17dca8dbf6dbc4e2fd66a510719ad8dddb09 + paging_token: '6606621773930497' + successful: false + hash: 5fef21d5ef75ecf18d65a160cfab17dca8dbf6dbc4e2fd66a510719ad8dddb09 + ledger: 1538224 + created_at: '2022-06-17T23:29:42Z' + source_account: GCFJN22UG6IZHXKDVAJWAVEQ3NERGCRCURR2FHARNRBNLYFEQZGML4PW + source_account_sequence: '' + fee_account: '' + fee_charged: '3000' + max_fee: '0' + operation_count: 1 + envelope_xdr: AAAAAgAAAACKlutUN5GT3UOoE2BUkNtJEwoipGOinBFsQtXgpIZMxQAAJxAAE05oAAHUKAAAAAEAAAAAAAAAAAAAAABirQ6AAAAAAQAAAAhwc3A6MTQwNQAAAAEAAAAAAAAAAQAAAADpPdN37FA9KVcJfmMBuD8pPcaT5jqlrMeYEOTP36Zo2AAAAAJBVE1ZUgAAAAAAAAAAAAAAZ8rWY3iaDnWNtfpvLpNaCEbKdDjrd2gQODOuKpmj1vMAAAAAGHAagAAAAAAAAAABpIZMxQAAAEDNJwYToiBR6bzElRL4ORJdXXZYO9cE3-ishQLC_fWGrPGhWrW7_UkPJWvxWdQDJBjVOHuA1Jjc94NSe91hSwEL + result_xdr: 
AAAAAAAAC7j_____AAAAAQAAAAAAAAAB____-gAAAAA= + result_meta_xdr: '' + fee_meta_xdr: '' + memo_type: MemoTypeMemoText + signatures: + - pIZMxQAAAEDNJwYToiBR6bzElRL4ORJdXXZYO9cE3-ishQLC_fWGrPGhWrW7_UkPJWvxWdQDJBjVOHuA1Jjc94NSe91hSwEL + summary: Get Transactions by paged list + operationId: GetTransactions + description: Retrieve transactcions by paged listing. + tags: [] + parameters: + - $ref: '#/components/parameters/CursorParam' + - $ref: '#/components/parameters/LimitParam' + - in: query + name: id + required: false + schema: + type: string + description: The transaction ID, the hash value. + /operations: + get: + responses: + '200': + description: OK + headers: {} + content: + application/json: + schema: + $ref: '#/components/schemas/CollectionModel_Operation' + example: + _links: + self: + href: http://localhost:8080/operations?cursor=6606617478959105&limit=1&order=asc + next: + href: http://localhost:8080/operations?cursor=6606621773926401&limit=1&order=asc + prev: + href: http://localhost:8080/operations?cursor=6606621773926401&limit=1&order=desc + _embedded: + records: + - _links: + self: + href: http://localhost:8080/operations/6606621773926401 + transaction: + href: http://localhost:8080/transactions/544469b76cd90978345a4734a0ce69a9d0ddb4a6595a7afc503225a77826722a + effects: + href: http://localhost:8080/operations/6606621773926401/effects + succeeds: + href: http://localhost:8080/effects?order=desc&cursor=6606621773926401 + precedes: + href: http://localhost:8080/effects?order=asc&cursor=6606621773926401 + id: '6606621773926401' + paging_token: '6606621773926401' + transaction_successful: true + source_account: GBGTCH47BOEEKLPHHMR2GOK6KQFGL3O7Q53FIZTJ7S7YEDWYJ5IUDJDJ + type: manage_sell_offer + type_i: 3 + created_at: '2022-06-17T23:29:42Z' + transaction_hash: 544469b76cd90978345a4734a0ce69a9d0ddb4a6595a7afc503225a77826722a + amount: '0.0000000' + price: '0.0000001' + price_r: + n: 1 + d: 10000000 + buying_asset_type: credit_alphanum4 + buying_asset_code: 
USDV + buying_asset_issuer: GAXXMQMTDUQ4YEPXJMKFBGN3GETPJNEXEUHFCQJKGJDVI3XQCNBU3OZI + selling_asset_type: credit_alphanum4 + selling_asset_code: EURV + selling_asset_issuer: GAXXMQMTDUQ4YEPXJMKFBGN3GETPJNEXEUHFCQJKGJDVI3XQCNBU3OZI + offer_id: '425531' + summary: Get Operations by paged list + operationId: GetOperations + description: Retrieve operations by paged listing. + tags: [] + parameters: + - $ref: '#/components/parameters/CursorParam' + - $ref: '#/components/parameters/LimitParam' + - in: query + name: account + required: false + schema: + type: string + description: Get all operations that are related to this account id +components: + parameters: + CursorParam: + name: cursor + in: query + required: false + schema: + type: integer + example: 6606617478959105 + description: The packed order id consisting of Ledger Num, TX Order Num, Operation Order Num + LimitParam: + in: query + name: limit + required: false + schema: + type: integer + default: 10 + description: The numbers of items to return + schemas: + CollectionModelItem: + type: object + properties: + _embedded: + type: object + properties: + records: + type: array + items: + "$ref": "#/components/schemas/Item" + _links: + "$ref": "#/components/schemas/Links" + Item: + type: object + properties: + id: + type: string + _links: + "$ref": "#/components/schemas/Links" + CollectionModel_Tx: + type: object + allOf: + - $ref: "#/components/schemas/CollectionModelItem" + properties: + _embedded: + type: object + properties: + records: + type: array + items: + $ref: "#/components/schemas/EntityModel_Tx" + EntityModel_Tx: + type: object + allOf: + - $ref: "#/components/schemas/Tx" + - $ref: "#/components/schemas/Links" + Tx: + type: object + properties: + id: + type: string + hash: + type: string + ledger: + type: integer + CollectionModel_Operation: + type: object + allOf: + - $ref: "#/components/schemas/CollectionModelItem" + properties: + _embedded: + type: object + properties: + records: + type: array + 
items: + $ref: "#/components/schemas/EntityModel_Operation" + EntityModel_Operation: + type: object + allOf: + - $ref: "#/components/schemas/Operation" + - $ref: "#/components/schemas/Links" + Operation: + type: object + properties: + id: + type: string + type: + type: string + source_account: + type: string + Links: + type: object + additionalProperties: + "$ref": "#/components/schemas/Link" + Link: + type: object + properties: + href: + type: string +tags: [] diff --git a/exp/lighthorizon/actions/transaction.go b/exp/lighthorizon/actions/transaction.go index b66d60c245..a0884b5743 100644 --- a/exp/lighthorizon/actions/transaction.go +++ b/exp/lighthorizon/actions/transaction.go @@ -2,17 +2,17 @@ package actions import ( "encoding/hex" - "encoding/json" - "fmt" + "github.com/stellar/go/support/log" "io" "net/http" - "net/url" + "strconv" "github.com/stellar/go/exp/lighthorizon/adapters" "github.com/stellar/go/exp/lighthorizon/archive" "github.com/stellar/go/exp/lighthorizon/index" hProtocol "github.com/stellar/go/protocols/horizon" "github.com/stellar/go/support/render/hal" + "github.com/stellar/go/toid" ) func Transactions(archiveWrapper archive.Wrapper, indexStore index.Store) func(http.ResponseWriter, *http.Request) { @@ -22,75 +22,81 @@ func Transactions(archiveWrapper archive.Wrapper, indexStore index.Store) func(h r.URL.Host = "localhost:8080" if r.Method != "GET" { + sendErrorResponse(w, http.StatusMethodNotAllowed, "") return } - query, err := url.ParseQuery(r.URL.RawQuery) + paginate, err := Paging(r) if err != nil { - fmt.Fprintf(w, "Error: %v", err) + sendErrorResponse(w, http.StatusBadRequest, string(InvalidPagingParameters)) return } - page := hal.Page{} + if paginate.Cursor < 1 { + paginate.Cursor = toid.New(1, 1, 1).ToInt64() + } + + if paginate.Limit < 1 { + paginate.Limit = 10 + } + + page := hal.Page{ + Cursor: 
strconv.FormatInt(paginate.Cursor, 10), + Order: string(paginate.Order), + Limit: uint64(paginate.Limit), + } page.Init() page.FullURL = r.URL // For now, use a query param for now to avoid dragging in chi-router. Not // really the point of the experiment yet. - id := query.Get("id") - var cursor int64 - if id != "" { - b, err := hex.DecodeString(id) + txId, err := RequestUnaryParam(r, "id") + if err != nil { + log.Error(err) + sendErrorResponse(w, http.StatusInternalServerError, "") + return + } + + if txId != "" { + // if 'id' is on request, it overrides any paging cursor that may be on request. + var b []byte + b, err = hex.DecodeString(txId) if err != nil { - fmt.Fprintf(w, "Error: %v", err) + sendErrorResponse(w, http.StatusBadRequest, "Invalid transaction id request parameter, not valid hex encoding") return } if len(b) != 32 { - fmt.Fprintf(w, "Error: invalid hash") + sendErrorResponse(w, http.StatusBadRequest, "Invalid transaction id request parameter, the encoded hex value must decode to length of 32 bytes") return } var hash [32]byte copy(hash[:], b) - // Skip the cursor ahead to the next active checkpoint for this account - txnToid, err := indexStore.TransactionTOID(hash) + + if paginate.Cursor, err = indexStore.TransactionTOID(hash); err != nil { + log.Error(err) + sendErrorResponse(w, http.StatusInternalServerError, "") + } if err == io.EOF { - // never active. No results. 
page.PopulateLinks() - - encoder := json.NewEncoder(w) - encoder.SetIndent("", " ") - err = encoder.Encode(page) - if err != nil { - fmt.Fprintf(w, "Error: %v", err) - return - } - return - } else if err != nil { - fmt.Fprintf(w, "Error: %v", err) + sendPageResponse(w, page) return } - cursor = txnToid } - txns, err := archiveWrapper.GetTransactions(cursor, 1) + //TODO - implement paginate.Order(asc/desc) + txns, err := archiveWrapper.GetTransactions(r.Context(), paginate.Cursor, paginate.Limit) if err != nil { - fmt.Fprintf(w, "Error: %v", err) + log.Error(err) + sendErrorResponse(w, http.StatusInternalServerError, "") return } for _, txn := range txns { - hash, err := txn.TransactionHash() - if err != nil { - fmt.Fprintf(w, "Error: %v", err) - return - } - if id != "" && hash != id { - continue - } var response hProtocol.Transaction response, err = adapters.PopulateTransaction(r, &txn) if err != nil { - fmt.Fprintf(w, "Error: %v", err) + log.Error(err) + sendErrorResponse(w, http.StatusInternalServerError, "") return } @@ -98,13 +104,6 @@ func Transactions(archiveWrapper archive.Wrapper, indexStore index.Store) func(h } page.PopulateLinks() - - encoder := json.NewEncoder(w) - encoder.SetIndent("", " ") - err = encoder.Encode(page) - if err != nil { - fmt.Fprintf(w, "Error: %v", err) - return - } + sendPageResponse(w, page) } } diff --git a/exp/lighthorizon/archive/ingest_archive.go b/exp/lighthorizon/archive/ingest_archive.go new file mode 100644 index 0000000000..b6963333db --- /dev/null +++ b/exp/lighthorizon/archive/ingest_archive.go @@ -0,0 +1,62 @@ +package archive + +import ( + "context" + + "github.com/stellar/go/ingest" + "github.com/stellar/go/ingest/ledgerbackend" + + "github.com/stellar/go/historyarchive" + "github.com/stellar/go/xdr" +) + +// This is an implementation of LightHorizon Archive that uses the existing horizon ingestion backend. 
+type ingestArchive struct { + *ledgerbackend.HistoryArchiveBackend +} + +func (ingestArchive) NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (LedgerTransactionReader, error) { + ingestReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta) + + if err != nil { + return nil, err + } + + return &ingestTransactionReaderAdaption{ingestReader}, nil +} + +type ingestTransactionReaderAdaption struct { + *ingest.LedgerTransactionReader +} + +func (adaptation *ingestTransactionReaderAdaption) Read() (LedgerTransaction, error) { + tx := LedgerTransaction{} + ingestLedgerTransaction, err := adaptation.LedgerTransactionReader.Read() + if err != nil { + return tx, err + } + + tx.Index = ingestLedgerTransaction.Index + tx.Envelope = ingestLedgerTransaction.Envelope + tx.Result = ingestLedgerTransaction.Result + tx.FeeChanges = ingestLedgerTransaction.FeeChanges + tx.UnsafeMeta = ingestLedgerTransaction.UnsafeMeta + + return tx, nil +} + +func NewIngestArchive(sourceUrl string, networkPassphrase string) (Archive, error) { + // Simple file os access + source, err := historyarchive.ConnectBackend( + sourceUrl, + historyarchive.ConnectOptions{ + Context: context.Background(), + NetworkPassphrase: networkPassphrase, + }, + ) + if err != nil { + return nil, err + } + ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source) + return ingestArchive{ledgerBackend}, nil +} diff --git a/exp/lighthorizon/archive/main.go b/exp/lighthorizon/archive/main.go index 5002647803..03e40a4f10 100644 --- a/exp/lighthorizon/archive/main.go +++ b/exp/lighthorizon/archive/main.go @@ -5,7 +5,6 @@ import ( "io" "github.com/stellar/go/exp/lighthorizon/common" - "github.com/stellar/go/ingest" "github.com/stellar/go/support/errors" "github.com/stellar/go/support/log" "github.com/stellar/go/toid" @@ 
-19,11 +18,27 @@ import ( // have to download many ledgers until it's able to fill the list completely. // This can be solved by keeping an index/list of empty ledgers. // TODO: make this configurable. +//lint:ignore U1000 Ignore unused temporarily const checkpointsToLookup = 1 -// Archive here only has the methods we care about, to make caching/wrapping easier +// LightHorizon data model +type LedgerTransaction struct { + Index uint32 + Envelope xdr.TransactionEnvelope + Result xdr.TransactionResultPair + FeeChanges xdr.LedgerEntryChanges + UnsafeMeta xdr.TransactionMeta +} + +type LedgerTransactionReader interface { + Read() (LedgerTransaction, error) +} + +// Archive here only has the methods LightHorizon cares about, to make caching/wrapping easier type Archive interface { GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) + Close() error + NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (LedgerTransactionReader, error) } type Wrapper struct { @@ -31,7 +46,7 @@ type Wrapper struct { Passphrase string } -func (a *Wrapper) GetOperations(cursor int64, limit int64) ([]common.Operation, error) { +func (a *Wrapper) GetOperations(ctx context.Context, cursor int64, limit int64) ([]common.Operation, error) { parsedID := toid.Parse(cursor) ledgerSequence := uint32(parsedID.LedgerSequence) if ledgerSequence < 2 { @@ -43,16 +58,15 @@ func (a *Wrapper) GetOperations(cursor int64, limit int64) ([]common.Operation, ops := []common.Operation{} appending := false - ctx := context.Background() for { log.Debugf("Checking ledger %d", ledgerSequence) ledger, err := a.GetLedger(ctx, ledgerSequence) if err != nil { - return nil, err + return ops, nil } - reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(a.Passphrase, ledger) + reader, err := a.NewLedgerTransactionReaderFromLedgerCloseMeta(a.Passphrase, ledger) if err != nil { return nil, errors.Wrapf(err, "error in ledger %d", 
ledgerSequence) } @@ -67,8 +81,9 @@ func (a *Wrapper) GetOperations(cursor int64, limit int64) ([]common.Operation, return nil, err } + transactionOrder++ for operationOrder := range tx.Envelope.Operations() { - currID := toid.New(int32(ledgerSequence), transactionOrder+1, int32(operationOrder+1)).ToInt64() + currID := toid.New(int32(ledgerSequence), transactionOrder, int32(operationOrder+1)).ToInt64() if currID >= cursor { appending = true @@ -83,7 +98,7 @@ func (a *Wrapper) GetOperations(cursor int64, limit int64) ([]common.Operation, TransactionResult: &tx.Result.Result, // TODO: Use a method to get the header LedgerHeader: &ledger.V0.LedgerHeader.Header, - OpIndex: int32(operationOrder), + OpIndex: int32(operationOrder + 1), TxIndex: int32(transactionOrder), }) } @@ -92,37 +107,34 @@ func (a *Wrapper) GetOperations(cursor int64, limit int64) ([]common.Operation, return ops, nil } } - - transactionOrder++ } ledgerSequence++ } } -func (a *Wrapper) GetTransactions(cursor int64, limit int64) ([]common.Transaction, error) { +func (a *Wrapper) GetTransactions(ctx context.Context, cursor int64, limit int64) ([]common.Transaction, error) { parsedID := toid.Parse(cursor) ledgerSequence := uint32(parsedID.LedgerSequence) if ledgerSequence < 2 { ledgerSequence = 2 } - log.Debugf("Searching tx %d", cursor) + log.Debugf("Searching tx %d starting at", cursor) log.Debugf("Getting ledgers starting at %d", ledgerSequence) txns := []common.Transaction{} appending := false - ctx := context.Background() - for { log.Debugf("Checking ledger %d", ledgerSequence) ledger, err := a.GetLedger(ctx, ledgerSequence) if err != nil { - return nil, err + // no 'NotFound' distinction on err, treat all as not found. 
+ return txns, nil } - reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(a.Passphrase, ledger) + reader, err := a.NewLedgerTransactionReaderFromLedgerCloseMeta(a.Passphrase, ledger) if err != nil { return nil, err } @@ -137,7 +149,8 @@ func (a *Wrapper) GetTransactions(cursor int64, limit int64) ([]common.Transacti return nil, err } - currID := toid.New(int32(ledgerSequence), transactionOrder+1, 1).ToInt64() + transactionOrder++ + currID := toid.New(int32(ledgerSequence), transactionOrder, 1).ToInt64() if currID >= cursor { appending = true @@ -160,7 +173,9 @@ func (a *Wrapper) GetTransactions(cursor int64, limit int64) ([]common.Transacti return txns, nil } - transactionOrder++ + if ctx.Err() != nil { + return nil, ctx.Err() + } } ledgerSequence++ diff --git a/exp/lighthorizon/archive/main_test.go b/exp/lighthorizon/archive/main_test.go new file mode 100644 index 0000000000..a4eb4bc2f5 --- /dev/null +++ b/exp/lighthorizon/archive/main_test.go @@ -0,0 +1,143 @@ +package archive + +import ( + "context" + "fmt" + "io" + "testing" + + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/require" +) + +func TestItGetsSequentialOperationsForLimitBeyondEnd(tt *testing.T) { + // l=1586111, t=1, o=1 + ctx := context.Background() + cursor := int64(6812294872829953) + passphrase := "Red New England clam chowder" + archiveWrapper := Wrapper{Archive: mockArchiveFixture(ctx, passphrase), Passphrase: passphrase} + ops, err := archiveWrapper.GetOperations(ctx, cursor, 5) + require.NoError(tt, err) + require.Len(tt, ops, 3) + require.Equal(tt, ops[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586111)) + require.Equal(tt, ops[0].TxIndex, int32(1)) + require.Equal(tt, ops[0].OpIndex, int32(2)) + require.Equal(tt, ops[1].LedgerHeader.LedgerSeq, xdr.Uint32(1586111)) + require.Equal(tt, ops[1].TxIndex, int32(2)) + require.Equal(tt, ops[1].OpIndex, int32(1)) + require.Equal(tt, ops[2].LedgerHeader.LedgerSeq, xdr.Uint32(1586112)) + 
require.Equal(tt, ops[2].TxIndex, int32(1)) + require.Equal(tt, ops[2].OpIndex, int32(1)) +} + +func TestItGetsSequentialOperationsForLimitBeforeEnd(tt *testing.T) { + // l=1586111, t=1, o=1 + ctx := context.Background() + cursor := int64(6812294872829953) + passphrase := "White New England clam chowder" + archiveWrapper := Wrapper{Archive: mockArchiveFixture(ctx, passphrase), Passphrase: passphrase} + ops, err := archiveWrapper.GetOperations(ctx, cursor, 2) + require.NoError(tt, err) + require.Len(tt, ops, 2) + require.Equal(tt, ops[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586111)) + require.Equal(tt, ops[0].TxIndex, int32(1)) + require.Equal(tt, ops[0].OpIndex, int32(2)) + require.Equal(tt, ops[1].LedgerHeader.LedgerSeq, xdr.Uint32(1586111)) + require.Equal(tt, ops[1].TxIndex, int32(2)) + require.Equal(tt, ops[1].OpIndex, int32(1)) +} + +func TestItGetsSequentialTransactionsForLimitBeyondEnd(tt *testing.T) { + // l=1586111, t=1, o=1 + ctx := context.Background() + cursor := int64(6812294872829953) + passphrase := "White New England clam chowder" + archiveWrapper := Wrapper{Archive: mockArchiveFixture(ctx, passphrase), Passphrase: passphrase} + txs, err := archiveWrapper.GetTransactions(ctx, cursor, 5) + require.NoError(tt, err) + require.Len(tt, txs, 2) + require.Equal(tt, txs[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586111)) + require.Equal(tt, txs[0].TxIndex, int32(2)) + require.Equal(tt, txs[1].LedgerHeader.LedgerSeq, xdr.Uint32(1586112)) + require.Equal(tt, txs[1].TxIndex, int32(1)) +} + +func TestItGetsSequentialTransactionsForLimitBeforeEnd(tt *testing.T) { + // l=1586111, t=1, o=1 + ctx := context.Background() + cursor := int64(6812294872829953) + passphrase := "White New England clam chowder" + archiveWrapper := Wrapper{Archive: mockArchiveFixture(ctx, passphrase), Passphrase: passphrase} + txs, err := archiveWrapper.GetTransactions(ctx, cursor, 1) + require.NoError(tt, err) + require.Len(tt, txs, 1) + require.Equal(tt, txs[0].LedgerHeader.LedgerSeq, 
xdr.Uint32(1586111)) + require.Equal(tt, txs[0].TxIndex, int32(2)) +} + +func mockArchiveFixture(ctx context.Context, passphrase string) *MockArchive { + mockArchive := &MockArchive{} + mockReaderLedger1 := &MockLedgerTransactionReader{} + mockReaderLedger2 := &MockLedgerTransactionReader{} + + expectedLedger1 := testLedger(1586111) + expectedLedger2 := testLedger(1586112) + source := xdr.MustAddress("GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU") + // assert results iterate sequentially across ops-tx-ledgers + expectedLedger1Transaction1 := testLedgerTx(source, 34, 34) + expectedLedger1Transaction2 := testLedgerTx(source, 34) + expectedLedger2Transaction1 := testLedgerTx(source, 34) + + mockArchive.On("GetLedger", ctx, uint32(1586111)).Return(expectedLedger1, nil) + mockArchive.On("GetLedger", ctx, uint32(1586112)).Return(expectedLedger2, nil) + mockArchive.On("GetLedger", ctx, uint32(1586113)).Return(xdr.LedgerCloseMeta{}, fmt.Errorf("ledger not found")) + mockArchive.On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger1).Return(mockReaderLedger1, nil) + mockArchive.On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger2).Return(mockReaderLedger2, nil) + mockReaderLedger1.On("Read").Return(expectedLedger1Transaction1, nil).Once() + mockReaderLedger1.On("Read").Return(expectedLedger1Transaction2, nil).Once() + mockReaderLedger1.On("Read").Return(LedgerTransaction{}, io.EOF).Once() + mockReaderLedger2.On("Read").Return(expectedLedger2Transaction1, nil).Once() + mockReaderLedger2.On("Read").Return(LedgerTransaction{}, io.EOF).Once() + return mockArchive +} + +func testLedger(seq int) xdr.LedgerCloseMeta { + return xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(seq), + }, + }, + }, + } +} + +func testLedgerTx(source xdr.AccountId, bumpTos ...int) LedgerTransaction { + + ops := []xdr.Operation{} + for _, 
bumpTo := range bumpTos { + ops = append(ops, xdr.Operation{ + Body: xdr.OperationBody{ + BumpSequenceOp: &xdr.BumpSequenceOp{ + BumpTo: xdr.SequenceNumber(bumpTo), + }, + }, + }) + } + + tx := LedgerTransaction{ + Envelope: xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + SourceAccount: source.ToMuxedAccount(), + Fee: xdr.Uint32(1), + Operations: ops, + }, + }, + }, + } + + return tx +} diff --git a/exp/lighthorizon/archive/mock_archive.go b/exp/lighthorizon/archive/mock_archive.go new file mode 100644 index 0000000000..bdfd6b9149 --- /dev/null +++ b/exp/lighthorizon/archive/mock_archive.go @@ -0,0 +1,36 @@ +package archive + +import ( + "context" + + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/mock" +) + +type MockLedgerTransactionReader struct { + mock.Mock +} + +func (m *MockLedgerTransactionReader) Read() (LedgerTransaction, error) { + args := m.Called() + return args.Get(0).(LedgerTransaction), args.Error(1) +} + +type MockArchive struct { + mock.Mock +} + +func (m *MockArchive) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { + args := m.Called(ctx, sequence) + return args.Get(0).(xdr.LedgerCloseMeta), args.Error(1) +} + +func (m *MockArchive) Close() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockArchive) NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (LedgerTransactionReader, error) { + args := m.Called(networkPassphrase, ledgerCloseMeta) + return args.Get(0).(LedgerTransactionReader), args.Error(1) +} diff --git a/exp/lighthorizon/index/builder.go b/exp/lighthorizon/index/builder.go index a9162b17dc..ccce5883fc 100644 --- a/exp/lighthorizon/index/builder.go +++ b/exp/lighthorizon/index/builder.go @@ -35,14 +35,14 @@ func BuildIndices( L := log.Ctx(ctx) - indexStore, err := Connect(targetUrl) - if err != nil { - 
return nil, err + indexStore, indexErr := Connect(targetUrl) + if indexErr != nil { + return nil, indexErr } // We use historyarchive as a backend here just to abstract away dealing // with the filesystem directly. - source, err := historyarchive.ConnectBackend( + source, backendErr := historyarchive.ConnectBackend( sourceUrl, historyarchive.ConnectOptions{ Context: ctx, @@ -50,14 +50,15 @@ func BuildIndices( S3Region: "us-east-1", }, ) - if err != nil { - return nil, err + if backendErr != nil { + return nil, backendErr } ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source) defer ledgerBackend.Close() if endLedger == 0 { + latest, err := ledgerBackend.GetLatestLedgerSequence(ctx) if err != nil { return nil, err @@ -126,7 +127,7 @@ func BuildIndices( L.Debugf("Working on checkpoint range [%d, %d]", ledgerRange.Low, ledgerRange.High) - if err = indexBuilder.Build(ctx, ledgerRange); err != nil { + if err := indexBuilder.Build(ctx, ledgerRange); err != nil { return errors.Wrap(err, "building indices failed") } @@ -255,10 +256,10 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi } func (b *IndexBuilder) Watch(ctx context.Context) error { - latestLedger, err := b.ledgerBackend.GetLatestLedgerSequence(ctx) - if err != nil { - log.Errorf("Failed to retrieve latest ledger: %v", err) - return err + latestLedger, seqErr := b.ledgerBackend.GetLatestLedgerSequence(ctx) + if seqErr != nil { + log.Errorf("Failed to retrieve latest ledger: %v", seqErr) + return seqErr } nextLedger := b.lastBuiltLedger + 1 @@ -317,7 +318,7 @@ func (b *IndexBuilder) Watch(ctx context.Context) error { continue } - return errors.Wrap(err, "awaiting next ledger failed") + return errors.Wrap(buildErr, "awaiting next ledger failed") } } } diff --git a/exp/lighthorizon/index/cmd/batch/reduce/main.go b/exp/lighthorizon/index/cmd/batch/reduce/main.go index ac80ac932b..ec2c68abe3 100644 --- a/exp/lighthorizon/index/cmd/batch/reduce/main.go +++ 
b/exp/lighthorizon/index/cmd/batch/reduce/main.go @@ -106,10 +106,10 @@ func main() { func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { doneAccounts := NewSafeStringSet() for i := uint32(0); i < config.MapJobCount; i++ { - logger := log.WithField("job", i) + jobLogger := log.WithField("job", i) url := filepath.Join(config.IndexRootSource, "job_"+strconv.FormatUint(uint64(i), 10)) - logger.Infof("Connecting to %s", url) + jobLogger.Infof("Connecting to %s", url) outerJobStore, err := index.Connect(url) if err != nil { @@ -119,13 +119,13 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { accounts, err := outerJobStore.ReadAccounts() // TODO: in final version this should be critical error, now just skip it if os.IsNotExist(err) { - logger.Errorf("accounts file not found (TODO!)") + jobLogger.Errorf("accounts file not found (TODO!)") continue } else if err != nil { return errors.Wrapf(err, "failed to read accounts for job %d", i) } - logger.Infof("Processing %d accounts with %d workers", + jobLogger.Infof("Processing %d accounts with %d workers", len(accounts), config.Workers) workQueues := make([]chan string, config.Workers) @@ -159,7 +159,7 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { for j := uint32(0); j < config.Workers; j++ { go func(routineIndex uint32) { defer wg.Done() - logger := logger. + logger := jobLogger. WithField("worker", routineIndex). WithField("total", len(accounts)) logger.Info("Started worker") @@ -179,14 +179,14 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { // First, open the "final merged indices" at the root level // for this account. 
- mergedIndices, err := outerJobStore.Read(account) + mergedIndices, mergeErr := outerJobStore.Read(account) // TODO: in final version this should be critical error, now just skip it - if os.IsNotExist(err) { + if os.IsNotExist(mergeErr) { logger.Errorf("Account %s is unavailable - TODO fix", account) continue - } else if err != nil { - panic(err) + } else if mergeErr != nil { + panic(mergeErr) } // Then, iterate through all of the job folders and merge @@ -198,27 +198,27 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { // worker needs to have a connection to every index // store, so there's no reason to re-open these for each // inner loop. - innerJobStore, err := index.Connect(url) - if err != nil { - logger.WithError(err). + innerJobStore, indexErr := index.Connect(url) + if indexErr != nil { + logger.WithError(indexErr). Errorf("Failed to open index at %s", url) - panic(err) + panic(indexErr) } - jobIndices, err := innerJobStore.Read(account) + jobIndices, innerJobErr := innerJobStore.Read(account) // This job never touched this account; skip. - if os.IsNotExist(err) { + if os.IsNotExist(innerJobErr) { continue - } else if err != nil { - logger.WithError(err). + } else if innerJobErr != nil { + logger.WithError(innerJobErr). Errorf("Failed to read index for %s", account) - panic(err) + panic(innerJobErr) } - if err := mergeIndices(mergedIndices, jobIndices); err != nil { - logger.WithError(err). + if mergeIndexErr := mergeIndices(mergedIndices, jobIndices); mergeIndexErr != nil { + logger.WithError(mergeIndexErr). 
Errorf("Merge failure for index at %s", url) - panic(err) + panic(mergeIndexErr) } } @@ -240,7 +240,7 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { } } - logger.Infof("Final account flush.") + jobLogger.Infof("Final account flush.") if err = finalIndexStore.Flush(); err != nil { logger.WithError(err).Errorf("Flush error.") panic(err) @@ -249,7 +249,7 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error { // Merge the transaction indexes // There's 256 files, (one for each first byte of the txn hash) var transactionsProcessed, transactionsSkipped uint64 - logger = logger. + logger = jobLogger. WithField("indexed", transactionsProcessed). WithField("skipped", transactionsSkipped) @@ -269,28 +269,28 @@ for k := uint32(0); k < config.MapJobCount; k++ { url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k)) - innerJobStore, err := index.Connect(url) - if err != nil { - logger.WithError(err).Errorf("Failed to open index at %s", url) - panic(err) + innerJobStore, jobErr := index.Connect(url) + if jobErr != nil { + logger.WithError(jobErr).Errorf("Failed to open index at %s", url) + panic(jobErr) } - innerTxnIndexes, err := innerJobStore.ReadTransactions(prefix) - if os.IsNotExist(err) { + innerTxnIndexes, innerJobErr := innerJobStore.ReadTransactions(prefix) + if os.IsNotExist(innerJobErr) { continue - } else if err != nil { - logger.WithError(err).Errorf("Error reading tx prefix %s", prefix) - panic(err) + } else if innerJobErr != nil { + logger.WithError(innerJobErr).Errorf("Error reading tx prefix %s", prefix) + panic(innerJobErr) } - if err := finalIndexStore.MergeTransactions(prefix, innerTxnIndexes); err != nil { - logger.WithError(err).Errorf("Error merging txs at prefix %s", prefix) - panic(err) + if prefixErr := finalIndexStore.MergeTransactions(prefix, innerTxnIndexes); prefixErr != nil { + 
logger.WithError(prefixErr).Errorf("Error merging txs at prefix %s", prefix) + panic(prefixErr) } } } - logger.Infof("Final transaction flush (%d processed)", transactionsProcessed) + jobLogger.Infof("Final transaction flush (%d processed)", transactionsProcessed) if err = finalIndexStore.Flush(); err != nil { logger.Errorf("Error flushing transactions: %v", err) panic(err) diff --git a/exp/lighthorizon/index/cmd/batch/reduce/set.go b/exp/lighthorizon/index/cmd/batch/reduce/set.go index e57b88775b..99e6f6282e 100644 --- a/exp/lighthorizon/index/cmd/batch/reduce/set.go +++ b/exp/lighthorizon/index/cmd/batch/reduce/set.go @@ -4,26 +4,26 @@ import "sync" // SafeStringSet is a simple thread-safe set. type SafeStringSet struct { - lock sync.RWMutex - set map[string]struct{} + lock sync.RWMutex + set map[string]struct{} } func NewSafeStringSet() *SafeStringSet { - return &SafeStringSet{ - lock: sync.RWMutex{}, - set: map[string]struct{}{}, - } + return &SafeStringSet{ + lock: sync.RWMutex{}, + set: map[string]struct{}{}, + } } func (set *SafeStringSet) Contains(key string) bool { - defer set.lock.RUnlock() - set.lock.RLock() - _, ok := set.set[key] - return ok + defer set.lock.RUnlock() + set.lock.RLock() + _, ok := set.set[key] + return ok } func (set *SafeStringSet) Add(key string) { - defer set.lock.Unlock() - set.lock.Lock() - set.set[key] = struct{}{} + defer set.lock.Unlock() + set.lock.Lock() + set.set[key] = struct{}{} } diff --git a/exp/lighthorizon/index/cmd/single/main_test.go b/exp/lighthorizon/index/cmd/single/main_test.go index a18c1a8212..5442badc39 100644 --- a/exp/lighthorizon/index/cmd/single/main_test.go +++ b/exp/lighthorizon/index/cmd/single/main_test.go @@ -49,7 +49,7 @@ func TestSingleProcess(tt *testing.T) { t.Logf("Storing indices in %s", tmpDir) ctx := context.Background() - _, err := index.BuildIndices( + _, indexErr := index.BuildIndices( ctx, txmetaSource, tmpDir, @@ -62,9 +62,9 @@ func TestSingleProcess(tt *testing.T) { }, workerCount, ) - 
require.NoError(t, err) + require.NoError(t, indexErr) - backend, err := historyarchive.ConnectBackend( + backend, backendErr := historyarchive.ConnectBackend( txmetaSource, historyarchive.ConnectOptions{ Context: ctx, @@ -72,7 +72,7 @@ func TestSingleProcess(tt *testing.T) { S3Region: "us-east-1", }, ) - require.NoError(t, err) + require.NoError(t, backendErr) ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(backend) defer ledgerBackend.Close() diff --git a/exp/lighthorizon/index/types/bitmap.go b/exp/lighthorizon/index/types/bitmap.go index 680d6cd992..423d179134 100644 --- a/exp/lighthorizon/index/types/bitmap.go +++ b/exp/lighthorizon/index/types/bitmap.go @@ -109,6 +109,7 @@ func (i *CheckpointIndex) setActive(checkpoint uint32) error { return nil } +//lint:ignore U1000 Ignore unused function temporarily func (i *CheckpointIndex) isActive(checkpoint uint32) bool { if checkpoint >= i.firstCheckpoint && checkpoint <= i.lastCheckpoint { b := bitShiftLeft(checkpoint) diff --git a/exp/lighthorizon/main.go b/exp/lighthorizon/main.go index f6e8c2810b..f278f55c09 100644 --- a/exp/lighthorizon/main.go +++ b/exp/lighthorizon/main.go @@ -1,15 +1,13 @@ package main import ( - "context" "flag" "net/http" "github.com/stellar/go/exp/lighthorizon/actions" "github.com/stellar/go/exp/lighthorizon/archive" "github.com/stellar/go/exp/lighthorizon/index" - "github.com/stellar/go/historyarchive" - "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/network" "github.com/stellar/go/support/log" ) @@ -28,22 +26,16 @@ func main() { log.SetLevel(log.DebugLevel) log.Info("Starting lighthorizon!") - // Simple file os access - source, err := historyarchive.ConnectBackend( - *sourceUrl, - historyarchive.ConnectOptions{ - Context: context.Background(), - NetworkPassphrase: *networkPassphrase, - }, - ) + ingestArchive, err := 
archive.NewIngestArchive(*sourceUrl, *networkPassphrase) if err != nil { panic(err) } - ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source) - defer ledgerBackend.Close() - archiveWrapper := archive.Wrapper{Archive: ledgerBackend, Passphrase: *networkPassphrase} + defer ingestArchive.Close() + + archiveWrapper := archive.Wrapper{Archive: ingestArchive, Passphrase: *networkPassphrase} http.HandleFunc("/operations", actions.Operations(archiveWrapper, indexStore)) http.HandleFunc("/transactions", actions.Transactions(archiveWrapper, indexStore)) + http.HandleFunc("/", actions.ApiDocs()) log.Fatal(http.ListenAndServe(":8080", nil)) } diff --git a/historyarchive/fs_cache.go b/historyarchive/fs_cache.go index 2585566449..b206b385a8 100644 --- a/historyarchive/fs_cache.go +++ b/historyarchive/fs_cache.go @@ -17,7 +17,8 @@ import ( // FsCacheBackend fronts another backend with a local filesystem cache type FsCacheBackend struct { ArchiveBackend - dir string + dir string + //lint:ignore U1000 Ignore unused temporarily knownFiles lruCache maxFiles int lru lruCache diff --git a/historyarchive/gcs_archive.go b/historyarchive/gcs_archive.go index 8cb00eddb9..2ce1e90f4c 100644 --- a/historyarchive/gcs_archive.go +++ b/historyarchive/gcs_archive.go @@ -49,6 +49,7 @@ func (b *GCSArchiveBackend) GetFile(pth string) (io.ReadCloser, error) { r, err := b.bucket.Object(pth).NewReader(context.Background()) if err == storage.ErrObjectNotExist { // TODO: Check this is right + //lint:ignore SA4006 Ignore unused function temporarily err = os.ErrNotExist } return r, nil diff --git a/ingest/ledgerbackend/history_archive_backend.go b/ingest/ledgerbackend/history_archive_backend.go index 98ee689d83..5ca9f6a0bf 100644 --- a/ingest/ledgerbackend/history_archive_backend.go +++ b/ingest/ledgerbackend/history_archive_backend.go @@ -31,7 +31,7 @@ func (b *HistoryArchiveBackend) GetLatestLedgerSequence(ctx context.Context) (ui } defer r.Close() var buf bytes.Buffer - if _, err := 
io.Copy(&buf, r); err != nil { + if _, err = io.Copy(&buf, r); err != nil { return 0, errors.Wrap(err, "could not read latest ledger") } parsed, err := strconv.ParseUint(buf.String(), 10, 32) @@ -59,7 +59,7 @@ func (b *HistoryArchiveBackend) GetLedger(ctx context.Context, sequence uint32) } defer r.Close() var buf bytes.Buffer - if _, err := io.Copy(&buf, r); err != nil { + if _, err = io.Copy(&buf, r); err != nil { return ledger, err } if err = ledger.UnmarshalBinary(buf.Bytes()); err != nil {