Skip to content

[7/n] [installinator] fix reports getting delayed #8041

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions installinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ camino.workspace = true
camino-tempfile.workspace = true
cancel-safe-futures.workspace = true
clap.workspace = true
daft.workspace = true
display-error-chain.workspace = true
futures.workspace = true
hex.workspace = true
Expand Down
202 changes: 187 additions & 15 deletions installinator/src/mock_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
#![allow(clippy::arc_with_non_send_sync)]

use std::{
collections::BTreeMap,
collections::{BTreeMap, VecDeque},
fmt,
net::{IpAddr, Ipv6Addr, SocketAddr},
sync::Mutex,
time::Duration,
};

Expand All @@ -18,11 +19,12 @@ use async_trait::async_trait;
use bytes::Bytes;
use installinator_client::{ClientError, ResponseValue};
use installinator_common::EventReport;
use proptest::prelude::*;
use proptest::{collection::vec_deque, prelude::*};
use reqwest::StatusCode;
use test_strategy::Arbitrary;
use tokio::sync::mpsc;
use tufaceous_artifact::ArtifactHashId;
use update_engine::events::StepEventIsTerminal;
use uuid::Uuid;

use crate::{
Expand Down Expand Up @@ -457,7 +459,9 @@ impl ResponseAction_ {
#[derive(Debug)]
struct MockProgressBackend {
update_id: Uuid,
report_sender: mpsc::Sender<EventReport>,
// Use an unbounded sender to avoid async code in handle_valid_peer_event.
report_sender: mpsc::UnboundedSender<EventReport>,
behaviors: Mutex<ReportBehaviors>,
}

impl MockProgressBackend {
Expand All @@ -475,16 +479,96 @@ impl MockProgressBackend {
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 3)),
2000,
));

fn new(
update_id: Uuid,
report_sender: mpsc::UnboundedSender<EventReport>,
behaviors: ReportBehaviors,
) -> Self {
Self { update_id, report_sender, behaviors: Mutex::new(behaviors) }
}

fn handle_valid_peer_event(
&self,
report: EventReport,
) -> Result<(), ClientError> {
let is_terminal = matches!(
report.step_events.last().map(|e| e.kind.is_terminal()),
Some(StepEventIsTerminal::Terminal { .. })
);

let mut lock = self.behaviors.lock().unwrap();
let next_behavior = lock.next_valid_peer_behavior(is_terminal);

match next_behavior {
ValidPeerBehavior::Accept => {
if lock.terminal_accepted {
// Return Gone to indicate that the peer has accepted the
// report in the past.
Err(ClientError::ErrorResponse(ResponseValue::new(
installinator_client::types::Error {
error_code: None,
message: "terminal message received => Gone"
.to_owned(),
request_id: "mock-request-id".to_owned(),
},
StatusCode::GONE,
Default::default(),
)))
} else {
// Accept the report.
_ = self.report_sender.send(report);
if is_terminal {
lock.terminal_accepted = true;
}
Ok(())
}
}
ValidPeerBehavior::AcceptError => {
// The real implementation generates a reqwest::Error, which can't be
// created outside of the reqwest library. Generate a different error.
Err(ClientError::InvalidRequest(
"peer could not receive response".to_owned(),
))
}
ValidPeerBehavior::ResponseError => {
// Accept the report but return an error.
if !lock.terminal_accepted {
_ = self.report_sender.send(report);
if is_terminal {
lock.terminal_accepted = true;
}
}

Err(ClientError::InvalidRequest(
"peer received response, but failed to transmit \
that back to installinator"
.to_owned(),
))
}
}
}
}

