Skip to content
This repository was archived by the owner on Nov 14, 2019. It is now read-only.

Commit 1bb8460

Browse files
committed
Implements Carbon namespacing
1 parent 5d139a4 commit 1bb8460

File tree

7 files changed

+157
-31
lines changed

7 files changed

+157
-31
lines changed

config.default.json.example

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,14 @@
1010
{
1111
"type" : "carbon",
1212
"address" : "localhost",
13-
"port" : 2003,
14-
"frequency" : 10
13+
"port" : 2004,
14+
"frequency" : 10,
15+
"pickle" : true,
16+
"global_prefix" : "stats",
17+
"prefix_counter" : "counters",
18+
"prefix_timer" : "timers",
19+
"prefix_histo" : "histo",
20+
"prefix_gauge" : "gauges"
1521
}
1622
],
1723

src/backend.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ struct brubeck_backend {
1313

1414
int (*connect)(void *);
1515
bool (*is_connected)(void *);
16-
void (*sample)(const char *, value_t, void *);
16+
void (*sample)(uint8_t, const char *, value_t, void *);
1717
void (*flush)(void *);
1818

1919
uint32_t tick_time;

src/backends/carbon.c

Lines changed: 111 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ static void carbon_disconnect(struct brubeck_carbon *self)
4545
}
4646

4747
static void plaintext_each(
48+
uint8_t type,
4849
const char *key,
4950
value_t value,
5051
void *backend)
@@ -58,6 +59,30 @@ static void plaintext_each(
5859
if (!carbon_is_connected(carbon))
5960
return;
6061

62+
if (carbon->namespacing.global) {
63+
memcpy(ptr, carbon->namespacing.global, carbon->namespacing.global_len);
64+
ptr += carbon->namespacing.global_len;
65+
*ptr++ = '.';
66+
}
67+
68+
if ((type == BRUBECK_MT_COUNTER || type == BRUBECK_MT_METER) && carbon->namespacing.counter) {
69+
memcpy(ptr, carbon->namespacing.counter, carbon->namespacing.counter_len);
70+
ptr += carbon->namespacing.counter_len;
71+
*ptr++ = '.';
72+
} else if (type == BRUBECK_MT_TIMER && carbon->namespacing.timer) {
73+
memcpy(ptr, carbon->namespacing.timer, carbon->namespacing.timer_len);
74+
ptr += carbon->namespacing.timer_len;
75+
*ptr++ = '.';
76+
} else if (type == BRUBECK_MT_HISTO && carbon->namespacing.histo) {
77+
memcpy(ptr, carbon->namespacing.histo, carbon->namespacing.histo_len);
78+
ptr += carbon->namespacing.histo_len;
79+
*ptr++ = '.';
80+
} else if (type == BRUBECK_MT_GAUGE && carbon->namespacing.gauge) {
81+
memcpy(ptr, carbon->namespacing.gauge, carbon->namespacing.gauge_len);
82+
ptr += carbon->namespacing.gauge_len;
83+
*ptr++ = '.';
84+
}
85+
6186
memcpy(ptr, key, key_len);
6287
ptr += key_len;
6388
*ptr++ = ' ';
@@ -103,18 +128,61 @@ static inline size_t pickle1_double(char *ptr, void *_src)
103128
}
104129

