Open
Description
What happened?
This can be re-produced with simple DoFn with user counters, the updates in counters in processElement are populated properly to MointoringInfos when sending back ProcessBundleInstructionResponse, however the counters in onTimer callback are not recorded.
DoFn<KV<String, Long>, KV<Long, Instant>> fn =
new DoFn<KV<String, Long>, KV<Long, Instant>>() {
@TimerId(timerId)
private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
final Counter startTimers = Metrics.counter("timers", "started_timers_count");
final Counter firedTimers = Metrics.counter("timers", "fired_timers_count");
@ProcessElement
public void processElement(
@TimerId(timerId) Timer timer,
@Timestamp Instant timestamp,
OutputReceiver<KV<Long, Instant>> r) {
timer
.align(Duration.standardMinutes(1))
.offset(Duration.standardSeconds(1))
.setRelative();
LOG.info("started timer");
startTimers.inc();
r.output(KV.of(3L, timestamp));
}
@OnTimer(timerId)
public void onTimer(@Timestamp Instant timestamp, OutputReceiver<KV<Long, Instant>> r) {
LOG.info("fired timer");
firedTimers.inc();
r.output(KV.of(42L, timestamp));
}
};
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner