Skip to content

[server] Improve observability regarding communication with messagebus #3607

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions components/gitpod-messagebus/src/messagebus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -442,11 +442,11 @@ export abstract class AbstractTopicListener<T> implements MessagebusListener {
constructor(protected readonly exchangeName: string, protected readonly listener: TopicListener<T>) { }

async establish(channel: Channel): Promise<void> {
const topic = await this.topic();
const topic = this.topic();
return this.doEstablish(channel, topic);
}

abstract topic(): Promise<string>;
abstract topic(): string;

protected async doEstablish(channel: Channel, topic: string): Promise<void> {
if (channel === undefined) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export class RabbitMQConsensusLeaderMessenger extends AbstractMessageBusIntegrat

class EventListener extends AbstractTopicListener<RaftMessage> {

async topic(): Promise<string> {
topic(): string {
return '';
}

Expand Down
13 changes: 13 additions & 0 deletions components/server/src/prometheus-metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,16 @@ const httpRequestDuration = new prometheusClient.Histogram({
export function observeHttpRequestDuration(method: string, route: string, statusCode: number, durationInSeconds: number) {
httpRequestDuration.observe({ method, route, statusCode }, durationInSeconds);
}

const messagebusTopicReads = new prometheusClient.Counter({
name: 'gitpod_server_topic_reads_total',
help: 'The amount of reads from messagebus topics.',
labelNames: ['topic'],
registers: [prometheusClient.register]
});

export function increaseMessagebusTopicReads(topic: string) {
messagebusTopicReads.inc({
topic,
})
}
7 changes: 5 additions & 2 deletions components/server/src/workspace/messagebus-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ import { Channel, Message } from "amqplib";
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
import * as opentracing from "opentracing";
import { CancellationTokenSource } from "vscode-ws-jsonrpc";
import { increaseMessagebusTopicReads } from '../prometheus-metrics';

export class WorkspaceInstanceUpdateListener extends AbstractTopicListener<WorkspaceInstance> {

constructor(protected readonly messageBusHelper: MessageBusHelper, listener: TopicListener<WorkspaceInstance>, protected readonly userId?: string) {
super(messageBusHelper.workspaceExchange, listener);
}

async topic() {
topic() {
return this.messageBusHelper.getWsTopicForListening(this.userId, undefined, "updates");
}
}
Expand All @@ -31,7 +32,7 @@ export class HeadlessWorkspaceLogListener extends AbstractTopicListener<Headless
super(messageBusHelper.workspaceExchange, listener);
}

async topic() {
topic() {
return this.messageBusHelper.getWsTopicForListening(undefined, this.workspaceID, "headless-log");
}

Expand Down Expand Up @@ -117,6 +118,7 @@ export class MessageBusIntegration extends AbstractMessageBusIntegration {
const listener = new HeadlessWorkspaceLogListener(this.messageBusHelper, callback, workspaceID);
const cancellationTokenSource = new CancellationTokenSource()
this.listen(listener, cancellationTokenSource.token);
increaseMessagebusTopicReads(listener.topic())
return Disposable.create(() => cancellationTokenSource.cancel())
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not listen in listenForPrebuildUpdatableQueue as well? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PrebuildUpdatableQueue's topic didn't have a name attribute. I had a quick talk with @csweichel last week and we agreed on removing the metric over there.

@csweichel I feel like we had another reason that I can't remember right now. Do you remember if there were more reasons to not expose a metric for this queue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

primarily the fact that it's a queue and not an exchange and we didn't find a good naming scheme.

Expand All @@ -131,6 +133,7 @@ export class MessageBusIntegration extends AbstractMessageBusIntegration {
const listener = new WorkspaceInstanceUpdateListener(this.messageBusHelper, callback, userId);
const cancellationTokenSource = new CancellationTokenSource()
this.listen(listener, cancellationTokenSource.token);
increaseMessagebusTopicReads(listener.topic())
return Disposable.create(() => cancellationTokenSource.cancel())
}

Expand Down
4 changes: 2 additions & 2 deletions components/ws-manager-bridge/src/messagebus-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class WorkspaceStatusUpdateListener extends AbstractTopicListener<WorkspaceStatu
super(MessageBusIntegration.WORKSPACE_EXCHANGE, listener);
}

async topic() {
topic() {
return this._topic;
}
}
Expand All @@ -115,7 +115,7 @@ class WorkspaceLogListener extends AbstractTopicListener<WorkspaceLogMessage.AsO
super(MessageBusIntegration.WORKSPACE_EXCHANGE, listener);
}

async topic() {
topic() {
return this._topic;
}
}