105130
static void pickle1_push(
106-
struct pickler *buf,
131+
struct brubeck_carbon *carbon,
132+
uint8_t type,
107133
const char *key,
108134
uint8_t key_len,
109-
uint32_t timestamp,
110135
value_t value)
111136
{
137+
uint8_t namespaced_key_len = 0;
138+
char *type_namespace = NULL;
139+
size_t type_namespace_len = 0;
140+
struct pickler *buf = &carbon->pickler;
112141
char *ptr = buf->ptr + buf->pos;
113142

143+
if (carbon->namespacing.global) {
144+
// the global namespace plus the "." character
145+
namespaced_key_len += carbon->namespacing.global_len + 1;
146+
}
147+
148+
if ((type == BRUBECK_MT_COUNTER || type == BRUBECK_MT_METER) && carbon->namespacing.counter) {
149+
type_namespace = carbon->namespacing.counter;
150+
type_namespace_len = carbon->namespacing.counter_len;
151+
// the counter namespace plus the "." character
152+
namespaced_key_len += carbon->namespacing.counter_len + 1;
153+
} else if (type == BRUBECK_MT_TIMER && carbon->namespacing.timer) {
154+
type_namespace = carbon->namespacing.timer;
155+
type_namespace_len = carbon->namespacing.timer_len;
156+
// the counter namespace plus the "." character
157+
namespaced_key_len += carbon->namespacing.timer_len + 1;
158+
} else if (type == BRUBECK_MT_HISTO && carbon->namespacing.histo) {
159+
type_namespace = carbon->namespacing.histo;
160+
type_namespace_len = carbon->namespacing.histo_len;
161+
// the counter namespace plus the "." character
162+
namespaced_key_len += carbon->namespacing.histo_len + 1;
163+
} else if (type == BRUBECK_MT_GAUGE && carbon->namespacing.gauge) {
164+
type_namespace = carbon->namespacing.gauge;
165+
type_namespace_len = carbon->namespacing.gauge_len;
166+
// the counter namespace plus the "." character
167+
namespaced_key_len += carbon->namespacing.gauge_len + 1;
168+
}
169+
170+
namespaced_key_len += key_len;
171+
114172
*ptr++ = '(';
115173

116174
*ptr++ = 'U';
117-
*ptr++ = key_len;
175+
*ptr++ = namespaced_key_len;
176+
if (carbon->namespacing.global) {
177+
memcpy(ptr, carbon->namespacing.global, carbon->namespacing.global_len);
178+
ptr += carbon->namespacing.global_len;
179+
*ptr++ = '.';
180+
}
181+
if (type_namespace) {
182+
memcpy(ptr, type_namespace, type_namespace_len);
183+
ptr += type_namespace_len;
184+
*ptr++ = '.';
185+
}
118186
memcpy(ptr, key, key_len);
119187
ptr += key_len;
120188

@@ -123,7 +191,7 @@ static void pickle1_push(
123191

124192
*ptr++ = '(';
125193

126-
ptr += pickle1_int32(ptr, &timestamp);
194+
ptr += pickle1_int32(ptr, &carbon->backend.tick_time);
127195
ptr += pickle1_double(ptr, &value);
128196

129197
*ptr++ = 't';
@@ -177,6 +245,7 @@ static void pickle1_flush(void *backend)
177245
}
178246

179247
static void pickle1_each(
248+
uint8_t type,
180249
const char *key,
181250
value_t value,
182251
void *backend)
@@ -192,23 +261,32 @@ static void pickle1_each(
192261
if (!carbon_is_connected(carbon))
193262
return;
194263

195-
pickle1_push(&carbon->pickler, key, key_len,
196-
carbon->backend.tick_time, value);
264+
pickle1_push(carbon, type, key, key_len, value);
197265
}
198266

199267
struct brubeck_backend *
200268
brubeck_carbon_new(struct brubeck_server *server, json_t *settings, int shard_n)
201269
{
202270
struct brubeck_carbon *carbon = xcalloc(1, sizeof(struct brubeck_carbon));
203271
char *address;
272+
char *global_prefix = NULL,
273+
*prefix_counter = NULL,
274+
*prefix_timer = NULL,
275+
*prefix_histo = NULL,
276+
*prefix_gauge = NULL;
204277
int port, frequency, pickle = 0;
205278

206279
json_unpack_or_die(settings,
207-
"{s:s, s:i, s?:b, s:i}",
280+
"{s:s, s:i, s?:b, s:i, s?:s, s?:s, s?:s, s?:s, s?:s}",
208281
"address", &address,
209282
"port", &port,
210283
"pickle", &pickle,
211-
"frequency", &frequency);
284+
"frequency", &frequency,
285+
"global_prefix", &global_prefix,
286+
"prefix_counter", &prefix_counter,
287+
"prefix_timer", &prefix_timer,
288+
"prefix_histo", &prefix_histo,
289+
"prefix_gauge", &prefix_gauge);
212290

213291
carbon->backend.type = BRUBECK_BACKEND_CARBON;
214292
carbon->backend.shard_n = shard_n;
@@ -225,6 +303,31 @@ brubeck_carbon_new(struct brubeck_server *server, json_t *settings, int shard_n)
225303
carbon->backend.flush = NULL;
226304
}
227305

