Skip to content

Commit 22b152a

Browse files
tech: add blocking queue to push batch of aisPosition. add env variables
1 parent 476897b commit 22b152a

File tree

11 files changed

+115
-38
lines changed

11 files changed

+115
-38
lines changed

.env.dev.defaults

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,6 @@ MONITORENV_KAFKA_AIS_KEYSTORE=${PWD}/infra/kafka/certs/monitorenv/monitorenv.p12
172172
MONITORENV_KAFKA_AIS_TRUSTSTORE_PASSWORD=changeit
173173
MONITORENV_KAFKA_AIS_KEYSTORE_PASSWORD=changeit
174174
MONITORENV_KAFKA_AIS_KEY_PASSWORD=changeit
175+
MONITORENV_KAFKA_AIS_BATCH_SIZE=100
176+
MONITORENV_KAFKA_AIS_TOPIC=monitorenv.ais.position
177+
MONITORENV_KAFKA_AIS_TIMEOUT=30000

.env.infra.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,3 +165,6 @@ MONITORENV_KAFKA_AIS_KEYSTORE=
165165
MONITORENV_KAFKA_AIS_TRUSTSTORE_PASSWORD=
166166
MONITORENV_KAFKA_AIS_KEYSTORE_PASSWORD=
167167
MONITORENV_KAFKA_AIS_KEY_PASSWORD=
168+
MONITORENV_KAFKA_AIS_BATCH_SIZE=100
169+
MONITORENV_KAFKA_AIS_TOPIC=monitorenv.ais.position
170+
MONITORENV_KAFKA_AIS_TIMEOUT=30000

.env.test.defaults

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,3 +166,6 @@ MONITORENV_KAFKA_AIS_KEYSTORE=
166166
MONITORENV_KAFKA_AIS_TRUSTSTORE_PASSWORD=
167167
MONITORENV_KAFKA_AIS_KEYSTORE_PASSWORD=
168168
MONITORENV_KAFKA_AIS_KEY_PASSWORD=
169+
MONITORENV_KAFKA_AIS_BATCH_SIZE=100
170+
MONITORENV_KAFKA_AIS_TOPIC=monitorenv.ais.position
171+
MONITORENV_KAFKA_AIS_TIMEOUT=30000
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package fr.gouv.cacem.monitorenv.config
2+
3+
import org.springframework.boot.context.properties.ConfigurationProperties
4+
import org.springframework.stereotype.Component
5+
6+
@Component
7+
@ConfigurationProperties(prefix = "monitorenv.kafka.ais")
8+
class KafkaAISProperties(
9+
var topic: String = "monitorenv.ais.position",
10+
var timeout: Long = 30000L,
11+
var batchSize: Int = 100,
12+
)

backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/JpaAISPositionRepository.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,9 @@ class JpaAISPositionRepository(
1818
fun save(aisPosition: AISPayload) {
1919
dbAISPositionRepository.save(AISPositionModel.toAISPositionModel(aisPosition))
2020
}
21+
22+
@Transactional
23+
fun saveAll(aisPositions: List<AISPayload>) {
24+
dbAISPositionRepository.saveAll(aisPositions.map { AISPositionModel.toAISPositionModel(it) })
25+
}
2126
}
Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,67 @@
11
package fr.gouv.cacem.monitorenv.infrastructure.kafka
22

3+
import fr.gouv.cacem.monitorenv.config.KafkaAISProperties
34
import fr.gouv.cacem.monitorenv.infrastructure.database.repositories.JpaAISPositionRepository
45
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload
6+
import jakarta.annotation.PostConstruct
7+
import jakarta.annotation.PreDestroy
58
import org.slf4j.LoggerFactory
69
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
710
import org.springframework.kafka.annotation.KafkaListener
811
import org.springframework.stereotype.Component
12+
import java.util.concurrent.LinkedBlockingQueue
13+
import java.util.concurrent.TimeUnit
14+
import kotlin.concurrent.thread
915

