-
Notifications
You must be signed in to change notification settings - Fork 518
services/horizon, clients/horizonclient: Allow filtering ingested transactions by account or asset. #4277
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
services/horizon, clients/horizonclient: Allow filtering ingested transactions by account or asset. #4277
Changes from all commits
Commits
Show all changes
119 commits
Select commit
Hold shift + click to select a range
cd48ff0
#4222: ported filtering-poc and started real asset filter feature dev
sreuland 8ee5308
Hide ingestion filtering behind a commandline flag
2opremio 01d11c2
Add account filtering support
2opremio 3e345ba
#4222: common GET/SET handler, resource model, and URL paths for filt…
sreuland b2deb81
#4222: cleaned up URL paths for CRUD on filter configs
sreuland 410876e
#4222: refactored filter list acquistion into filters package
sreuland 0151a47
#4222: changed filter cache name
sreuland 6978425
#4222: removed singleton pattern of filters
sreuland b2370fc
#4222: cleanup go fmt
sreuland 68a450a
#4222: backfilling tests on filters
sreuland 06d9c8a
#4222: fixed json string to postgres conversion
sreuland 46ccadf
Merge remote-tracking branch 'upstream/master' into filtering-feature
sreuland 0c5b57b
#4222: back filling tests on filter config web action handlers
sreuland eea2ae6
#4222: refactor filer config CRUD interface, removed create/del, not …
sreuland f5a092d
#4222: fix filters internal expired cache state rollover setting
sreuland 580e59c
#4221: added account filter tests
sreuland 9f47343
#4222: added unit test coverage on filters-as-service
sreuland 19928ff
#4222: added filtering log statement
sreuland cef3622
#4222: added filtering log statement correction
sreuland b31edb5
Merge branch 'master' into filtering-feature
2opremio bf664c3
Fix whitespace
2opremio 0df128b
#4222: removed pointer usage on filter resource
sreuland 9c6044d
Add filtering endpoint support to horizon the client
2opremio 8709b45
Start creating an integration test
2opremio c193a4d
#4222: refactored filter config model from json into typed schema in …
sreuland 6c7b850
#4222: added admin api docs in api blueprint format, include new filt…
sreuland cf8c9bc
#4222: switched to openapi3 format for admin api doc
sreuland 86ea48c
#4222: added changelog entry for filters features
sreuland b2a40d8
#4222: updated admin api docs, examples
sreuland f6aa1e5
#4222: updated changelog for filter feature
sreuland 99f811e
Merge remote-tracking branch 'upstream/master' into filtering-feature
sreuland 2050a1a
Continue with the integration test
2opremio 30dbd20
Merge remote-tracking branch 'upstream/master' into filtering-feature
sreuland 3353d76
Add txsub_result table
2opremio 68ea3f9
Serialize the full transaction, not just the result
2opremio 644883c
Initialize, get and cleanup submission results
2opremio 02c5008
Set the transaction submission result during ingestion
2opremio af9f5fc
Implement mocks by pointer
2opremio 7bbb7d5
Appease go vet
2opremio 5167127
Remove useles tests and adjust base scenario
2opremio a7c0c43
Remove unused code
2opremio 9d94a82
Appease go vet
2opremio cbd44a0
Fix failed tx scenario
2opremio beeb78f
Allow writing to db in order to clean up old entries
2opremio 22dc6b1
Rename DB methods to make then easier to understand
2opremio d5785d2
Restore rollback method
2opremio 3e42f42
Merge branch 'master' into filtering-feature
2opremio be6589f
Fix merge conflicts
2opremio 4f306d0
Fix broken test
2opremio 25cdf87
#4222: perform commits on unfiltered processor in all cases
sreuland b921a76
Fix txsub tests
2opremio be743ee
Add performance TODO
2opremio 1162fe3
Fix base scenario again
2opremio d01d591
MOre base scenario fixes
2opremio c82ae03
And yet some more base scenario fixes
2opremio cec1091
Regenerate base scenario
2opremio 95fda79
Fixed failed transactions scenario
2opremio f22e88d
Fix all unit tests except for feebump transactions
2opremio f1b45c6
minor refactoring
2opremio 794f8ac
Merge remote-tracking branch 'upstream/master' into filtering-feature
sreuland e64390e
Add another TODO
2opremio 06fcdcd
#4222: make txsub results init idempotent whether row for tx hash exi…
sreuland b6be43e
Merge remote-tracking branch 'upstream/filtering-feature' into filter…
sreuland 231ccfe
#4222: remove a TODO for init tx result
sreuland c6f99f5
Add inner transaction column to txsub_results
2opremio f7f504c
Forgot to pass the inner hash to the mock
2opremio c153946
Fix TestInitIdempotent
2opremio bb7a6b3
#4222: fixed unit tests for mocked InitEmptyTxSubmissionResult
sreuland c8742be
Merge remote-tracking branch 'origin/master' into filtering-feature
2opremio 6741c72
Resolve merge conflic
2opremio bc5cb5e
Return the affected rows by SetTxSubmissionResult()
2opremio 5d94ffb
Remove fixed TODO
2opremio 77e8ef6
Modify SetTxSubmissionResult() so that it takes multiple transactions…
2opremio 43264d5
Clarify TODO
2opremio f8aeef2
Implement batching for SetTxSubmissionResults
2opremio 7cce799
Correct documentation comments
2opremio 17538dc
#4222: separate integration test cases for asset and account whitelist
sreuland 1a3cb42
Only cleanup old submission results every TTL period
2opremio 32aaa4d
Remove superflous TODO
2opremio 3b2b381
Remove another fixed TODO
2opremio 8ea43c0
Remove yet another TODO
2opremio ffb8ce1
#4222: added metrics 'name=horizon_ingest_ledger_stats_total, type=le…
sreuland 6778bf9
#4222: added metrics 'name=horizon_ingest_processor_run_duration_seco…
sreuland ba5a684
Revert "#4222: added metrics 'name=horizon_ingest_processor_run_durat…
2opremio 5c3ad55
Get metrics for TxSubmissionResultProcessor and only run it if not re…
2opremio 90a3352
#4222: refactored Admin* Cliet methods into admin client struct/inte…
sreuland ae706fa
#4222: minor code cleanup per PR feedback
sreuland 565e1aa
#4222: use utility method NewProblemWithInvalidField for common err f…
sreuland bd1a459
#4222: rerun go formatting, missed one file
sreuland 7843998
#4222: remove superflous TODOs
sreuland b37dc44
#4222: added history tx lookup back in to txsub tick routing, for las…
sreuland d476a4e
#4222: removed asset filter inspection of claimablebalance claim/claw…
sreuland b693b95
#4222: add host/port override for admin client
sreuland 045f34b
fixed typo
sreuland 0a79fd0
#4222: cleaned up creation of problem for sql NotFound
sreuland 54ddeac
#4222: fixed err checking in get tx results loop, increased test cove…
sreuland 058d0ee
streamlined matching/boolean code
sreuland 05e7211
#4222: incorporate pr feedback on code, remove todo
sreuland 065aae3
Merge remote-tracking branch 'upstream/master' into filtering-feature
sreuland 0b86858
#4222: fixed unit test failure and static check on duration
sreuland 92121f3
Merge remote-tracking branch 'upstream/master' into filtering-feature
sreuland 4015550
#4222: refactored into NewAdminClient pattern, pr feedback
sreuland 0578799
Update typos in changelog
sreuland 2b1e53b
#4222: removed clock from admin client
sreuland 3d89915
Update inline code docs for typo
sreuland 5e8a912
#4222: refactored admin client method names to not be redundant
sreuland a14bc78
Merge remote-tracking branch 'upstream/filtering-feature' into filter…
sreuland 0ae77dc
#4222: removed unnecessary check on duplicate txsub results
sreuland e80e5f3
#4222: added inline docs on setTxSubmissionResults
sreuland 6491458
#4222: cleanup go formatting error
sreuland e36549b
Merge remote-tracking branch 'upstream/master' into filtering-feature
sreuland 2a0a1bc
#4222: added inline docs on admin http endpoint handlers for referenc…
sreuland 7b77d7b
#4222: include V0 transactions in asset filter
sreuland 65405b9
#4222: define length of map at declaration on asset filter
sreuland 4541374
#4222: set ingestion filter feature flag to default of false
sreuland b390c9b
#4222: added info log output for filtered tx drops
sreuland 29822d4
Merge remote-tracking branch 'upstream/master' into filtering-feature
sreuland d59cf46
#4222: changed log output for tx dropped statement to show hex string…
sreuland 0a78183
Merge branch 'master' into filtering-feature
2opremio File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
package horizonclient | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"net/http" | ||
"net/url" | ||
"time" | ||
|
||
hProtocol "github.com/stellar/go/protocols/horizon" | ||
"github.com/stellar/go/support/errors" | ||
) | ||
|
||
// port - the horizon admin port, zero value defaults to 4200 | ||
// host - the host interface name that horizon has bound admin web service, zero value defaults to 'localhost' | ||
// timeout - the length of time for the http client to wait on responses from admin web service | ||
func NewAdminClient(port uint16, host string, timeout time.Duration) (*AdminClient, error) { | ||
baseURL, err := getAdminBaseURL(port, host) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if timeout == 0 { | ||
timeout = HorizonTimeout | ||
} | ||
|
||
return &AdminClient{ | ||
baseURL: baseURL, | ||
http: http.DefaultClient, | ||
horizonTimeout: timeout, | ||
}, nil | ||
} | ||
|
||
func getAdminBaseURL(port uint16, host string) (string, error) { | ||
baseURL, err := url.Parse("http://localhost") | ||
if err != nil { | ||
return "", err | ||
} | ||
adminPort := uint16(4200) | ||
if port > 0 { | ||
adminPort = port | ||
} | ||
adminHost := baseURL.Hostname() | ||
if len(host) > 0 { | ||
adminHost = host | ||
} | ||
baseURL.Host = fmt.Sprintf("%s:%d", adminHost, adminPort) | ||
return baseURL.String(), nil | ||
} | ||
|
||
func (c *AdminClient) sendGetRequest(requestURL string, a interface{}) error { | ||
req, err := http.NewRequest("GET", requestURL, nil) | ||
if err != nil { | ||
return errors.Wrap(err, "error creating Admin HTTP request") | ||
} | ||
return c.sendHTTPRequest(req, a) | ||
} | ||
|
||
func (c *AdminClient) sendHTTPRequest(req *http.Request, a interface{}) error { | ||
ctx, cancel := context.WithTimeout(context.Background(), c.horizonTimeout) | ||
defer cancel() | ||
|
||
if resp, err := c.http.Do(req.WithContext(ctx)); err != nil { | ||
return err | ||
} else { | ||
return decodeResponse(resp, a, req.URL.String(), nil) | ||
} | ||
} | ||
|
||
func (c *AdminClient) getIngestionFiltersURL(filter string) string { | ||
return fmt.Sprintf("%s/ingestion/filters/%s", c.baseURL, filter) | ||
} | ||
|
||
func (c *AdminClient) GetIngestionAssetFilter() (hProtocol.AssetFilterConfig, error) { | ||
var filter hProtocol.AssetFilterConfig | ||
err := c.sendGetRequest(c.getIngestionFiltersURL("asset"), &filter) | ||
return filter, err | ||
} | ||
|
||
func (c *AdminClient) GetIngestionAccountFilter() (hProtocol.AccountFilterConfig, error) { | ||
var filter hProtocol.AccountFilterConfig | ||
err := c.sendGetRequest(c.getIngestionFiltersURL("account"), &filter) | ||
return filter, err | ||
} | ||
|
||
func (c *AdminClient) SetIngestionAssetFilter(filter hProtocol.AssetFilterConfig) error { | ||
buf := bytes.NewBuffer(nil) | ||
err := json.NewEncoder(buf).Encode(filter) | ||
if err != nil { | ||
return err | ||
} | ||
req, err := http.NewRequest(http.MethodPut, c.getIngestionFiltersURL("asset"), buf) | ||
if err != nil { | ||
return errors.Wrap(err, "error creating HTTP request") | ||
} | ||
req.Header.Add("Content-Type", "application/json") | ||
return c.sendHTTPRequest(req, nil) | ||
} | ||
|
||
func (c *AdminClient) SetIngestionAccountFilter(filter hProtocol.AccountFilterConfig) error { | ||
buf := bytes.NewBuffer(nil) | ||
err := json.NewEncoder(buf).Encode(filter) | ||
if err != nil { | ||
return err | ||
} | ||
req, err := http.NewRequest(http.MethodPut, c.getIngestionFiltersURL("account"), buf) | ||
if err != nil { | ||
return errors.Wrap(err, "error creating HTTP request") | ||
} | ||
req.Header.Add("Content-Type", "application/json") | ||
return c.sendHTTPRequest(req, nil) | ||
} | ||
|
||
// ensure that the horizon admin client implements AdminClientInterface | ||
var _ AdminClientInterface = &AdminClient{} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package horizonclient | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestDefaultAdminHostPort(t *testing.T) { | ||
horizonAdminClient, err := NewAdminClient(0, "", 0) | ||
|
||
fullAdminURL := horizonAdminClient.getIngestionFiltersURL("test") | ||
require.NoError(t, err) | ||
assert.Equal(t, "http://localhost:4200/ingestion/filters/test", fullAdminURL) | ||
} | ||
|
||
func TestOverrideAdminHostPort(t *testing.T) { | ||
horizonAdminClient, err := NewAdminClient(1234, "127.0.0.1", 0) | ||
|
||
fullAdminURL := horizonAdminClient.getIngestionFiltersURL("test") | ||
require.NoError(t, err) | ||
assert.Equal(t, "http://127.0.0.1:1234/ingestion/filters/test", fullAdminURL) | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.