Skip to content

Commit f440be9

Browse files
feat: payload is not a list
1 parent f1b2d81 commit f440be9

File tree

7 files changed

+32
-48
lines changed

7 files changed

+32
-48
lines changed

backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/model/AISPositionModel.kt

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,8 @@ package fr.gouv.cacem.monitorenv.infrastructure.database.model
22

33
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
44
import com.fasterxml.jackson.databind.annotation.JsonSerialize
5-
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPositionEntity
6-
import jakarta.persistence.Entity
7-
import jakarta.persistence.GeneratedValue
8-
import jakarta.persistence.GenerationType
9-
import jakarta.persistence.Id
10-
import jakarta.persistence.Table
5+
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload
6+
import jakarta.persistence.*
117
import org.locationtech.jts.geom.Geometry
128
import org.locationtech.jts.io.WKTReader
139
import org.n52.jackson.datatype.jts.GeometryDeserializer
@@ -31,7 +27,7 @@ data class AISPositionModel(
3127
val ts: ZonedDateTime?,
3228
) {
3329
companion object {
34-
fun toAISPositionModel(aisPosition: AISPositionEntity): AISPositionModel =
30+
fun toAISPositionModel(aisPosition: AISPayload): AISPositionModel =
3531
AISPositionModel(
3632
id = aisPosition.id,
3733
mmsi = aisPosition.mmsi,

backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/JpaAISPositionRepository.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package fr.gouv.cacem.monitorenv.infrastructure.database.repositories
22

33
import fr.gouv.cacem.monitorenv.infrastructure.database.model.AISPositionModel
44
import fr.gouv.cacem.monitorenv.infrastructure.database.repositories.interfaces.IDBAISPositionRepository
5-
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPositionEntity
5+
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload
66
import jakarta.transaction.Transactional
77
import org.slf4j.Logger
88
import org.slf4j.LoggerFactory
@@ -15,7 +15,7 @@ class JpaAISPositionRepository(
1515
private val logger: Logger = LoggerFactory.getLogger(JpaAISPositionRepository::class.java)
1616

1717
@Transactional
18-
fun saveAll(aisPositions: List<AISPositionEntity>) {
19-
dbAISPositionRepository.saveAll(aisPositions.map { AISPositionModel.toAISPositionModel(it) })
18+
fun save(aisPosition: AISPayload) {
19+
dbAISPositionRepository.save(AISPositionModel.toAISPositionModel(aisPosition))
2020
}
2121
}

backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISListener.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,15 @@ class AISListener(
1515
private val logger = LoggerFactory.getLogger(AISListener::class.java)
1616

1717
companion object {
18-
const val TOPIC = "ais"
18+
const val TOPIC = "monitorenv.ais.position"
1919
}
2020

2121
@KafkaListener(topics = [TOPIC])
2222
fun listenAIS(payload: AISPayload) {
2323
try {
24-
logger.info("${payload.positions.size} AIS positions received")
25-
jpaAISPositionRepository.saveAll(payload.positions)
24+
jpaAISPositionRepository.save(payload)
2625
} catch (ex: Exception) {
27-
logger.error(ex.message)
26+
logger.error("Could not save ais position: ${ex.message}")
2827
throw ex
2928
}
3029
}

backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISProducer.kt

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@ package fr.gouv.cacem.monitorenv.infrastructure.kafka
22

33
import fr.gouv.cacem.monitorenv.infrastructure.kafka.AISListener.Companion.TOPIC
44
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload
5-
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPositionEntity
65
import org.slf4j.LoggerFactory
76
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
87
import org.springframework.kafka.core.KafkaTemplate
98
import org.springframework.scheduling.annotation.Scheduled
109
import org.springframework.stereotype.Component
1110
import java.time.ZonedDateTime
12-
import java.util.*
11+
import java.util.UUID
1312
import kotlin.random.Random
1413

1514
/**
@@ -40,19 +39,14 @@ class AISProducer(
4039
kafkaTemplate.send(
4140
TOPIC,
4241
AISPayload(
43-
positions =
44-
listOf(
45-
AISPositionEntity(
46-
id = null,
47-
mmsi = Random.nextInt(0, 999999999),
48-
coord = generateRandomPoint(),
49-
status = UUID.randomUUID().toString(),
50-
course = Random.nextDouble(),
51-
heading = Random.nextDouble(),
52-
speed = Random.nextDouble(),
53-
ts = ZonedDateTime.now(),
54-
),
55-
),
42+
id = null,
43+
mmsi = Random.nextInt(0, 999999999),
44+
coord = generateRandomPoint(),
45+
status = UUID.randomUUID().toString(),
46+
course = Random.nextDouble(),
47+
heading = Random.nextDouble(),
48+
speed = Random.nextDouble(),
49+
ts = ZonedDateTime.now(),
5650
),
5751
)
5852
} catch (ex: Exception) {
Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
package fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters
22

3+
import java.time.ZonedDateTime
4+
35
class AISPayload(
4-
val positions: List<AISPositionEntity>,
6+
val id: Int? = null,
7+
val mmsi: Int?,
8+
val coord: String?,
9+
val status: String?,
10+
val course: Double?,
11+
val heading: Double?,
12+
val speed: Double?,
13+
val ts: ZonedDateTime?,
514
)

backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/adapters/AISPositionEntity.kt

Lines changed: 0 additions & 14 deletions
This file was deleted.

backend/src/test/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISListenerITests.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package fr.gouv.cacem.monitorenv.infrastructure.kafka
22

33
import fr.gouv.cacem.monitorenv.infrastructure.database.repositories.AbstractKafkaTests
44
import fr.gouv.cacem.monitorenv.infrastructure.database.repositories.interfaces.IDBAISPositionRepository
5+
import fr.gouv.cacem.monitorenv.infrastructure.kafka.AISListener.Companion.TOPIC
56
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload
6-
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPositionEntity
77
import jakarta.transaction.Transactional
88
import org.assertj.core.api.Assertions.assertThat
99
import org.awaitility.Awaitility
@@ -29,7 +29,7 @@ class AISListenerITests : AbstractKafkaTests() {
2929
// Given
3030
val coord = "POINT(-2.7335 47.6078)"
3131
val aisPosition =
32-
AISPositionEntity(
32+
AISPayload(
3333
id = null,
3434
mmsi = 1234567890,
3535
coord = coord,
@@ -40,12 +40,12 @@ class AISListenerITests : AbstractKafkaTests() {
4040
ts = ZonedDateTime.now(),
4141
)
4242

43-
kafkaTemplate.send("ais", AISPayload(positions = listOf(aisPosition))).get(10, TimeUnit.SECONDS)
43+
kafkaTemplate.send(TOPIC, aisPosition).get(10, TimeUnit.SECONDS)
4444

4545
Awaitility
4646
.await()
4747
.pollInterval(1, TimeUnit.SECONDS)
48-
.atMost(30, TimeUnit.SECONDS)
48+
.atMost(5, TimeUnit.SECONDS)
4949
.untilAsserted {
5050
val saved = dbAISPositionRepository.findByIdOrNull(1)
5151
assertThat(saved).isNotNull()

0 commit comments

Comments
 (0)