Skip to content

Commit 8afc7b9

Browse files
committed
Update time_series.rs
1 parent 157aa2f commit 8afc7b9

1 file changed

Lines changed: 337 additions & 0 deletions

File tree

src/subscribe/time_series.rs

Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1547,4 +1547,341 @@ mod tests {
15471547
let err = ensure_time_data_exists(&path).unwrap_err();
15481548
assert_eq!(err.kind(), std::io::ErrorKind::NotFound);
15491549
}
1550+
1551+
// =========================================================================
1552+
// Regression tests for time-related behavior (issue #321)
1553+
//
1554+
// These tests pin down the externally observable contracts of the
1555+
// time-handling code so that the subsequent removal of `chrono` can be
1556+
// validated against a stable baseline. They prefer integer Unix timestamps
1557+
// and literal RFC3339 strings over chrono-derived expected values.
1558+
// =========================================================================
1559+
mod regressions {
1560+
use super::*;
1561+
1562+
// Reference Unix timestamps used as fixed inputs by the tests in this module.
// Every value is a whole number of seconds since the Unix epoch; the UTC
// instant each one denotes is listed below and encoded in the constant name.
1563+
// - 0 : 1970-01-01T00:00:00Z (Unix epoch)
1564+
// - 1 : 1970-01-01T00:00:01Z (smallest positive)
1565+
// - 1_700_000_000 : 2023-11-14T22:13:20Z (mid-range)
1566+
// - 253_402_300_799 : 9999-12-31T23:59:59Z (large but safe endpoint)
1567+
const TS_EPOCH: i64 = 0;
1568+
const TS_EPOCH_PLUS_ONE: i64 = 1;
1569+
const TS_2023_11_14_221320Z: i64 = 1_700_000_000;
1570+
const TS_9999_12_31_235959Z: i64 = 253_402_300_799;
1571+
1572+
// Helper: build a DateTime from an integer Unix timestamp without
1573+
// parsing chrono date/time strings.
1574+
fn dt(ts: i64) -> DateTime<Utc> {
1575+
DateTime::from_timestamp(ts, 0).expect("valid integer timestamp")
1576+
}
1577+
1578+
/// The epoch, and the second right after it, both land in slot 0.
#[test]
fn time_slot_at_unix_epoch_is_zero() {
    // One-day period (86_400 s), one-hour interval (3_600 s), offset 0.
    let daily_hourly = create_policy(1, 86_400, 3_600, 0, None);

    // Checked in the same order as before: the epoch first, then one second
    // past it, which is still inside the first hour.
    for seconds in [TS_EPOCH, TS_EPOCH_PLUS_ONE] {
        let slot = time_slot(&daily_hourly, dt(seconds)).unwrap();
        assert_eq!(slot, 0, "timestamp {seconds} must map to slot 0");
    }
}
1586+
1587+
/// Pins the slot for 2023-11-14T22:13:20Z against a hand-computed literal.
#[test]
fn time_slot_for_known_rfc3339_midrange() {
    // The instant is 1_700_000_000 s after the epoch. For period = 86_400 s
    // and interval = 3_600 s the expectation is worked out by hand:
    //   1_700_000_000 % 86_400 = 80_000   (seconds into the day)
    //   80_000 / 3_600         = 22       (integer division)
    // Asserting on the literal keeps the expected value independent of
    // anything chrono computes.
    let daily_hourly = create_policy(1, 86_400, 3_600, 0, None);
    let instant = dt(TS_2023_11_14_221320Z);

    let slot = time_slot(&daily_hourly, instant).unwrap();

    assert_eq!(slot, 22);
}
1597+
1598+
/// The last second of year 9999 falls into the final hour slot of the day.
#[test]
fn time_slot_at_year_9999_endpoint() {
    // 9999-12-31T23:59:59Z is 253_402_300_799 s after the epoch.
    //   253_402_300_799 % 86_400 = 86_399   (last second of the day)
    //   86_399 / 3_600           = 23       (last hour slot)
    let daily_hourly = create_policy(1, 86_400, 3_600, 0, None);
    let endpoint = dt(TS_9999_12_31_235959Z);

    let slot = time_slot(&daily_hourly, endpoint).unwrap();

    assert_eq!(slot, 23);
}
1606+
1607+
/// RFC3339 strings that spell the same instant with different UTC offsets
/// must all resolve to the same Unix timestamp and the same slot.
///
/// The previous version fed three identical integer literals to
/// `time_slot`, so it could never fail on an offset-handling bug. This
/// version actually parses the differently-offset strings; the expected
/// values (timestamp 1_700_000_000, slot 22) are still literals rather than
/// values derived from the time library.
#[test]
fn time_slot_normalizes_offset_with_utc_strings() {
    // Three spellings of 2023-11-14T22:13:20Z.
    const SAME_INSTANT: [&str; 3] = [
        "2023-11-14T22:13:20Z",
        "2023-11-14T23:13:20+01:00",
        "2023-11-14T17:13:20-05:00",
    ];
    let policy = create_policy(1, 86_400, 3_600, 0, None);

    for text in SAME_INSTANT.iter().copied() {
        // NOTE(review): this is the one place in the module that parses a
        // date/time string; swap the parser when the time backend changes.
        let instant = DateTime::parse_from_rfc3339(text)
            .expect("literal is valid RFC3339")
            .with_timezone(&Utc);

        // The offset must be folded into the instant, not dropped or kept
        // as wall-clock time.
        assert_eq!(
            instant.timestamp(),
            TS_2023_11_14_221320Z,
            "{text} must normalize to the reference instant",
        );

        let slot = time_slot(&policy, instant).expect("time_slot should succeed");
        assert_eq!(slot, 22, "same instant must yield the same slot ({text})");
    }
}
1626+
1627+
/// Older instants must compare strictly less than newer ones.
///
/// The previous version only sorted a `Vec<i64>`, which exercises the
/// standard library's integer ordering and nothing time-related. Here the
/// reference timestamps are converted with `dt` first, so the ordering under
/// test is the one implemented by the time type itself.
#[test]
fn timestamp_ordering_strictly_increases() {
    // Expected chronological order, as literal seconds.
    let ascending = [
        TS_EPOCH,
        TS_EPOCH_PLUS_ONE,
        TS_2023_11_14_221320Z,
        TS_9999_12_31_235959Z,
    ];

    // Deliberately shuffled input so that sorting has work to do.
    let shuffled = [
        TS_9999_12_31_235959Z,
        TS_EPOCH,
        TS_2023_11_14_221320Z,
        TS_EPOCH_PLUS_ONE,
    ];
    let mut instants: Vec<DateTime<Utc>> = shuffled.iter().map(|&ts| dt(ts)).collect();
    instants.sort_unstable();

    // Sorting by the time type's `Ord` must agree with integer order.
    let sorted_seconds: Vec<i64> = instants
        .iter()
        .map(|instant| instant.timestamp())
        .collect();
    assert_eq!(sorted_seconds, ascending);

    // Adjacent instants must be strictly ordered, not merely non-decreasing.
    for pair in instants.windows(2) {
        assert!(
            pair[0] < pair[1],
            "{:?} must be strictly earlier than {:?}",
            pair[0],
            pair[1],
        );
    }
}
1648+
1649+
/// Two `TimeSeries` values that differ only in `start` must serialize to
/// identical bincode bytes.
#[test]
fn time_series_bincode_serialization_is_chrono_independent() {
    // `start` is expected to be excluded from the wire format (serde skip),
    // which is what allows the time backend to change without altering the
    // encoded payload. Everything except `start` is held constant here.
    let with_start = |start: DateTime<Utc>| TimeSeries {
        sampling_policy_id: "42".to_string(),
        start,
        series: vec![1.0, 2.0, 3.0, 4.0],
    };

    let at_epoch = bincode::serialize(&with_start(dt(TS_EPOCH))).expect("bincode serialize a");
    let at_year_9999 =
        bincode::serialize(&with_start(dt(TS_9999_12_31_235959Z))).expect("bincode serialize b");

    assert_eq!(
        at_epoch, at_year_9999,
        "TimeSeries bincode bytes must not depend on the `start` field",
    );
}
1674+
1675+
/// A bincode encode/decode cycle keeps the policy id and the samples, and
/// leaves `start` at its default value.
#[test]
fn time_series_bincode_round_trip_preserves_observable_fields() {
    let policy_id = "policy-1";
    let samples = vec![0.0, 1.5, -2.25, 3.125];

    let original = TimeSeries {
        sampling_policy_id: policy_id.to_string(),
        start: dt(TS_2023_11_14_221320Z),
        series: samples.clone(),
    };

    let encoded = bincode::serialize(&original).expect("bincode serialize");
    let decoded: TimeSeries = bincode::deserialize(&encoded).expect("bincode deserialize");

    // The id and the series payload survive unchanged.
    assert_eq!(decoded.sampling_policy_id, policy_id);
    assert_eq!(decoded.series, samples);
    // `start` is not carried on the wire, so decoding yields the Default.
    assert_eq!(decoded.start, DateTime::<Utc>::default());
}
1690+
1691+
// Unique-per-test prefix prevents interference with concurrent tests
1692+
// that share the global LAST_TRANSFER_TIME map.
1693+
async fn cleanup_keys(keys: &[&str]) {
1694+
let mut map = LAST_TRANSFER_TIME.write().await;
1695+
for key in keys {
1696+
map.remove(*key);
1697+
}
1698+
}
1699+
1700+
#[serial]
1701+
#[tokio::test]
1702+
async fn json_persistence_round_trip_with_boundary_timestamps() {
1703+
// Round-trip the exact integer values 0, 1, 1_700_000_000, and
1704+
// 253_402_300_799 through the on-disk JSON file used by
1705+
// write_last_timestamp/read_last_timestamp. The JSON file is the
1706+
// external boundary between runs of the daemon, so this format
1707+
// must remain stable across the chrono removal.
1708+
let prefix = format!("regression_round_trip_{}", std::process::id());
1709+
let k0 = format!("{prefix}_0");
1710+
let k1 = format!("{prefix}_1");
1711+
let k2 = format!("{prefix}_2");
1712+
let k3 = format!("{prefix}_3");
1713+
cleanup_keys(&[&k0, &k1, &k2, &k3]).await;
1714+
1715+
let dir = tempdir().expect("tempdir");
1716+
let path = dir.path().join("time_data.json");
1717+
1718+
// Write fixed JSON content with integer timestamps as literals.
1719+
let payload = format!(
1720+
"{{\"{k0}\":{TS_EPOCH},\
1721+
\"{k1}\":{TS_EPOCH_PLUS_ONE},\
1722+
\"{k2}\":{TS_2023_11_14_221320Z},\
1723+
\"{k3}\":{TS_9999_12_31_235959Z}}}"
1724+
);
1725+
std::fs::write(&path, payload).expect("write timestamps file");
1726+
1727+
read_last_timestamp(&path).await.expect("read timestamps");
1728+
1729+
let map = LAST_TRANSFER_TIME.read().await;
1730+
assert_eq!(map.get(&k0), Some(&TS_EPOCH));
1731+
assert_eq!(map.get(&k1), Some(&TS_EPOCH_PLUS_ONE));
1732+
assert_eq!(map.get(&k2), Some(&TS_2023_11_14_221320Z));
1733+
assert_eq!(map.get(&k3), Some(&TS_9999_12_31_235959Z));
1734+
drop(map);
1735+
1736+
// Now write back through the producer task and verify the
1737+
// file's parsed contents preserve those exact integers.
1738+
let (sender, receiver) = async_channel::bounded::<(String, i64)>(8);
1739+
let writer = tokio::spawn(write_last_timestamp(path.clone(), receiver));
1740+
for (id, ts) in [
1741+
(&k0, TS_EPOCH),
1742+
(&k1, TS_EPOCH_PLUS_ONE),
1743+
(&k2, TS_2023_11_14_221320Z),
1744+
(&k3, TS_9999_12_31_235959Z),
1745+
] {
1746+
sender.send((id.clone(), ts)).await.expect("send");
1747+
}
1748+
drop(sender);
1749+
let _ = writer.await;
1750+
1751+
let raw = std::fs::read_to_string(&path).expect("read file");
1752+
let parsed: serde_json::Value = serde_json::from_str(&raw).expect("parse json");
1753+
let obj = parsed.as_object().expect("object");
1754+
assert_eq!(obj.get(&k0).and_then(Value::as_i64), Some(TS_EPOCH));
1755+
assert_eq!(
1756+
obj.get(&k1).and_then(Value::as_i64),
1757+
Some(TS_EPOCH_PLUS_ONE),
1758+
);
1759+
assert_eq!(
1760+
obj.get(&k2).and_then(Value::as_i64),
1761+
Some(TS_2023_11_14_221320Z),
1762+
);
1763+
assert_eq!(
1764+
obj.get(&k3).and_then(Value::as_i64),
1765+
Some(TS_9999_12_31_235959Z),
1766+
);
1767+
1768+
cleanup_keys(&[&k0, &k1, &k2, &k3]).await;
1769+
}
1770+
1771+
#[tokio::test]
1772+
async fn json_persistence_rejects_non_integer_timestamp_values() {
1773+
// The on-disk format must reject non-integer timestamp values.
1774+
// We assert this against literal JSON strings; if the chrono
1775+
// removal accidentally relaxes the parser to accept e.g.
1776+
// RFC3339 strings, this regression test will fail.
1777+
let dir = tempdir().expect("tempdir");
1778+
1779+
// Floating-point values must be rejected even though they are
1780+
// valid JSON numbers.
1781+
let path_float = dir.path().join("float.json");
1782+
std::fs::write(&path_float, b"{\"1\": 1700000000.5}").expect("write");
1783+
assert!(read_last_timestamp(&path_float).await.is_err());
1784+
1785+
// RFC3339-style strings must be rejected (production code
1786+
// expects integer seconds, not strings).
1787+
let path_str = dir.path().join("string.json");
1788+
std::fs::write(&path_str, b"{\"1\": \"2023-11-14T22:13:20Z\"}").expect("write");
1789+
assert!(read_last_timestamp(&path_str).await.is_err());
1790+
}
1791+
1792+
/// Deleting one entry from the JSON file leaves the integer values of the
/// remaining entries untouched.
#[test]
fn json_persistence_delete_preserves_other_integer_timestamps() {
    let dir = tempdir().expect("tempdir");
    let path = dir.path().join("time_data.json");

    // Seed three entries using literal integers, so the expected file
    // contents do not depend on any time library.
    let seeded = format!(
        "{{\"1\":{TS_EPOCH},\"2\":{TS_2023_11_14_221320Z},\"3\":{TS_9999_12_31_235959Z}}}"
    );
    std::fs::write(&path, seeded).expect("write");

    delete_last_timestamp(&path, 2).expect("delete");

    let raw = std::fs::read_to_string(&path).expect("read");
    let parsed: serde_json::Value = serde_json::from_str(&raw).expect("parse");
    let remaining = parsed.as_object().expect("object");
    let value_of = |key: &str| remaining.get(key).and_then(Value::as_i64);

    // Only entry "2" is gone; "1" and "3" keep their exact values.
    assert_eq!(remaining.len(), 2);
    assert_eq!(value_of("1"), Some(TS_EPOCH));
    assert!(!remaining.contains_key("2"));
    assert_eq!(value_of("3"), Some(TS_9999_12_31_235959Z));
}
1819+
1820+
#[serial]
1821+
#[tokio::test]
1822+
async fn start_timestamp_arithmetic_uses_integer_nanoseconds() {
1823+
// start_timestamp() is documented to add the policy's period
1824+
// (in nanoseconds) to the last transmission time. Pin this
1825+
// arithmetic against literal integers so any future swap of
1826+
// the time backend keeps producing exact i64 nanoseconds.
1827+
let policy_id: u32 = 555_555;
1828+
cleanup_keys(&[&policy_id.to_string()]).await;
1829+
let policy = SamplingPolicy {
1830+
id: policy_id,
1831+
kind: SamplingKind::Conn,
1832+
interval: Duration::from_mins(1),
1833+
period: Duration::from_hours(1),
1834+
offset: 0,
1835+
src_ip: None,
1836+
dst_ip: None,
1837+
node: Some("regression".to_string()),
1838+
column: None,
1839+
};
1840+
1841+
// Last transmission at 1_700_000_000 seconds = 2023-11-14T22:13:20Z,
1842+
// expressed in nanoseconds as i64.
1843+
let last_ns: i64 = 1_700_000_000_000_000_000;
1844+
LAST_TRANSFER_TIME
1845+
.write()
1846+
.await
1847+
.insert(policy_id.to_string(), last_ns);
1848+
1849+
let next = policy.start_timestamp().await.expect("start_timestamp");
1850+
// period = 3_600s = 3_600_000_000_000ns; expected = literal sum.
1851+
let expected: i64 = 1_700_000_000_000_000_000 + 3_600_000_000_000;
1852+
assert_eq!(next, expected);
1853+
assert_eq!(next, 1_700_003_600_000_000_000);
1854+
1855+
LAST_TRANSFER_TIME
1856+
.write()
1857+
.await
1858+
.remove(&policy_id.to_string());
1859+
}
1860+
1861+
#[serial]
1862+
#[tokio::test]
1863+
async fn start_timestamp_returns_zero_for_missing_entry() {
1864+
// The contract for "no last transmission" is to return integer 0.
1865+
let policy_id: u32 = 444_444;
1866+
cleanup_keys(&[&policy_id.to_string()]).await;
1867+
let policy = SamplingPolicy {
1868+
id: policy_id,
1869+
kind: SamplingKind::Conn,
1870+
interval: Duration::from_mins(1),
1871+
period: Duration::from_hours(1),
1872+
offset: 0,
1873+
src_ip: None,
1874+
dst_ip: None,
1875+
node: Some("regression-missing".to_string()),
1876+
column: None,
1877+
};
1878+
1879+
// Confirm absence and check the documented zero return value.
1880+
LAST_TRANSFER_TIME
1881+
.write()
1882+
.await
1883+
.remove(&policy_id.to_string());
1884+
assert_eq!(policy.start_timestamp().await.unwrap(), 0_i64);
1885+
}
1886+
}
15501887
}

0 commit comments

Comments
 (0)