-
Notifications
You must be signed in to change notification settings - Fork 816
Use gRPC for distributor <-> ingester rpcs. #144
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
From #105:
Any measurements? |
Nope, not tested it yet. |
I was actually more thinking of the boilerplate reduction than the latency. Because of vendoring it's hard to tell the actual difference at a glance. |
Take a look at the second commit. We currently haven;t removed the origin code http (as its needed for migration), but will do. I just tidied it up for now. |
Had to update built-tools and fix the generation of bindata.go to get lint to pass. |
06a2986
to
f68f206
Compare
@jml this is good to go now. Anecdotally query performance is much improved (locally). |
- Don't commit generated code, teach make how to build it - Don't duplicate remote.proto from promtheus, import it. - Make ingester and distributor expose grpc services - Move http server code into the ingester/distributor packages (some duplication, but will go away after we migrate) - Start a gRPC server for ingester (along side http server) - Plumb through optionally creating grpc ingester client - Try and propagate user id from distributor to ingester - gRPC middleware package; add logging, auth and instrumentation middleware
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly looks good, and I really like the refactoring. Would like another chance to go over it after you've replied to the review, since it really is quite big (even without the vendoring), and I want to make sure I grok it all.
flag.StringVar(&cfg.rulerConfig.ConfigsAPIURL, "ruler.configs.url", "", "URL of configs API server.") | ||
flag.StringVar(&cfg.rulerConfig.UserID, "ruler.userID", "", "Weave Cloud org to run rules for") | ||
flag.DurationVar(&cfg.rulerConfig.EvaluationInterval, "ruler.evaluation-interval", 15*time.Second, "How frequently to evaluate rules") | ||
flag.BoolVar(&cfg.logSuccess, "log.success", false, "Log successful requests") | ||
flag.BoolVar(&cfg.watchDynamo, "watch-dynamo", false, "Periodically collect DynamoDB provisioned throughput.") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's up with all this shuffling? It doesn't matter much, just curious.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For my own sanity, I grouped the flags by common, ingester, distributor etc
) | ||
cortex.RegisterIngesterServer(grpcServer, ing) | ||
go grpcServer.Serve(lis) | ||
defer grpcServer.Stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this function (main
) is getting too big?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. I think we'll end up breaking this into a bunch of binaries / images next.
} | ||
d.clients[hostname] = client | ||
|
||
d.clients[ingester.Hostname] = client |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will it matter if some of the ingester clients are HTTP and others are grpc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will not. It has to support that for the deployment case.
"github.com/weaveworks/cortex/user" | ||
) | ||
|
||
// IngesterClient is a client library for the ingester |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should say httpIngesterClient
.
@@ -158,13 +173,14 @@ type sampleTracker struct { | |||
succeeded int32 | |||
} | |||
|
|||
// Append implements SampleAppender. | |||
func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error { | |||
// Push Implements cortex.IngesterServer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be lower-case implements
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
m = append(m, &ss) | ||
} | ||
|
||
return m |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add some tests for each of these To
/From
pairs that demonstrate that they roundtrip correctly. (i.e. that ToFoo(FromFoo(x)) == x
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -151,12 +151,14 @@ func (r *Ruler) getConfig(userID string) (*config.Config, error) { | |||
|
|||
// appenderAdapter adapts cortex.SampleAppender to prometheus.SampleAppender | |||
type appenderAdapter struct { | |||
appender cortex.SampleAppender | |||
ctx context.Context | |||
distributor *distributor.Distributor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That said, I think this should probably be an IngesterServer
(if it is indeed correct that the distributor implements the IngesterServer
interface).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not fully - its only implements the Push method. I thought the coupling was a bit strong here, but there is no other interface just for Push, unless we invent one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something we can save for a future refactoring then.
// Append implements storage.SampleAppender. | ||
func (i *Ingester) Append(ctx context.Context, samples []*model.Sample) error { | ||
for _, sample := range samples { | ||
// Push Implements cortex.IngesterServer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lower-case "implements"
@@ -0,0 +1,56 @@ | |||
package distributor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How would you feel about renaming this file to http_server.go
or http_handlers.go
or something? Likewise for its ingester equivalent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't mind either way. The ingester_client.go and ingester/http.go are going to be deleted in the next PR.
_, err := d.Push(ctx, &req) | ||
if err != nil { | ||
switch e := err.(type) { | ||
case IngesterError: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AIUI, this is only going to be relevant if we're also using the HTTP ingester client, and that with gRPC, we will no longer be able to distinguish between user errors and server errors. Is there a way of addressing this? It's a bit of a regression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sure there is with gRPC, but I'll have to investigate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please file a bug for follow-up?
@jml PTAL |
@@ -15,8 +15,10 @@ func ServerLoggingInterceptor(logSuccess bool) grpc.UnaryServerInterceptor { | |||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { | |||
begin := time.Now() | |||
resp, err := handler(ctx, req) | |||
if logSuccess || err != nil { | |||
if logSuccess && err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That should be err == nil
, I think. Probably overall clearer to do:
if err != nil {
...
} else if logSuccess {
...
}
Thanks. Minor tweaks. |
Fixes #105
TODO: