Skip to content

Conversation

@FZambia
Copy link
Member

@FZambia FZambia commented Sep 23, 2025

This allows clients to subscribe to a channel while providing publication tags filter, so that only relevant publications with tags that satisfy the provided filter are delivered. This may optimize bandwidth and reduce processing overhead on the client side.

Design goals

To align with Centrifuge’s goals, the publication filter implementation should follow these principles:

  • Be zero allocation to evaluate, because it is in the hot path with broadcast to many subscribers.
  • Be easy to serialize/deserialize to/from Protobuf and fully (easy)JSON compatible.
  • Be programmatically constructible, making it easy to build filters in code based on app conditions.
  • Be simple. If it's a custom implementation, then we want to avoid too much complexity which can limit the usage.
  • Be secure. Be not Turing complete. Only filter based on what subscriber can see in the Publication anyway.

Implementation

The initial idea was to use CEL expressions for the filtering. See #502 for the community PR with CEL support. But during the evaluation of the idea observed a huge allocation penalty which may come with CEL expression evaluations during publication broadcasts to many subscribers - allocs happen in the hot path where we don't want to have allocations at all (see below the benchmark). Thus more CPU usage, more load on GC. And unfortunately the real overhead is not very predictable and can be performance killer. Another concern – CEL expressions are hard to make programatically – no API for that on the client side. So while we successfully using CEL in other places in Centrifugo – here it seems not a good fit.

So the decision was to proceed with a custom implementation, it's of course limited compared to CEL and other expression engines – but it fully follows the principles outlined above.

Every Publication may contain tags which is map[string]string in Protobuf schema. The mechanism introduced here allows filtering publications based on attached tags.

The filtering system is based on NodeFilter message type from the Protobuf schema (note, we are not using enums for JSON interop reasons) which may be passed in subscription request:

message FilterNode {
  // Operation type for this node:
  // - "" (empty string) → leaf node (comparison)
  // - "and" → logical AND of child nodes
  // - "or" → logical OR of child nodes
  // - "not" → logical NOT of a single child node
  string op = 1;
  // Key for comparison (only valid for leaf nodes).
  string key = 2;
  // Comparison operator for leaf nodes.
  // Only meaningful if op == "".
  // Supported values:
  //   "eq"   → equal
  //   "neq"  → not equal
  //   "in"   → value is in vals
  //   "nin"  → value is not in vals
  //   "ex"   → key exists in tags
  //   "nex"  → key does not exist
  //   "sw"   → string starts with val
  //   "ew"   → string ends with val
  //   "ct"   → string contains val
  //   "lt"   → numeric less than val
  //   "lte"  → numeric less than or equal val
  //   "gt"   → numeric greater than val
  //   "gte"  → numeric greater than or equal val
  string cmp = 3;
  // Single value used in most comparisons (e.g. "eq").
  string val = 4;
  // Multiple values used for set comparisons ("in", "nin").
  repeated string vals = 5;
  // Child nodes.
  // Used for logical operations: "and", "or", "not".
  repeated FilterNode nodes = 6;
}

An optional tf (tags filter) field with NodeFilter may be set in SubscribeRequest. Since filtering relies to the client protocol type – the filtering is not pluggable, but part of the core.

Let's compare to CEL. Example, the overhead of broadcasting to 10k subscribers:

BenchmarkFilterCompareFilterNode10k-8                      	    1280	    932989 ns/op	       0 B/op	       0 allocs/op
BenchmarkFilterCompareCEL10k-8                             	     357	   3345105 ns/op	 1040453 B/op	   70000 allocs/op

Or, memory overhead for creating the same filter:

BenchmarkFilterCompareMemoryFilterNode-8                   	 5785044	       207.9 ns/op	     664 B/op	       5 allocs/op
BenchmarkFilterCompareCELMemory-8                          	    8680	    123834 ns/op	   92969 B/op	    1464 allocs/op

Tags filtering additionally supports:

  • publication automatic recovery in stream mode - only publications corresponding to the filter will be returned in successfully recovered case
  • cache recovery mode – only latest publication corresponding to the filter will be returned in successfully recovered scenario

Filters are validated at subscription time. Common validation errors include:

  • Missing cmp for leaf nodes.
  • Missing key for leaf nodes (except for ex and nex).
  • Empty vals for in or nin comparisons.
  • Non-empty val or vals for ex or nex.
  • Invalid op or cmp values.
  • not nodes with more or less than one child.
  • and or or nodes with no children.

If a filter is invalid, the server returns ErrorBadRequest error to the client, and the subscription is rejected.

