WIP: ✨ Cluster Provider and cluster-aware controllers #2207
Conversation
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: vincepri The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing
}
cluster1, err := mgr.LogicalClusterGetter()("cluster1")
Expect(err).NotTo(HaveOccurred())
Expect(cluster1.GetClient().Create(ctx, dep)).To(Succeed())
Q: Do I see correctly that you are creating a Deployment on Cluster1 and then get reconcile requests for Cluster1 and Cluster2?
I would have expected to only get one for Cluster1.
Yeah it's mostly because under the hood it's returning the same testenv kubeconfig
At least for now; we'd potentially need to rework testenv a bit to spin up multiple clusters.
Makes sense. I guessed it was something like that.
In case we don't want to address this in this PR, let's add a TODO to make clear that the currently tested/verified behavior is not the actual behavior.
)

// Controller implements controller.Controller.
type Controller struct {
	// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
	Name string

	// Cluster is the logical cluster that this controller is running against.
	// +optional
	Cluster logical.Name
I don't really understand why a controller would need to know anything about the cluster it runs against. It has eventsources and the events those emit contain the cluster. It should also be mentioned that a controller might be getting events from multiple clusters.
IMHO we should just add the cluster field based on the reconcile.Request into the logger (which is, from what I can tell, the only place where we use this).
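The suggestion above could look roughly like this. This is a minimal, stdlib-only sketch: `Request` stands in for `reconcile.Request` with the proposed `Cluster` field, and `loggerFields` is a hypothetical helper, not actual controller-runtime code.

```go
package main

import "fmt"

// Request is a stand-in for reconcile.Request with the proposed Cluster field.
type Request struct {
	Cluster   string
	Namespace string
	Name      string
}

// loggerFields derives the key/value pairs a controller could attach to its
// per-reconcile logger, adding "cluster" only when the request carries one.
func loggerFields(req Request) []any {
	kv := []any{"namespace", req.Namespace, "name", req.Name}
	if req.Cluster != "" {
		kv = append(kv, "cluster", req.Cluster)
	}
	return kv
}

func main() {
	fmt.Println(loggerFields(Request{Cluster: "cluster1", Namespace: "default", Name: "dep"}))
	// prints: [namespace default name dep cluster cluster1]
}
```

With this approach, the controller itself stays cluster-agnostic: the cluster only shows up as a logging field derived from each request.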
/test pull-controller-runtime-test
{
	// TODO(vincepri): Make this timeout configurable.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	var watchErr error
	if err := wait.PollImmediateUntilWithContext(ctx, 1*time.Second, func(ctx context.Context) (done bool, err error) {
		cfg, watchErr = cm.logicalAdapter.RESTConfig(name)
		cl, watchErr = cm.logicalProvider.Get(ctx, name, cm.defaultClusterOptions, cluster.WithName(name))
Why both name and cluster.WithName(name)?
The last one just sets the option for cluster.New
Happy to rework it, I guess it's a bit weird 😄
Why poll rather than making Get blocking?
Mostly just nits, but I'm not entirely done with the review yet (pkg/manager is left).
// KindClusterProvider is a cluster provider that works with a local Kind instance.
type KindClusterProvider struct{}

func (k *KindClusterProvider) Get(ctx context.Context, name logical.Name, opts ...cluster.Option) (cluster.Cluster, error) {
Q: Might it be better if we end up with something like logicalcluster.Name or logical.ClusterName?
Also in field names of structs like Builder:
cluster cluster.Cluster
logicalName logical.Name
vs.
cluster cluster.Cluster
logicalClusterName logicalcluster.Name
or, if we want something shorter:
cluster cluster.Cluster
clusterName logicalcluster.Name
(Might be good to consistently stick with the same naming, e.g. a Cluster is a Cluster, a ClusterName is a logicalcluster.Name; mixing it up might lead to a bit of confusion, e.g. with req.Cluster.)
IMO we should rename the repo to sigs.k8s.io/logicalcluster and change the package name to logicalcluster too. I also like clusterName as a shorthand for variables of type logicalcluster.Name.
@nikhita how would we do a repo rename?
Alternatively, as suggested in other comments: get rid of the alias altogether and just use string. I'm undecided. I've done that now locally in this branch. We lose type safety and replace it with the naming convention of calling every formerly logical cluster name clusterName.
// If we're building a logical controller, raw watches are not allowed
// given that the cache cannot be validated to be coming from the same cluster.
// In the future, we could consider allowing this by satisfying a new interface
// that sets and uses the cluster.
Can you expand a bit on that?
Is it that the cache we get from cluster.GetCache might be caching objects from another cluster than the one the channel belongs to? (This is the case for source.Channel, right?)
How does this fit with this godoc comment?
* Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, polling external URLs).
I assume it's the case where folks are using Channel for something coming from a cluster? (Which is not what we document, but of course possible.)
+1, this makes it impossible to use sources that are not source.Kind
func (e *enqueueRequestsFromMapFunc) DeepCopyFor(c cluster.Cluster) EventHandler {
	return &enqueueRequestsFromMapFunc{
		cluster: c,
Wondering if we need to store the entire Cluster given that we only use the name.
Related: at least in EnqueueRequestForObject we could additionally store the ClusterName in an additional field to avoid the lookup for every event. (Not sure if the compiler optimizes it away, but it still simplifies the implementation.)
		mapper: c.GetRESTMapper(),
	}
	if err := copy.parseOwnerTypeGroupKind(c.GetScheme()); err != nil {
		panic(err)
Q: Are there cases where it would be good if DeepCopyFor could return an error?
func (w *watchDescription) DeepCopyFor(c cluster.Cluster) *watchDescription {
	copy := &watchDescription{
		predicates: w.predicates,
nit: This shares the predicates, right? Is that fine or is there any way we can copy them?
// This covers the case where a Watch was added later to the controller.
if watchDesc.IsClusterAware() {
	for _, cldesc := range c.clusters {
		if err := c.startClusterAwareWatchLocked(cldesc, watchDesc); err != nil {
Q: Are cluster aware watches started here and in l.192? If yes and it's intentional do we have to ensure that c.clusters does not contain the "default" cluster? Or is that the responsibility of the provider?
if watch == nil {
	return nil
}
c.LogConstructor(nil).Info("Starting Cluster-Aware EventSource", "cluster", cldesc.Name(), "source", watch.src)
nit: not sure how watch.src is logged
// Disengage all the runnables.
for _, aware := range cm.logicalClusterAwareRunnables {
	if err := aware.Disengage(c.ctx, c); err != nil {
nit: might make sense to collect errors here, so we try removing it from as many runnables as possible, and then return after the loop if we have some errors.
if _, err := cm.getLogicalCluster(ctx, event.Name); err != nil {
	return err
}
cm.engageClusterAwareRunnables()
I wonder if we want to optimize this a bit in a follow-up to not always engage for all clusters, but just for the added/modified one.
Might become a problem with hundreds+ of clusters
@@ -51,6 +51,15 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
// The runnables added before Start are started when Start is called.
// The runnables added after Start are started directly.
func (r *runnables) Add(fn Runnable) error {
	// For wrapped logical runnables, we need to unwrap them to get the underlying runnable.
nit: Do we still plan to implement something here, or is this just a leftover?
// that we schedule all the cluster aware runnables.
for name, cluster := range cm.logicalClusters {
	for _, aware := range cm.logicalClusterAwareRunnables {
		if err := aware.Engage(cluster.ctx, cluster); err != nil {
Wondering if this has to respect leader election as well
Great step forward!! Sorry, also a bunch of nits; I thought I might as well just comment on them while I see them.
cluster     cluster.Cluster
logicalName logical.Name
I don't understand these fields. The cluster is always set to the manager's, and logicalName is never set. Do you want to add a new WithCluster to the builder?
@vincepri where did you want to go with this?
// Cluster provides various methods to interact with a cluster.
type Cluster interface {
	// Name returns the unique logical name of the cluster.
	Name() logical.Name
Why not a string? I find the whole logical naming very confusing.
+1 the current naming with logical is pretty confusing
func (c *Controller) Engage(ctx context.Context, cluster cluster.Cluster) error {
	c.mu.Lock()
This is basically a ControllerBuilder. Why do we add logic into the controller for how to duplicate itself, rather than keeping this in the Builder layer above? I.e.:
type ControllerBuilder func(cluster.Cluster) (cancel func() error, err error)
builder := builder.New(mgr).For(&myKind{}).ControllerBuilder(&MyReconciler{})
cancels := map[string]func() error{}
for clusterName, cluster := range mgr.GetClusters() {
cancel, err := builder(cluster)
if err != nil {
return err
}
cancels[clusterName] = cancel
}
Adding this inside here is a) muddying the layers, and b) error-prone if ppl change the controller in the future and miss the fact that everything has to be deep-copied in here.
	continue
}
// If the request doesn't specify a cluster, use the cluster from the context.
if req.Cluster == "" && e.cluster != nil {
Adding this kind of awareness into a custom mapper doesn't really make a lot of sense to me. It would only be useful in a controller that deals with multiple clusters; otherwise we don't need the distinction. Once we deal with multiple clusters, why would we only sometimes fill the Cluster field?
@@ -197,3 +205,16 @@ func (e *enqueueRequestForOwner) getOwnersReferences(object metav1.Object) []met
	// No Controller OwnerReference found
	return nil
}

func (e *enqueueRequestForOwner) DeepCopyFor(c cluster.Cluster) EventHandler {
Same comment as in controller/source re keeping the builder layer above the thing
@@ -178,6 +196,9 @@ func (cm *controllerManager) Add(r Runnable) error {
}

func (cm *controllerManager) add(r Runnable) error {
	if aware, ok := r.(cluster.AwareRunnable); ok {
I'd much prefer to add an explicit RunForEachCluster(builder func(cluster.Cluster) (Runnable, error)) that we can document, rather than requiring ppl to be aware of an extension interface. It also means that we can simply use the existing Runnable interface to start/stop as needed.
As a side effect, this would address @sbueringer's comment below re leader election.
	return cm.defaultCluster.GetAPIReader()
}

func (cm *controllerManager) engageClusterAwareRunnables() {
All of this feels extremely complicated. How about:
cancels := map[string]func(){}
clusterProvider.OnAdd(func(c cluster.Cluster) error {
	ctx, cancel := context.WithCancel(ctx)
	if err := mgr.Add(wrapRunnableWithCustomCtx(ctx, c)); err != nil {
		return err
	}
	c.GetCache().WaitForCacheSync(ctx)
	for _, builder := range mgr.eachClusterRunnables {
		// maybe aggregate errs instead?
		runnable, err := builder(c)
		if err != nil {
			return err
		}
		if err := mgr.Add(wrapRunnableWithCustomCtx(ctx, runnable)); err != nil {
			return err
		}
	}
	cancels[c.Name()] = cancel
	return nil
})

func wrapRunnableWithCustomCtx(ctx context.Context, r Runnable) Runnable {
	return RunnableFunc(func(_ context.Context) error { return r.Start(ctx) })
}

clusterProvider.OnDelete(func(name string) {
	if cancel, ok := cancels[name]; ok {
		cancel()
	}
})
clusterProvider is what you call logicalProvider. I find the logical in there still confusing; to me that sounds like there must be a physicalProvider somewhere, but that distinction doesn't make sense for controller-runtime.
Signed-off-by: Vince Prignano <[email protected]>
@vincepri: The following tests failed:
Full PR test history. Your PR dashboard. Please help us cut down on flakes by linking to an open issue when you hit one in your PR. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here.
PR needs rebase. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
The Kubernetes project currently lacks enough contributors to adequately respond to all PRs. This bot triages PRs according to the following rules:
You can:
Please send feedback to sig-contributor-experience at kubernetes/community. /lifecycle stale
The Kubernetes project currently lacks enough active contributors to adequately respond to all PRs. This bot triages PRs according to the following rules:
You can:
Please send feedback to sig-contributor-experience at kubernetes/community. /lifecycle rotten
The Kubernetes project currently lacks enough active contributors to adequately respond to all issues and PRs. This bot triages PRs according to the following rules:
You can:
Please send feedback to sig-contributor-experience at kubernetes/community. /close
@k8s-triage-robot: Closed this PR. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
No description provided.