Skip to content
This repository was archived by the owner on Nov 14, 2019. It is now read-only.

Commit 922b484

Browse files
committed
Implements Carbon namespacing
1 parent 15e00f1 commit 922b484

File tree

7 files changed

+135
-32
lines changed

7 files changed

+135
-32
lines changed

config.default.json.example

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,13 @@
1010
{
1111
"type" : "carbon",
1212
"address" : "localhost",
13-
"port" : 2003,
14-
"frequency" : 10
13+
"port" : 2004,
14+
"frequency" : 10,
15+
"pickle" : true,
16+
"global_prefix" : "stats",
17+
"prefix_counter" : "counters",
18+
"prefix_timer" : "timers",
19+
"prefix_gauge" : "gauges"
1520
}
1621
],
1722

src/backend.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ struct brubeck_backend {
1313

1414
int (*connect)(void *);
1515
bool (*is_connected)(void *);
16-
void (*sample)(const char *, value_t, void *);
16+
void (*sample)(uint8_t, const char *, value_t, void *);
1717
void (*flush)(void *);
1818

1919
uint32_t tick_time;

src/backends/carbon.c

Lines changed: 92 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ static void carbon_disconnect(struct brubeck_carbon *self)
4545
}
4646

4747
static void plaintext_each(
48+
uint8_t type,
4849
const char *key,
4950
value_t value,
5051
void *backend)
@@ -58,6 +59,26 @@ static void plaintext_each(
5859
if (!carbon_is_connected(carbon))
5960
return;
6061

62+
if (carbon->namespacing.global) {
63+
memcpy(ptr, carbon->namespacing.global, carbon->namespacing.global_len);
64+
ptr += carbon->namespacing.global_len;
65+
*ptr++ = '.';
66+
}
67+
68+
if ((type == BRUBECK_MT_COUNTER || type == BRUBECK_MT_METER) && carbon->namespacing.counter) {
69+
memcpy(ptr, carbon->namespacing.counter, carbon->namespacing.counter_len);
70+
ptr += carbon->namespacing.counter_len;
71+
*ptr++ = '.';
72+
} else if (type == BRUBECK_MT_TIMER && carbon->namespacing.timer) {
73+
memcpy(ptr, carbon->namespacing.timer, carbon->namespacing.timer_len);
74+
ptr += carbon->namespacing.timer_len;
75+
*ptr++ = '.';
76+
} else if (type == BRUBECK_MT_GAUGE && carbon->namespacing.gauge) {
77+
memcpy(ptr, carbon->namespacing.gauge, carbon->namespacing.gauge_len);
78+
ptr += carbon->namespacing.gauge_len;
79+
*ptr++ = '.';
80+
}
81+
6182
memcpy(ptr, key, key_len);
6283
ptr += key_len;
6384
*ptr++ = ' ';
@@ -103,18 +124,56 @@ static inline size_t pickle1_double(char *ptr, void *_src)
103124
}
104125

105126
static void pickle1_push(
106-
struct pickler *buf,
127+
struct brubeck_carbon *carbon,
128+
uint8_t type,
107129
const char *key,
108130
uint8_t key_len,
109-
uint32_t timestamp,
110131
value_t value)
111132
{
133+
uint8_t namespaced_key_len = 0;
134+
char *type_namespace = NULL;
135+
size_t type_namespace_len = 0;
136+
struct pickler *buf = &carbon->pickler;
112137
char *ptr = buf->ptr + buf->pos;
113138

139+
if (carbon->namespacing.global) {
140+
// the global namespace plus the "." character
141+
namespaced_key_len += carbon->namespacing.global_len + 1;
142+
}
143+
144+
if ((type == BRUBECK_MT_COUNTER || type == BRUBECK_MT_METER) && carbon->namespacing.counter) {
145+
type_namespace = carbon->namespacing.counter;
146+
type_namespace_len = carbon->namespacing.counter_len;
147+
// the counter namespace plus the "." character
148+
namespaced_key_len += carbon->namespacing.counter_len + 1;
149+
} else if (type == BRUBECK_MT_TIMER && carbon->namespacing.timer) {
150+
type_namespace = carbon->namespacing.timer;
151+
type_namespace_len = carbon->namespacing.timer_len;
152+
// the counter namespace plus the "." character
153+
namespaced_key_len += carbon->namespacing.timer_len + 1;
154+
} else if (type == BRUBECK_MT_GAUGE && carbon->namespacing.gauge) {
155+
type_namespace = carbon->namespacing.gauge;
156+
type_namespace_len = carbon->namespacing.gauge_len;
157+
// the counter namespace plus the "." character
158+
namespaced_key_len += carbon->namespacing.gauge_len + 1;
159+
}
160+
161+
namespaced_key_len += key_len;
162+
114163
*ptr++ = '(';
115164

116165
*ptr++ = 'U';
117-
*ptr++ = key_len;
166+
*ptr++ = namespaced_key_len;
167+
if (carbon->namespacing.global) {
168+
memcpy(ptr, carbon->namespacing.global, carbon->namespacing.global_len);
169+
ptr += carbon->namespacing.global_len;
170+
*ptr++ = '.';
171+
}
172+
if (type_namespace) {
173+
memcpy(ptr, type_namespace, type_namespace_len);
174+
ptr += type_namespace_len;
175+
*ptr++ = '.';
176+
}
118177
memcpy(ptr, key, key_len);
119178
ptr += key_len;
120179

@@ -123,7 +182,7 @@ static void pickle1_push(
123182

124183
*ptr++ = '(';
125184

126-
ptr += pickle1_int32(ptr, &timestamp);
185+
ptr += pickle1_int32(ptr, &carbon->backend.tick_time);
127186
ptr += pickle1_double(ptr, &value);
128187

129188
*ptr++ = 't';
@@ -177,6 +236,7 @@ static void pickle1_flush(void *backend)
177236
}
178237

179238
static void pickle1_each(
239+
uint8_t type,
180240
const char *key,
181241
value_t value,
182242
void *backend)
@@ -192,23 +252,27 @@ static void pickle1_each(
192252
if (!carbon_is_connected(carbon))
193253
return;
194254

195-
pickle1_push(&carbon->pickler, key, key_len,
196-
carbon->backend.tick_time, value);
255+
pickle1_push(carbon, type, key, key_len, value);
197256
}
198257

199258
struct brubeck_backend *
200259
brubeck_carbon_new(struct brubeck_server *server, json_t *settings, int shard_n)
201260
{
202261
struct brubeck_carbon *carbon = xcalloc(1, sizeof(struct brubeck_carbon));
203262
char *address;
263+
char *global_prefix = NULL, *prefix_counter = NULL, *prefix_timer = NULL, *prefix_gauge = NULL;
204264
int port, frequency, pickle = 0;
205265

206266
json_unpack_or_die(settings,
207-
"{s:s, s:i, s?:b, s:i}",
267+
"{s:s, s:i, s?:b, s:i, s?:s, s?:s, s?:s, s?:s}",
208268
"address", &address,
209269
"port", &port,
210270
"pickle", &pickle,
211-
"frequency", &frequency);
271+
"frequency", &frequency,
272+
"global_prefix", &global_prefix,
273+
"prefix_counter", &prefix_counter,
274+
"prefix_timer", &prefix_timer,
275+
"prefix_gauge", &prefix_gauge);
212276

213277
carbon->backend.type = BRUBECK_BACKEND_CARBON;
214278
carbon->backend.shard_n = shard_n;
@@ -225,6 +289,26 @@ brubeck_carbon_new(struct brubeck_server *server, json_t *settings, int shard_n)
225289
carbon->backend.flush = NULL;
226290
}
227291

292+
if (global_prefix) {
293+
carbon->namespacing.global = global_prefix;
294+
carbon->namespacing.global_len = strlen(global_prefix);
295+
}
296+
297+
if (prefix_counter) {
298+
carbon->namespacing.counter = prefix_counter;
299+
carbon->namespacing.counter_len = strlen(prefix_counter);
300+
}
301+
302+
if (prefix_timer) {
303+
carbon->namespacing.timer = prefix_timer;
304+
carbon->namespacing.timer_len = strlen(prefix_timer);
305+
}
306+
307+
if (prefix_gauge) {
308+
carbon->namespacing.gauge = prefix_gauge;
309+
carbon->namespacing.gauge_len = strlen(prefix_gauge);
310+
}
311+
228312
carbon->backend.sample_freq = frequency;
229313
carbon->backend.server = server;
230314
carbon->out_sock = -1;

src/backends/carbon.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,19 @@ struct brubeck_carbon {
1515
uint16_t pos;
1616
uint16_t pt;
1717
} pickler;
18+
struct namespacing {
19+
char *global;
20+
size_t global_len;
21+
22+
char *counter;
23+
size_t counter_len;
24+
25+
char *timer;
26+
size_t timer_len;
27+
28+
char *gauge;
29+
size_t gauge_len;
30+
} namespacing;
1831
size_t sent;
1932
};
2033

src/internal_sampler.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,44 +14,44 @@ brubeck_internal__sample(struct brubeck_metric *metric, brubeck_sample_cb sample
1414
WITH_SUFFIX(".metrics") {
1515
value = brubeck_atomic_swap(&stats->live.metrics, 0);
1616
stats->sample.metrics = value;
17-
sample(key, (value_t)value, opaque);
17+
sample(metric->type, key, (value_t)value, opaque);
1818
}
1919

2020
WITH_SUFFIX(".errors") {
2121
value = brubeck_atomic_swap(&stats->live.errors, 0);
2222
stats->sample.errors = value;
23-
sample(key, (value_t)value, opaque);
23+
sample(metric->type, key, (value_t)value, opaque);
2424
}
2525

2626
WITH_SUFFIX(".unique_keys") {
2727
value = brubeck_atomic_fetch(&stats->live.unique_keys);
2828
stats->sample.unique_keys = value;
29-
sample(key, (value_t)value, opaque);
29+
sample(metric->type, key, (value_t)value, opaque);
3030
}
3131

3232
/* Secure statsd endpoint */
3333
WITH_SUFFIX(".secure.failed") {
3434
value = brubeck_atomic_swap(&stats->live.secure.failed, 0);
3535
stats->sample.secure.failed = value;
36-
sample(key, (value_t)value, opaque);
36+
sample(metric->type, key, (value_t)value, opaque);
3737
}
3838

3939
WITH_SUFFIX(".secure.from_future") {
4040
value = brubeck_atomic_swap(&stats->live.secure.from_future, 0);
4141
stats->sample.secure.from_future = value;
42-
sample(key, (value_t)value, opaque);
42+
sample(metric->type, key, (value_t)value, opaque);
4343
}
4444

4545
WITH_SUFFIX(".secure.delayed") {
4646
value = brubeck_atomic_swap(&stats->live.secure.delayed, 0);
4747
stats->sample.secure.delayed = value;
48-
sample(key, (value_t)value, opaque);
48+
sample(metric->type, key, (value_t)value, opaque);
4949
}
5050

5151
WITH_SUFFIX(".secure.replayed") {
5252
value = brubeck_atomic_swap(&stats->live.secure.replayed, 0);
5353
stats->sample.secure.replayed = value;
54-
sample(key, (value_t)value, opaque);
54+
sample(metric->type, key, (value_t)value, opaque);
5555
}
5656

5757
/*

src/metric.c

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ gauge__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa
6464
}
6565
pthread_spin_unlock(&metric->lock);
6666

67-
sample(metric->key, value, opaque);
67+
sample(metric->type, metric->key, value, opaque);
6868
}
6969

7070

@@ -98,7 +98,7 @@ meter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa
9898
}
9999
pthread_spin_unlock(&metric->lock);
100100

101-
sample(metric->key, value, opaque);
101+
sample(metric->type, metric->key, value, opaque);
102102
}
103103

104104

@@ -140,7 +140,7 @@ counter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *o
140140
}
141141
pthread_spin_unlock(&metric->lock);
142142

143-
sample(metric->key, value, opaque);
143+
sample(metric->type, metric->key, value, opaque);
144144
}
145145

146146

@@ -178,47 +178,47 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void
178178
memcpy(key, metric->key, metric->key_len);
179179

180180
WITH_SUFFIX(".min") {
181-
sample(key, hsample.min, opaque);
181+
sample(metric->type, key, hsample.min, opaque);
182182
}
183183

184184
WITH_SUFFIX(".max") {
185-
sample(key, hsample.max, opaque);
185+
sample(metric->type, key, hsample.max, opaque);
186186
}
187187

188188
WITH_SUFFIX(".sum") {
189-
sample(key, hsample.sum, opaque);
189+
sample(metric->type, key, hsample.sum, opaque);
190190
}
191191

192192
WITH_SUFFIX(".mean") {
193-
sample(key, hsample.mean, opaque);
193+
sample(metric->type, key, hsample.mean, opaque);
194194
}
195195

196196
WITH_SUFFIX(".count") {
197-
sample(key, hsample.count, opaque);
197+
sample(metric->type, key, hsample.count, opaque);
198198
}
199199

200200
WITH_SUFFIX(".median") {
201-
sample(key, hsample.median, opaque);
201+
sample(metric->type, key, hsample.median, opaque);
202202
}
203203

204204
WITH_SUFFIX(".percentile.75") {
205-
sample(key, hsample.percentile[PC_75], opaque);
205+
sample(metric->type, key, hsample.percentile[PC_75], opaque);
206206
}
207207

208208
WITH_SUFFIX(".percentile.95") {
209-
sample(key, hsample.percentile[PC_95], opaque);
209+
sample(metric->type, key, hsample.percentile[PC_95], opaque);
210210
}
211211

212212
WITH_SUFFIX(".percentile.98") {
213-
sample(key, hsample.percentile[PC_98], opaque);
213+
sample(metric->type, key, hsample.percentile[PC_98], opaque);
214214
}
215215

216216
WITH_SUFFIX(".percentile.99") {
217-
sample(key, hsample.percentile[PC_99], opaque);
217+
sample(metric->type, key, hsample.percentile[PC_99], opaque);
218218
}
219219

220220
WITH_SUFFIX(".percentile.999") {
221-
sample(key, hsample.percentile[PC_999], opaque);
221+
sample(metric->type, key, hsample.percentile[PC_999], opaque);
222222
}
223223
}
224224

src/metric.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ struct brubeck_metric {
5555
};
5656

5757
typedef void (*brubeck_sample_cb)(
58+
uint8_t type,
5859
const char *key,
5960
value_t value,
6061
void *backend);

0 commit comments

Comments
 (0)