#[async_trait]
impl ReportProgressImpl for MockProgressBackend {
async fn discover_peers(
&self,
) -> Result<PeerAddresses, DiscoverPeersError> {
Ok([Self::VALID_PEER, Self::INVALID_PEER, Self::UNRESPONSIVE_PEER]
let mut lock = self.behaviors.lock().unwrap();
match lock.next_discovery_behavior() {
ReportDiscoveryBehavior::Retry => Err(DiscoverPeersError::Retry(
anyhow::anyhow!("simulated retry error"),
)),
// TODO: it would be nice to simulate some peers disappearing here.
ReportDiscoveryBehavior::Success => Ok([
Self::VALID_PEER,
Self::INVALID_PEER,
Self::UNRESPONSIVE_PEER,
]
.into_iter()
.collect())
.collect()),
}
}

async fn report_progress_impl(
Expand All @@ -495,8 +579,7 @@ impl ReportProgressImpl for MockProgressBackend {
) -> Result<(), ClientError> {
assert_eq!(update_id, self.update_id, "update ID matches");
if peer == Self::VALID_PEER {
_ = self.report_sender.send(report).await;
Ok(())
self.handle_valid_peer_event(report)
} else if peer == Self::INVALID_PEER {
Err(ClientError::ErrorResponse(ResponseValue::new(
installinator_client::types::Error {
Expand All @@ -517,6 +600,90 @@ impl ReportProgressImpl for MockProgressBackend {
}
}

#[derive(Clone, Copy, Debug, Arbitrary)]
enum ReportDiscoveryBehavior {
/// Return all peers successfully.
#[weight(4)]
Success,

/// Simulate a retry error.
#[weight(1)]
Retry,
}

/// For reporting results, controls how discovery and the valid peer should
/// behave.
///
/// Used to simulate network flakiness while discovering peers and reporting
/// results.
#[derive(Clone, Debug)]
struct ReportBehaviors {
discovery: VecDeque<ReportDiscoveryBehavior>,
progress: VecDeque<ValidPeerBehavior>,
terminal: VecDeque<ValidPeerBehavior>,
terminal_accepted: bool,
}

impl ReportBehaviors {
fn next_discovery_behavior(&mut self) -> ReportDiscoveryBehavior {
// Once the queue of behaviors is exhausted, always return success.
self.discovery.pop_front().unwrap_or(ReportDiscoveryBehavior::Success)
}

fn next_valid_peer_behavior(
&mut self,
is_terminal: bool,
) -> ValidPeerBehavior {
// Once the queues of behaviors are exhausted, always accept.
if is_terminal {
self.terminal.pop_front().unwrap_or(ValidPeerBehavior::Accept)
} else {
self.progress.pop_front().unwrap_or(ValidPeerBehavior::Accept)
}
}
}

impl Arbitrary for ReportBehaviors {
type Parameters = ();
type Strategy = BoxedStrategy<Self>;

fn arbitrary_with((): Self::Parameters) -> Self::Strategy {
(
vec_deque(any::<ReportDiscoveryBehavior>(), 0..128),
vec_deque(any::<ValidPeerBehavior>(), 0..128),
vec_deque(any::<ValidPeerBehavior>(), 0..128),
)
.prop_map(|(discovery, progress, terminal)| ReportBehaviors {
discovery,
progress,
terminal,
terminal_accepted: false,
})
.boxed()
}
}

/// Model situations in which the peer that accepts the update misbehaves or
/// has flakiness.
///
/// The AcceptError and ResponseError variants are low-probability ones in
/// reality, but we set them to be higher probability here (1/3 each) to get
/// better coverage for error conditions.
#[derive(Clone, Copy, Debug, Arbitrary)]
enum ValidPeerBehavior {
/// Accept the update and return Ok(()).
Accept,

/// Fail to accept the update, simulating situations where the server fails
/// to receive the report.
AcceptError,

/// Accept the update but return an error, simulating situations where the
/// server receives the report but is unable to transmit this fact back to
/// the client.
ResponseError,
}

mod tests {
use super::*;
use crate::{
Expand All @@ -535,7 +702,7 @@ mod tests {
};
use omicron_test_utils::dev::test_setup_log;
use test_strategy::proptest;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tufaceous_artifact::KnownArtifactKind;

// The #[proptest] macro doesn't currently with with #[tokio::test] sadly.
Expand All @@ -547,6 +714,7 @@ mod tests {
timeout: Duration,
#[strategy(any::<[u8; 16]>().prop_map(Uuid::from_bytes))]
update_id: Uuid,
valid_peer_behaviors: ReportBehaviors,
) {
with_test_runtime(async move {
let logctx = test_setup_log("proptest_fetch_artifact");
Expand All @@ -555,18 +723,24 @@ mod tests {

let attempts = universe.attempts();

let (report_sender, report_receiver) = mpsc::channel(512);
let (report_sender, report_receiver) = mpsc::unbounded_channel();

let receiver_handle = tokio::spawn(async move {
ReceiverStream::new(report_receiver).collect::<Vec<_>>().await
UnboundedReceiverStream::new(report_receiver)
.collect::<Vec<_>>()
.await
});

let (progress_reporter, event_sender) = ProgressReporter::new(
&logctx.log,
update_id,
ReportProgressBackend::new(
&logctx.log,
MockProgressBackend { update_id, report_sender },
MockProgressBackend::new(
update_id,
report_sender,
valid_peer_behaviors,
),
),
);
let progress_handle = progress_reporter.start();
Expand All @@ -582,11 +756,11 @@ mod tests {
let artifact =
fetch_artifact(&cx, &log, attempts, timeout)
.await?;
let address = artifact.peer.address();
let peer = artifact.peer;
StepSuccess::new(artifact)
.with_metadata(
InstallinatorCompletionMetadata::Download {
address,
address: peer.address(),
},
)
.into()
Expand All @@ -609,8 +783,6 @@ mod tests {
.await
.expect("progress report receiver task exited successfully");

println!("finished receiving reports");

match (expected_result, fetched_artifact) {
(
Ok((expected_attempt, expected_addr)),
Expand Down
2 changes: 1 addition & 1 deletion installinator/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl FromStr for DiscoveryMechanism {
}
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PeerAddresses {
peers: BTreeSet<PeerAddress>,
}
Expand Down
Loading
Loading