During filter evaluation (publication broadcast):

  • Invalid numeric values for gt, gte, lt, or lte comparisons result in the filter evaluating to false.
  • Unknown cmp or op values result in the filter evaluates to false, though it must be normally checked during validation.

Example

Example of possible filter usage in centrifuge-js:

const tagsFilter = {
    op: "", // leaf node.
    key: "ticker",
    cmp: "eq",
    val: ticker
};

const sub = centrifuge.newSubscription(channel, {
    tagsFilter: tagsFilter
});

Or sth more complex:

const tagsFilter = {
    op: "and",
    nodes: [
        {
            op: "", // leaf node for comparison
            key: "ticker",
            cmp: "eq",
            val: "AAPL"
        },
        {
            op: "", // leaf node for comparison
            key: "source",
            cmp: "eq",
            val: "NASDAQ"
        }
    ]
};

It's rather simple to provide a type-safe (and typo-safe) helper for filter construction on the real-time SDK side, but not doing it for now. I.e. sth like this may be done (can live outside of SDK actually), sth like this:

// Tags filter construction helper.
const Filter = {
  // Comparison operators
  eq(key, val) {
    return { op: "", key, cmp: "eq", val };
  },
  neq(key, val) {
    return { op: "", key, cmp: "neq", val };
  },
  in(key, vals) {
    return { op: "", key, cmp: "in", vals };
  },
  nin(key, vals) {
    return { op: "", key, cmp: "nin", vals };
  },
  exists(key) {
    return { op: "", key, cmp: "ex" };
  },
  notExists(key) {
    return { op: "", key, cmp: "nex" };
  },
  startsWith(key, val) {
    return { op: "", key, cmp: "sw", val };
  },
  endsWith(key, val) {
    return { op: "", key, cmp: "ew", val };
  },
  contains(key, val) {
    return { op: "", key, cmp: "ct", val };
  },
  gt(key, val) {
    return { op: "", key, cmp: "gt", val };
  },
  gte(key, val) {
    return { op: "", key, cmp: "gte", val };
  },
  lt(key, val) {
    return { op: "", key, cmp: "lt", val };
  },
  lte(key, val) {
    return { op: "", key, cmp: "lte", val };
  },

  // Logical operators
  and(...nodes) {
    return { op: "and", nodes };
  },
  or(...nodes) {
    return { op: "or", nodes };
  },
  not(node) {
    return { op: "not", nodes: [node] };
  }
};

// Example usage:
const tagsFilter = Filter.and(
  Filter.eq("ticker", "AAPL"),
  Filter.eq("source", "NASDAQ")
);

const sub = centrifuge.newSubscription(channel, { tagsFilter });

// Complex example:
const complexFilter = Filter.or(
  Filter.and(
    Filter.eq("ticker", "AAPL"),
    Filter.exists("price")
  ),
  Filter.not(Filter.contains("source", "REUTERS"))
);

@codecov
Copy link

codecov bot commented Sep 23, 2025

Codecov Report

❌ Patch coverage is 87.27545% with 85 lines in your changes missing coverage. Please review.
✅ Project coverage is 83.82%. Comparing base (ecba7e1) to head (e41f318).
⚠️ Report is 2 commits behind head on master.

Files with missing lines Patch % Lines
client.go 71.29% 24 Missing and 7 partials ⚠️
hub.go 88.93% 10 Missing and 19 partials ⚠️
node.go 63.26% 15 Missing and 3 partials ⚠️
internal/recovery/helpers.go 73.07% 7 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #511      +/-   ##
==========================================
+ Coverage   83.45%   83.82%   +0.36%     
==========================================
  Files          39       42       +3     
  Lines        9123     9544     +421     
==========================================
+ Hits         7614     8000     +386     
- Misses       1150     1166      +16     
- Partials      359      378      +19     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@torbensen
Copy link

Hi, just to clarify — our PR was accidentally closed because the forked repo got mistakenly deleted by a teammate.

We originally explored CEL for its flexibility and expressive power, but as you mentioned, it introduces high allocation and GC pressure in the broadcast hot path.

Your tag-based filtering approach is a much better fit for Centrifuge’s design goals: zero allocations, simple and efficient structure, native Protobuf/JSON compatibility. While it’s less general than CEL, it covers the core subscription filtering needs while keeping performance predictable.

Thanks a lot for implementing this and aligning it so well with the project’s priorities!

@FZambia
Copy link
Member Author

FZambia commented Sep 29, 2025

@torbensen awesome, thx for letting me know!

Aiming to release filtering by tags soon, need to prepare various parts for it - docs, blog post, SDK changes. It will be part of Javascript SDK only on start. If you find out sth related to it – please let me know.

@FZambia FZambia merged commit 267dce3 into master Oct 12, 2025
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants