2222import com.mongodb.event.ClusterListener;
2323import com.mongodb.event.ClusterOpeningEvent;
2424import com.mongodb.event.CommandEvent;
25+ import io.micrometer.common.lang.Nullable;
2526import io.micrometer.core.instrument.MeterRegistry;
2627import io.micrometer.core.instrument.Tag;
2728import io.micrometer.core.instrument.Tags;
2829import io.micrometer.core.instrument.search.MeterNotFoundException;
2930import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
3031import org.bson.Document;
3132import org.junit.jupiter.api.AfterEach;
32- import org.junit.jupiter.api.BeforeEach;
3333import org.junit.jupiter.api.Test;
3434
3535import java.util.Date;
3838import java.util.concurrent.atomic.AtomicReference;
3939
4040import static java.util.Collections.singletonList;
41+ import static java.util.Objects.requireNonNull;
4142import static org.assertj.core.api.Assertions.assertThat;
43+ import static org.awaitility.Awaitility.await;
44+ import static org.hamcrest.Matchers.notNullValue;
4245
4346/**
4447 * Tests for {@link MongoMetricsCommandListener}.
4851 */
4952class MongoMetricsCommandListenerTest extends AbstractMongoDbTest {
5053
51- private MeterRegistry registry;
52-
53- private AtomicReference<String> clusterId;
54+ private MeterRegistry registry = new SimpleMeterRegistry();
5455
56+ @Nullable
5557 private MongoClient mongo;
5658
57- @BeforeEach
58- void setup() {
59- clusterId = new AtomicReference<>();
60- // tag::setup[]
61- registry = new SimpleMeterRegistry();
62- MongoClientSettings settings = MongoClientSettings.builder()
63- .addCommandListener(new MongoMetricsCommandListener(registry))
64- .applyToClusterSettings(builder -> builder.hosts(singletonList(new ServerAddress(host, port)))
65- .addClusterListener(new ClusterListener() {
66- @Override
67- public void clusterOpening(ClusterOpeningEvent event) {
68- clusterId.set(event.getClusterId().getValue());
69- }
70- }))
71- .build();
72- mongo = MongoClients.create(settings);
73- // end::setup[]
74- }
75-
7659 @Test
7760 void shouldCreateSuccessCommandMetric() {
61+ AtomicReference<String> clusterIdRef = new AtomicReference<>();
62+ mongo = createMongoClient(clusterIdRef);
63+
64+ assertThat(mongo).isNotNull();
65+ String clusterId = await().untilAtomic(clusterIdRef, notNullValue());
66+
7867 // tag::example[]
7968 mongo.getDatabase("test").getCollection("testCol").insertOne(new Document("testDoc", new Date()));
8069
81- Tags tags = Tags.of("cluster.id", clusterId.get() , "server.address", String.format("%s:%s", host, port),
82- "command", " insert", "database", "test", "collection", "testCol", "status", "SUCCESS");
70+ Tags tags = Tags.of("cluster.id", clusterId, "server.address", String.format("%s:%s", host, port), "command" ,
71+ "insert", "database", "test", "collection", "testCol", "status", "SUCCESS");
8372 assertThat(registry.get("mongodb.driver.commands").tags(tags).timer().count()).isEqualTo(1);
8473 // end::example[]
8574 }
8675
8776 @Test
8877 void shouldCreateFailedCommandMetric() {
78+ AtomicReference<String> clusterIdRef = new AtomicReference<>();
79+ mongo = createMongoClient(clusterIdRef);
80+
81+ assertThat(mongo).isNotNull();
82+ String clusterId = await().untilAtomic(clusterIdRef, notNullValue());
83+
8984 mongo.getDatabase("test").getCollection("testCol").dropIndex("nonExistentIndex");
9085
91- Tags tags = Tags.of("cluster.id", clusterId.get() , "server.address", String.format("%s:%s", host, port),
92- "command", " dropIndexes", "database", "test", "collection", "testCol", "status", "FAILED");
86+ Tags tags = Tags.of("cluster.id", clusterId, "server.address", String.format("%s:%s", host, port), "command" ,
87+ "dropIndexes", "database", "test", "collection", "testCol", "status", "FAILED");
9388 assertThat(registry.get("mongodb.driver.commands").tags(tags).timer().count()).isEqualTo(1);
9489 }
9590
9691 @Test
9792 void shouldCreateSuccessCommandMetricWithCustomSettings() {
93+ AtomicReference<String> clusterIdRef = new AtomicReference<>();
94+
9895 MongoCommandTagsProvider tagsProvider = new DefaultMongoCommandTagsProvider() {
9996 @Override
10097 public Iterable<Tag> commandTags(CommandEvent event) {
@@ -107,21 +104,25 @@ public Iterable<Tag> commandTags(CommandEvent event) {
107104 .addClusterListener(new ClusterListener() {
108105 @Override
109106 public void clusterOpening(ClusterOpeningEvent event) {
110- clusterId .set(event.getClusterId().getValue());
107+ clusterIdRef .set(event.getClusterId().getValue());
111108 }
112109 }))
113110 .build();
114- try (MongoClient mongo = MongoClients.create(settings)) {
115- mongo.getDatabase("test").getCollection("testCol").insertOne(new Document("testDoc", new Date()));
116- Tags tags = Tags.of("cluster.id", clusterId.get(), "server.address", String.format("%s:%s", host, port),
117- "command", "insert", "database", "test", "collection", "testCol", "status", "SUCCESS", "mongoz",
118- "5150");
119- assertThat(registry.get("mongodb.driver.commands").tags(tags).timer().count()).isEqualTo(1);
120- }
111+
112+ mongo = MongoClients.create(settings);
113+ assertThat(mongo).isNotNull();
114+ String clusterId = await().untilAtomic(clusterIdRef, notNullValue());
115+
116+ mongo.getDatabase("test").getCollection("testCol").insertOne(new Document("testDoc", new Date()));
117+ Tags tags = Tags.of("cluster.id", clusterId, "server.address", String.format("%s:%s", host, port), "command",
118+ "insert", "database", "test", "collection", "testCol", "status", "SUCCESS", "mongoz", "5150");
119+ assertThat(registry.get("mongodb.driver.commands").tags(tags).timer().count()).isEqualTo(1);
121120 }
122121
123122 @Test
124123 void shouldCreateFailedCommandMetricWithCustomSettings() {
124+ AtomicReference<String> clusterIdRef = new AtomicReference<>();
125+
125126 MongoCommandTagsProvider tagsProvider = new DefaultMongoCommandTagsProvider() {
126127 @Override
127128 public Iterable<Tag> commandTags(CommandEvent event) {
@@ -134,42 +135,47 @@ public Iterable<Tag> commandTags(CommandEvent event) {
134135 .addClusterListener(new ClusterListener() {
135136 @Override
136137 public void clusterOpening(ClusterOpeningEvent event) {
137- clusterId .set(event.getClusterId().getValue());
138+ clusterIdRef .set(event.getClusterId().getValue());
138139 }
139140 }))
140141 .build();
141- try (MongoClient mongo = MongoClients.create(settings)) {
142- mongo.getDatabase("test").getCollection("testCol").dropIndex("nonExistentIndex");
143- Tags tags = Tags.of("cluster.id", clusterId.get(), "server.address", String.format("%s:%s", host, port),
144- "command", "dropIndexes", "database", "test", "collection", "testCol", "status", "FAILED", "mongoz",
145- "5150");
146- assertThat(registry.get("mongodb.driver.commands").tags(tags).timer().count()).isEqualTo(1);
147- }
142+
143+ mongo = MongoClients.create(settings);
144+ assertThat(mongo).isNotNull();
145+ String clusterId = await().untilAtomic(clusterIdRef, notNullValue());
146+
147+ mongo.getDatabase("test").getCollection("testCol").dropIndex("nonExistentIndex");
148+ Tags tags = Tags.of("cluster.id", clusterId, "server.address", String.format("%s:%s", host, port), "command",
149+ "dropIndexes", "database", "test", "collection", "testCol", "status", "FAILED", "mongoz", "5150");
150+ assertThat(registry.get("mongodb.driver.commands").tags(tags).timer().count()).isEqualTo(1);
148151 }
149152
150153 @Test
151154 void shouldSupportConcurrentCommands() throws InterruptedException {
155+ mongo = createMongoClient(new AtomicReference<>());
156+ assertThat(mongo).isNotNull();
157+
152158 for (int i = 0; i < 20; i++) {
153159 Map<String, Thread> commandThreadMap = new HashMap<>();
154160
155161 commandThreadMap.put("insert",
156- new Thread(() -> mongo.getDatabase("test")
162+ new Thread(() -> requireNonNull( mongo) .getDatabase("test")
157163 .getCollection("testCol")
158164 .insertOne(new Document("testField", new Date()))));
159165
160166 commandThreadMap.put("update",
161- new Thread(() -> mongo.getDatabase("test")
167+ new Thread(() -> requireNonNull( mongo) .getDatabase("test")
162168 .getCollection("testCol")
163169 .updateOne(new Document("nonExistentField", "abc"),
164170 new Document("$set", new Document("nonExistentField", "xyz")))));
165171
166172 commandThreadMap.put("delete",
167- new Thread(() -> mongo.getDatabase("test")
173+ new Thread(() -> requireNonNull( mongo) .getDatabase("test")
168174 .getCollection("testCol")
169175 .deleteOne(new Document("nonExistentField", "abc"))));
170176
171- commandThreadMap.put("aggregate",
172- new Thread(( ) -> mongo.getDatabase("test").getCollection("testCol").countDocuments()));
177+ commandThreadMap.put("aggregate", new Thread(
178+ ( ) -> requireNonNull( mongo) .getDatabase("test").getCollection("testCol").countDocuments()));
173179
174180 for (Thread thread : commandThreadMap.values()) {
175181 thread.start();
@@ -207,4 +213,24 @@ void destroy() {
207213 }
208214 }
209215
216+ private MongoClient createMongoClient(AtomicReference<String> clusterId) {
217+ // tag::setup_1[]
218+ MongoClientSettings settings = MongoClientSettings.builder()
219+ .addCommandListener(new MongoMetricsCommandListener(registry))
220+ // end::setup_1[]
221+ .applyToClusterSettings(builder -> builder.hosts(singletonList(new ServerAddress(host, port)))
222+ .addClusterListener(new ClusterListener() {
223+ @Override
224+ public void clusterOpening(ClusterOpeningEvent event) {
225+ clusterId.set(event.getClusterId().getValue());
226+ }
227+ }))
228+ // tag::setup_2[]
229+ .build();
230+
231+ MongoClient mongo = MongoClients.create(settings);
232+ // end::setup_2[]
233+ return mongo;
234+ }
235+
210236}
0 commit comments