Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/go/rpk/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/prometheus/common v0.65.0
github.com/redpanda-data/common-go/proto v0.0.0-20260223115805-73fb9bd9c2c0
github.com/redpanda-data/common-go/rpadmin v0.2.4
github.com/redpanda-data/common-go/rpsr v0.1.3
github.com/redpanda-data/common-go/rpsr v0.1.4
github.com/redpanda-data/protoc-gen-go-mcp v0.0.0-20250930092048-a98b94b5957a
github.com/rs/xid v1.6.0
github.com/safchain/ethtool v0.6.2
Expand All @@ -67,13 +67,13 @@ require (
github.com/twmb/franz-go/pkg/kadm v1.17.1
github.com/twmb/franz-go/pkg/kfake v0.0.0-20251024215757-aea970d4d0d2
github.com/twmb/franz-go/pkg/kmsg v1.12.0
github.com/twmb/franz-go/pkg/sr v1.5.0
github.com/twmb/franz-go/pkg/sr v1.7.0
github.com/twmb/franz-go/plugin/kzap v1.1.2
github.com/twmb/tlscfg v1.2.1
github.com/twmb/types v1.1.6
go.uber.org/zap v1.27.1
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546
golang.org/x/sync v0.19.0
golang.org/x/sync v0.20.0
golang.org/x/sys v0.41.0
golang.org/x/term v0.40.0
google.golang.org/genproto/googleapis/api v0.0.0-20260311181403-84a4fc48630c
Expand Down
12 changes: 6 additions & 6 deletions src/go/rpk/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ github.com/redpanda-data/common-go/proto v0.0.0-20260223115805-73fb9bd9c2c0 h1:H
github.com/redpanda-data/common-go/proto v0.0.0-20260223115805-73fb9bd9c2c0/go.mod h1:4TOyhdEvR/hlk0RjHXzmO0hYmWBpGGI1qiFPNv211O4=
github.com/redpanda-data/common-go/rpadmin v0.2.4 h1:XM7kfhKokWeLATX4dnLXczjd4sxN2AcJll/KRvE15iA=
github.com/redpanda-data/common-go/rpadmin v0.2.4/go.mod h1:uOAY10WXPtcDPU0aUdpkqHR+b1BqUvRhlvMf0vha73A=
github.com/redpanda-data/common-go/rpsr v0.1.3 h1:mcjp7MeJylONI0H4MlrUEPEVOZpPNQGzWqcdd1QIAv4=
github.com/redpanda-data/common-go/rpsr v0.1.3/go.mod h1:2j2416onosg5FKaKz52NooRE+q/9EJqQn0kyTcTXWHc=
github.com/redpanda-data/common-go/rpsr v0.1.4 h1:d9lu5q5wyhZWBYR1GnZkq+eZGKU0qoaSwwybRS9Uk2k=
github.com/redpanda-data/common-go/rpsr v0.1.4/go.mod h1:qVa7b0yaCRdZDn5dcZ9CazqVr4jYbgtOJUywI2X3G3I=
github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525 h1:vskZrV6q8W8flL0Ud23AJUYAd8ZgTadO45+loFnG2G0=
github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525/go.mod h1:3YqAM7pgS5vW/EH7naCjFqnAajSgi0f0CfMe1HGhLxQ=
github.com/redpanda-data/protoc-gen-go-mcp v0.0.0-20250930092048-a98b94b5957a h1:jNHT6Fcy/rBAFnX8rjbwrJ+lSF2Ufa1jqmnCV6m6RKY=
Expand Down Expand Up @@ -340,8 +340,8 @@ github.com/twmb/franz-go/pkg/kfake v0.0.0-20251024215757-aea970d4d0d2 h1:Iwo/KHm
github.com/twmb/franz-go/pkg/kfake v0.0.0-20251024215757-aea970d4d0d2/go.mod h1:d8HaJtUEgZfU2n+Ps/fCtzlFLtgdrlZgTWwvCqQ3eDo=
github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc=
github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6uD2Eya6CfqBpeJY=
github.com/twmb/franz-go/pkg/sr v1.5.0 h1:KQH8veHxKyAjT4U4/rziJnSEfafuluznLoxhrp0yJfo=
github.com/twmb/franz-go/pkg/sr v1.5.0/go.mod h1:O4o4mUMNfmyEt2HcuM+qZdc6KrcStvjgxWR6Cfvmukw=
github.com/twmb/franz-go/pkg/sr v1.7.0 h1:wHStlO6aOPWWgZ68ZYcdtQe9tRbkcTc1gRLbgs+8QAA=
github.com/twmb/franz-go/pkg/sr v1.7.0/go.mod h1:64CsHlsQnyFRq1sYPcCmlRrEG3PlLPb6cDddx2wGr28=
github.com/twmb/franz-go/plugin/kzap v1.1.2 h1:0arX5xJ0soUPX1LlDay6ZZoxuWkWk1lggQ5M/IgRXAE=
github.com/twmb/franz-go/plugin/kzap v1.1.2/go.mod h1:53Cl9Uz1pbdOPDvUISIxLrZIWSa2jCuY1bTMauRMBmo=
github.com/twmb/tlscfg v1.2.1 h1:IU2efmP9utQEIV2fufpZjPq7xgcZK4qu25viD51BB44=
Expand Down Expand Up @@ -415,8 +415,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
1 change: 1 addition & 0 deletions src/go/rpk/pkg/cli/registry/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
importpath = "github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/registry",
visibility = ["//visibility:public"],
deps = [
"//src/go/rpk/pkg/cli/registry/context",
"//src/go/rpk/pkg/cli/registry/mode",
"//src/go/rpk/pkg/cli/registry/schema",
"//src/go/rpk/pkg/config",
Expand Down
43 changes: 30 additions & 13 deletions src/go/rpk/pkg/cli/registry/compatibility_level.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,26 @@
package registry

import (
"context"
"fmt"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/schemaregistry"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/twmb/franz-go/pkg/sr"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/schemaregistry"
)

func compatibilityLevelCommand(fs afero.Fs, p *config.Params) *cobra.Command {
func compatibilityLevelCommand(fs afero.Fs, p *config.Params, schemaCtx *string) *cobra.Command {
cmd := &cobra.Command{
Use: "compatibility-level",
Args: cobra.ExactArgs(0),
Short: "Manage global or per-subject compatibility levels",
}
cmd.AddCommand(
compatGetCommand(fs, p),
compatSetCommand(fs, p),
compatGetCommand(fs, p, schemaCtx),
compatSetCommand(fs, p, schemaCtx),
)
p.InstallFormatFlag(cmd)
return cmd
Expand All @@ -41,7 +41,7 @@ type compatibilityLevelResponse struct {
Err string `json:"error,omitempty" yaml:"error,omitempty"`
}

func compatGetCommand(fs afero.Fs, p *config.Params) *cobra.Command {
func compatGetCommand(fs afero.Fs, p *config.Params, schemaCtx *string) *cobra.Command {
var global bool
cmd := &cobra.Command{
Use: "get [SUBJECT...]",
Expand All @@ -52,7 +52,7 @@ Running this command with no subject returns the global level, alternatively
you can use the --global flag to get the global level at the same time as
per-subject levels.
`,
Run: func(_ *cobra.Command, subjects []string) {
Run: func(cmd *cobra.Command, subjects []string) {
f := p.Formatter
if h, ok := f.Help([]compatibilityLevelResponse{}); ok {
out.Exit(h)
Expand All @@ -63,10 +63,18 @@ per-subject levels.
cl, err := schemaregistry.NewClient(fs, p)
out.MaybeDie(err, "unable to initialize schema registry client: %v", err)

for i, s := range subjects {
if s != sr.GlobalSubject {
subjects[i] = schemaregistry.QualifySubject(*schemaCtx, s)
}
}
if len(subjects) > 0 && global {
subjects = append(subjects, sr.GlobalSubject)
}
results := cl.Compatibility(context.Background(), subjects...)
results := cl.Compatibility(cmd.Context(), subjects...)
for i := range results {
results[i].Subject = schemaregistry.StripContextQualifier(*schemaCtx, results[i].Subject)
}

err = printCompatibilityResult(results, f)
out.MaybeDieErr(err)
Expand All @@ -77,14 +85,14 @@ per-subject levels.
return cmd
}

func compatSetCommand(fs afero.Fs, p *config.Params) *cobra.Command {
func compatSetCommand(fs afero.Fs, p *config.Params, schemaCtx *string) *cobra.Command {
var global bool
var level string
cmd := &cobra.Command{
Use: "set [SUBJECT...]",
Short: "Set the global or per-subject compatibility levels",
Long: compatHelpText,
Run: func(_ *cobra.Command, subjects []string) {
Run: func(cmd *cobra.Command, subjects []string) {
f := p.Formatter
if h, ok := f.Help([]compatibilityLevelResponse{}); ok {
out.Exit(h)
Expand All @@ -94,14 +102,23 @@ func compatSetCommand(fs afero.Fs, p *config.Params) *cobra.Command {

cl, err := schemaregistry.NewClient(fs, p)
out.MaybeDie(err, "unable to initialize schema registry client: %v", err)

for i, s := range subjects {
if s != sr.GlobalSubject {
subjects[i] = schemaregistry.QualifySubject(*schemaCtx, s)
}
}
if len(subjects) > 0 && global {
subjects = append(subjects, sr.GlobalSubject)
}
var l sr.CompatibilityLevel
err = l.UnmarshalText([]byte(level))
out.MaybeDieErr(err)

results := cl.SetCompatibility(context.Background(), sr.SetCompatibility{Level: l}, subjects...)
results := cl.SetCompatibility(cmd.Context(), sr.SetCompatibility{Level: l}, subjects...)
for i := range results {
results[i].Subject = schemaregistry.StripContextQualifier(*schemaCtx, results[i].Subject)
}
err = printCompatibilityResult(results, f)
out.MaybeDieErr(err)
},
Expand Down
36 changes: 36 additions & 0 deletions src/go/rpk/pkg/cli/registry/context/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "context",
srcs = [
"context.go",
"delete.go",
"list.go",
],
importpath = "github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/registry/context",
visibility = ["//visibility:public"],
deps = [
"//src/go/rpk/pkg/adminapi",
"//src/go/rpk/pkg/config",
"//src/go/rpk/pkg/out",
"//src/go/rpk/pkg/schemaregistry",
"@com_github_spf13_afero//:afero",
"@com_github_spf13_cobra//:cobra",
"@com_github_twmb_franz_go_pkg_sr//:sr",
],
)

go_test(
name = "context_test",
srcs = ["context_test.go"],
deps = [
":context",
"//src/go/rpk/pkg/cli/registry",
"//src/go/rpk/pkg/config",
"@com_github_spf13_afero//:afero",
"@com_github_spf13_cobra//:cobra",
"@com_github_stretchr_testify//require",
"@com_github_twmb_franz_go_pkg_sr//:sr",
"@com_github_twmb_franz_go_pkg_sr//srfake",
],
)
128 changes: 128 additions & 0 deletions src/go/rpk/pkg/cli/registry/context/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2026 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package context

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/twmb/franz-go/pkg/sr"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
)

const qualifiedSubjectsConfigKey = "schema_registry_enable_qualified_subjects"

type contextResponse struct {
Name string `json:"name" yaml:"name"`
Mode string `json:"mode" yaml:"mode"`
Compatibility string `json:"compatibility" yaml:"compatibility"`
}

// ListContexts calls cl.Contexts and translates a 404 into a message
// indicating that the Redpanda cluster does not support schema contexts.
func ListContexts(ctx context.Context, cl *sr.Client) ([]string, error) {
contexts, err := cl.Contexts(ctx)
if err != nil {
var re *sr.ResponseError
if errors.As(err, &re) && re.StatusCode == 404 {
return nil, fmt.Errorf("schema registry contexts are not supported by this cluster")
}
return nil, err
}
return contexts, nil
}
Comment on lines +29 to +47

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[food for thought, not for this PR, but maybe as a follow-up in next releases]

Would be cool to list the context, including the Mode and Compatibility that has been set. There are some other places like subject list, where we could add this as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a good suggestion and ive updated it to show mode and compatability for context list, haven't gone deeper than that but something we can look in to


// checkQualifiedSubjectsEnabled verifies that the cluster has the
// schema_registry_enable_qualified_subjects config set to true via the
// Admin API.
func checkQualifiedSubjectsEnabled(ctx context.Context, fs afero.Fs, profile *config.RpkProfile) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
cl, err := adminapi.NewClient(ctx, fs, profile)
if err != nil {
return fmt.Errorf("unable to verify schema context support via admin API: %w\nUse --skip-context-check to skip this verification", err)
}
cfg, err := cl.SingleKeyConfig(ctx, qualifiedSubjectsConfigKey)
if err != nil {
return fmt.Errorf("unable to verify schema context support via admin API: %w\nUse --skip-context-check to skip this verification", err)
}
val, exists := cfg[qualifiedSubjectsConfigKey]
if !exists {
return fmt.Errorf("schema contexts are not supported by this cluster (config key %q not found); the cluster may need upgrading", qualifiedSubjectsConfigKey)
}
enabled, ok := val.(bool)
if !ok {
return fmt.Errorf("schema contexts are not supported by this cluster (unexpected value for %q: %v)", qualifiedSubjectsConfigKey, val)
}
if !enabled {
return fmt.Errorf("schema contexts are not enabled on this cluster; you may enable it using:\n rpk cluster config set %s true", qualifiedSubjectsConfigKey)
}
return nil
}

// IsContextSupported checks whether the cluster supports schema contexts
// by verifying the admin API feature flag.
func IsContextSupported(ctx context.Context, fs afero.Fs, profile *config.RpkProfile, skipAdminCheck bool) error {
if skipAdminCheck {
return nil
}
return checkQualifiedSubjectsEnabled(ctx, fs, profile)
}

// ValidateContext validates the schema context name format, loads the
// profile, and confirms the cluster supports contexts via the admin API
// feature flag.
func ValidateContext(ctx context.Context, schemaCtx string, fs afero.Fs, p *config.Params, skipAdminCheck bool) error {
if schemaCtx[0] != '.' {
return fmt.Errorf("invalid schema context %q: context names must start with a '.'", schemaCtx)
}
if strings.Contains(schemaCtx, ":") {
return fmt.Errorf("invalid schema context %q: context names must not contain ':'", schemaCtx)
}
profile, err := p.LoadVirtualProfile(fs)
if err != nil {
return fmt.Errorf("rpk unable to load config: %w", err)
}
return IsContextSupported(ctx, fs, profile, skipAdminCheck)
}

func NewCommand(fs afero.Fs, p *config.Params, _ *string) *cobra.Command {
cmd := &cobra.Command{
Use: "context",
Args: cobra.ExactArgs(0),
Short: "Manage schema registry contexts",
Comment thread
c-julin marked this conversation as resolved.
Long: `Manage schema registry contexts.

Schema contexts provide namespace isolation within the schema registry,
allowing multiple independent sets of subjects and schemas to coexist.

Before using schema contexts, the cluster must have the
schema_registry_enable_qualified_subjects configuration set to true. You
can enable it with:

rpk cluster config set schema_registry_enable_qualified_subjects true

Use the --schema-context flag on the parent 'registry' command to scope
operations to a specific context.
`,
}
cmd.AddCommand(
listCommand(fs, p),
deleteCommand(fs, p),
)
return cmd
}
Loading
Loading