306+
if (global_prefix) {
307+
carbon->namespacing.global = global_prefix;
308+
carbon->namespacing.global_len = strlen(global_prefix);
309+
}
310+
311+
if (prefix_counter) {
312+
carbon->namespacing.counter = prefix_counter;
313+
carbon->namespacing.counter_len = strlen(prefix_counter);
314+
}
315+
316+
if (prefix_timer) {
317+
carbon->namespacing.timer = prefix_timer;
318+
carbon->namespacing.timer_len = strlen(prefix_timer);
319+
}
320+
321+
if (prefix_histo) {
322+
carbon->namespacing.histo = prefix_histo;
323+
carbon->namespacing.histo_len = strlen(prefix_histo);
324+
}
325+
326+
if (prefix_gauge) {
327+
carbon->namespacing.gauge = prefix_gauge;
328+
carbon->namespacing.gauge_len = strlen(prefix_gauge);
329+
}
330+
228331
carbon->backend.sample_freq = frequency;
229332
carbon->backend.server = server;
230333
carbon->out_sock = -1;

src/backends/carbon.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,22 @@ struct brubeck_carbon {
1515
uint16_t pos;
1616
uint16_t pt;
1717
} pickler;
18+
struct namespacing {
19+
char *global;
20+
size_t global_len;
21+
22+
char *counter;
23+
size_t counter_len;
24+
25+
char *timer;
26+
size_t timer_len;
27+
28+
char *histo;
29+
size_t histo_len;
30+
31+
char *gauge;
32+
size_t gauge_len;
33+
} namespacing;
1834
size_t sent;
1935
};
2036