1016
@Component
1117
@ConditionalOnProperty(value = ["monitorenv.kafka.ais.enabled"], havingValue = "true")
1218
class AISListener(
1319
private val jpaAISPositionRepository: JpaAISPositionRepository,
20+
private val kafkaAISProperties: KafkaAISProperties,
1421
) {
1522
private val logger = LoggerFactory.getLogger(AISListener::class.java)
1623

17-
companion object {
18-
const val TOPIC = "monitorenv.ais.position"
19-
}
24+
private val queue = LinkedBlockingQueue<AISPayload>()
2025

21-
@KafkaListener(topics = [TOPIC])
26+
@KafkaListener(topics = ["\${monitorenv.kafka.ais.topic:monitorenv.ais.position}"])
2227
fun listenAIS(payload: AISPayload) {
23-
try {
24-
jpaAISPositionRepository.save(payload)
25-
} catch (ex: Exception) {
26-
logger.error("Could not save ais position: ${ex.message}")
27-
throw ex
28+
queue.put(payload)
29+
}
30+
31+
@PostConstruct
32+
fun startBatching() {
33+
thread(isDaemon = true, name = "ais-batch-thread") {
34+
val batchAisPayloadToSave = mutableListOf<AISPayload>()
35+
36+
while (!Thread.currentThread().isInterrupted) {
37+
try {
38+
val first = queue.take()
39+
batchAisPayloadToSave.add(first)
40+
41+
val deadline = System.currentTimeMillis() + kafkaAISProperties.timeout
42+
43+
while (batchAisPayloadToSave.size < kafkaAISProperties.batchSize) {
44+
val remainingTime = deadline - System.currentTimeMillis()
45+
if (remainingTime <= 0) break
46+
47+
val aisPayload = queue.poll(remainingTime, TimeUnit.MILLISECONDS)
48+
if (aisPayload != null) {
49+
batchAisPayloadToSave.add(aisPayload)
50+
}
51+
}
52+
53+
jpaAISPositionRepository.saveAll(batchAisPayloadToSave)
54+
} catch (ex: Exception) {
55+
logger.error("Could not save AIS batch", ex)
56+
} finally {
57+
batchAisPayloadToSave.clear()
58+
}
59+
}
2860
}
2961
}
62+
63+
@PreDestroy
64+
fun shutdown() {
65+
logger.info("Shutting down AISListener batch thread")
66+
}
3067
}

backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISProducer.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package fr.gouv.cacem.monitorenv.infrastructure.kafka
22

3-
import fr.gouv.cacem.monitorenv.infrastructure.kafka.AISListener.Companion.TOPIC
3+
import fr.gouv.cacem.monitorenv.config.KafkaAISProperties
44
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload
55
import org.slf4j.LoggerFactory
66
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
@@ -21,6 +21,7 @@ import kotlin.random.Random
2121
)
2222
class AISProducer(
2323
private val kafkaTemplate: KafkaTemplate<String, AISPayload>,
24+
private val kafkaAISProperties: KafkaAISProperties,
2425
) {
2526
private val logger = LoggerFactory.getLogger(AISProducer::class.java)
2627

@@ -37,7 +38,7 @@ class AISProducer(
3738
try {
3839
logger.info("Sending AIS positions...")
3940
kafkaTemplate.send(
40-
TOPIC,
41+
kafkaAISProperties.topic,
4142
AISPayload(
4243
mmsi = Random.nextInt(0, 999999999),
4344
coord = generateRandomPoint(),

backend/src/main/resources/application.yml

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,13 @@ host:
1313
monitorenv:
1414
kafka:
1515
ais:
16+
batch-size: ${MONITORENV_KAFKA_AIS_BATCH_SIZE}
17+
topic: ${MONITORENV_KAFKA_AIS_TOPIC}
18+
timeout: ${MONITORENV_KAFKA_AIS_TIMEOUT}
1619
enabled: ${MONITORENV_KAFKA_AIS_ENABLED}
1720
producer:
18-
enabled: ${MONITORENV_KAFKA_AIS_PRODUCER_ENABLED}
21+
enabled: ${MONITORENV_KAFKA_AIS_PRODUCER_ENABLED:false}
22+
1923
ajp:
2024
port: 8000
2125
version: ${VERSION}
@@ -88,15 +92,15 @@ spring:
8892
consumer:
8993
group-id: ${MONITORENV_KAFKA_AIS_GROUP_ID:test}
9094
auto-offset-reset: latest
91-
95+
9296
properties:
9397
security.protocol: SSL
94-
98+
9599
ssl.keystore.location: ${MONITORENV_KAFKA_AIS_KEYSTORE}
96100
ssl.keystore.password: ${MONITORENV_KAFKA_AIS_KEYSTORE_PASSWORD}
97101
ssl.key.password: ${MONITORENV_KAFKA_AIS_KEY_PASSWORD}
98102
ssl.keystore.type: PKCS12
99-
103+
100104
ssl.truststore.location: ${MONITORENV_KAFKA_AIS_TRUSTSTORE}
101105
ssl.truststore.password: ${MONITORENV_KAFKA_AIS_TRUSTSTORE_PASSWORD}
102106
ssl.truststore.type: JKS
@@ -118,4 +122,4 @@ monitorfish:
118122

119123
rapportnav:
120124
url: ${RAPPORTNAV_URL}
121-
timeout: 3000
125+
timeout: 3000

backend/src/test/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISListenerITests.kt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package fr.gouv.cacem.monitorenv.infrastructure.kafka
22

3+
import fr.gouv.cacem.monitorenv.config.KafkaAISProperties
34
import fr.gouv.cacem.monitorenv.infrastructure.database.model.AISPositionPK
45
import fr.gouv.cacem.monitorenv.infrastructure.database.repositories.AbstractKafkaTests
56
import fr.gouv.cacem.monitorenv.infrastructure.database.repositories.interfaces.IDBAISPositionRepository
6-
import fr.gouv.cacem.monitorenv.infrastructure.kafka.AISListener.Companion.TOPIC
77
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload
88
import jakarta.transaction.Transactional
99
import org.assertj.core.api.Assertions.assertThat
@@ -24,6 +24,9 @@ class AISListenerITests : AbstractKafkaTests() {
2424
@Autowired
2525
lateinit var dbAISPositionRepository: IDBAISPositionRepository
2626

27+
@Autowired
28+
lateinit var kafkaAISProperties: KafkaAISProperties
29+
2730
@Transactional
2831
@Test
2932
fun `listenAIS should save AISPosition that comes from AIS topic`() {
@@ -42,12 +45,12 @@ class AISListenerITests : AbstractKafkaTests() {
4245
ts = ts,
4346
)
4447

45-
kafkaTemplate.send(TOPIC, aisPosition).get(10, TimeUnit.SECONDS)
48+
kafkaTemplate.send(kafkaAISProperties.topic, aisPosition).get(10, TimeUnit.SECONDS)
4649

4750
Awaitility
4851
.await()
4952
.pollInterval(1, TimeUnit.SECONDS)
50-
.atMost(5, TimeUnit.SECONDS)
53+
.atMost(kafkaAISProperties.timeout, TimeUnit.SECONDS)
5154
.untilAsserted {
5255
val saved = dbAISPositionRepository.findByIdOrNull(AISPositionPK(mmsi = mmsi, ts = ts))
5356
assertThat(saved).isNotNull()
Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,27 @@
11
monitorenv:
2-
oidc:
3-
enabled: false
4-
brief:
5-
templatePath: /template_export_brief.docx
6-
tmpDocxPath: tmp_brief.docx
7-
tmpOdtPath: tmp_brief.odt
2+
oidc:
3+
enabled: false
4+
brief:
5+
templatePath: /template_export_brief.docx
6+
tmpDocxPath: tmp_brief.docx
7+
tmpOdtPath: tmp_brief.odt
8+
kafka:
9+
ais:
10+
batch-size: 1
811

912
logging:
10-
level:
11-
org.springframework.security.web.authentication.logout: DEBUG
12-
org.hybernate.SQL: DEBUG
13-
org.hybernate.type.descriptor.sql.BasicBinder: TRACE
13+
level:
14+
org.springframework.security.web.authentication.logout: DEBUG
15+
org.hybernate.SQL: DEBUG
16+
org.hybernate.type.descriptor.sql.BasicBinder: TRACE
1417

1518
spring:
16-
jpa:
17-
show-sql: true
18-
flyway:
19-
locations: classpath:/db/migration,classpath:/db/testdata
20-
kafka:
21-
bootstrap-servers: localhost:9092
22-
consumer:
23-
group-id: test
24-
auto-offset-reset: latest
19+
jpa:
20+
show-sql: true
21+
flyway:
22+
locations: classpath:/db/migration,classpath:/db/testdata
23+
kafka:
24+
bootstrap-servers: localhost:9092
25+
consumer:
26+
group-id: test
27+
auto-offset-reset: latest

0 commit comments

Comments
 (0)