src/internal_sampler.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,44 +14,44 @@ brubeck_internal__sample(struct brubeck_metric *metric, brubeck_sample_cb sample
1414
WITH_SUFFIX(".metrics") {
1515
value = brubeck_atomic_swap(&stats->live.metrics, 0);
1616
stats->sample.metrics = value;
17-
sample(key, (value_t)value, opaque);
17+
sample(metric->type, key, (value_t)value, opaque);
1818
}
1919

2020
WITH_SUFFIX(".errors") {
2121
value = brubeck_atomic_swap(&stats->live.errors, 0);
2222
stats->sample.errors = value;
23-
sample(key, (value_t)value, opaque);
23+
sample(metric->type, key, (value_t)value, opaque);
2424
}
2525

2626
WITH_SUFFIX(".unique_keys") {
2727
value = brubeck_atomic_fetch(&stats->live.unique_keys);
2828
stats->sample.unique_keys = value;
29-
sample(key, (value_t)value, opaque);
29+
sample(metric->type, key, (value_t)value, opaque);
3030
}
3131

3232
/* Secure statsd endpoint */
3333
WITH_SUFFIX(".secure.failed") {
3434
value = brubeck_atomic_swap(&stats->live.secure.failed, 0);
3535
stats->sample.secure.failed = value;
36-
sample(key, (value_t)value, opaque);
36+
sample(metric->type, key, (value_t)value, opaque);
3737
}
3838

3939
WITH_SUFFIX(".secure.from_future") {
4040
value = brubeck_atomic_swap(&stats->live.secure.from_future, 0);
4141
stats->sample.secure.from_future = value;
42-
sample(key, (value_t)value, opaque);
42+
sample(metric->type, key, (value_t)value, opaque);
4343
}
4444

4545
WITH_SUFFIX(".secure.delayed") {
4646
value = brubeck_atomic_swap(&stats->live.secure.delayed, 0);
4747
stats->sample.secure.delayed = value;
48-
sample(key, (value_t)value, opaque);
48+
sample(metric->type, key, (value_t)value, opaque);
4949
}
5050

5151
WITH_SUFFIX(".secure.replayed") {
5252
value = brubeck_atomic_swap(&stats->live.secure.replayed, 0);
5353
stats->sample.secure.replayed = value;
54-
sample(key, (value_t)value, opaque);
54+
sample(metric->type, key, (value_t)value, opaque);
5555
}
5656

5757
/*

src/metric.c

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ gauge__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa
6464
}
6565
pthread_spin_unlock(&metric->lock);
6666

67-
sample(metric->key, value, opaque);
67+
sample(metric->type, metric->key, value, opaque);
6868
}
6969

7070

@@ -98,7 +98,7 @@ meter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa
9898
}
9999
pthread_spin_unlock(&metric->lock);
100100

101-
sample(metric->key, value, opaque);
101+
sample(metric->type, metric->key, value, opaque);
102102
}
103103

104104

@@ -140,7 +140,7 @@ counter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *o
140140
}
141141
pthread_spin_unlock(&metric->lock);
142142

143-
sample(metric->key, value, opaque);
143+
sample(metric->type, metric->key, value, opaque);
144144
}
145145

146146

@@ -188,43 +188,43 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void
188188
return;
189189

190190
WITH_SUFFIX(".min") {
191-
sample(key, hsample.min, opaque);
191+
sample(metric->type, key, hsample.min, opaque);
192192
}
193193

194194
WITH_SUFFIX(".max") {
195-
sample(key, hsample.max, opaque);
195+
sample(metric->type, key, hsample.max, opaque);
196196
}
197197

198198
WITH_SUFFIX(".sum") {
199-
sample(key, hsample.sum, opaque);
199+
sample(metric->type, key, hsample.sum, opaque);
200200
}
201201

202202
WITH_SUFFIX(".mean") {
203-
sample(key, hsample.mean, opaque);
203+
sample(metric->type, key, hsample.mean, opaque);
204204
}
205205

206206
WITH_SUFFIX(".median") {
207-
sample(key, hsample.median, opaque);
207+
sample(metric->type, key, hsample.median, opaque);
208208
}
209209

210210
WITH_SUFFIX(".percentile.75") {
211-
sample(key, hsample.percentile[PC_75], opaque);
211+
sample(metric->type, key, hsample.percentile[PC_75], opaque);
212212
}
213213

214214
WITH_SUFFIX(".percentile.95") {
215-
sample(key, hsample.percentile[PC_95], opaque);
215+
sample(metric->type, key, hsample.percentile[PC_95], opaque);
216216
}
217217

218218
WITH_SUFFIX(".percentile.98") {
219-
sample(key, hsample.percentile[PC_98], opaque);
219+
sample(metric->type, key, hsample.percentile[PC_98], opaque);
220220
}
221221

222222
WITH_SUFFIX(".percentile.99") {
223-
sample(key, hsample.percentile[PC_99], opaque);
223+
sample(metric->type, key, hsample.percentile[PC_99], opaque);
224224
}
225225

226226
WITH_SUFFIX(".percentile.999") {
227-
sample(key, hsample.percentile[PC_999], opaque);
227+
sample(metric->type, key, hsample.percentile[PC_999], opaque);
228228
}
229229
}
230230

src/metric.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ struct brubeck_metric {
5555
};
5656

5757
typedef void (*brubeck_sample_cb)(
58+
uint8_t type,
5859
const char *key,
5960
value_t value,
6061
void *backend);

0 commit comments

Comments
 (0)