Skip to content

Log actor exit phase #5774

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions quickwit/quickwit-actors/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,18 @@ impl<A: Actor> Envelope<A> {
}
}

/// Execute the captured handle function.
/// Executes the captured handle function.
///
/// When exiting, also returns the message type name.
pub async fn handle_message(
&mut self,
actor: &mut A,
ctx: &ActorContext<A>,
) -> Result<(), ActorExitStatus> {
self.handler_envelope.handle_message(actor, ctx).await?;
) -> Result<(), (ActorExitStatus, &'static str)> {
let handling_res = self.handler_envelope.handle_message(actor, ctx).await;
if let Err(exit_status) = handling_res {
return Err((exit_status, self.handler_envelope.message_type_name()));
}
Ok(())
}
}
Expand All @@ -70,6 +75,8 @@ impl<A: Actor> fmt::Debug for Envelope<A> {

#[async_trait]
trait EnvelopeT<A: Actor>: Send {
fn message_type_name(&self) -> &'static str;

fn debug_msg(&self) -> String;

/// Returns the message as a boxed any.
Expand All @@ -91,6 +98,10 @@ where
A: DeferableReplyHandler<M>,
M: fmt::Debug + Send + 'static,
{
fn message_type_name(&self) -> &'static str {
std::any::type_name::<M>()
}

fn debug_msg(&self) -> String {
#[allow(clippy::needless_option_take)]
if let Some((_response_tx, msg)) = self.as_ref().take() {
Expand Down
99 changes: 62 additions & 37 deletions quickwit/quickwit-actors/src/spawn_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;
use std::time::Duration;

use anyhow::Context;
Expand Down Expand Up @@ -216,6 +217,26 @@ impl<A: Actor + Default> SpawnBuilder<A> {
}
}

enum ActorExitPhase {
Initializing,
Handling { message: &'static str },
Running,
OnDrainedMessaged,
Completed,
}

impl fmt::Debug for ActorExitPhase {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ActorExitPhase::Initializing => write!(f, "initializing"),
ActorExitPhase::Handling { message } => write!(f, "handling({message})"),
ActorExitPhase::Running => write!(f, "running"),
ActorExitPhase::OnDrainedMessaged => write!(f, "on_drained_messages"),
ActorExitPhase::Completed => write!(f, "completed"),
}
}
}

/// Receives an envelope from either the high priority queue or the low priority queue.
///
/// In the paused state, the actor will only attempt to receive high priority messages.
Expand Down Expand Up @@ -250,41 +271,46 @@ impl<A: Actor> ActorExecutionEnv<A> {
self.actor.get_mut().initialize(&self.ctx).await
}

async fn process_messages(&mut self) -> ActorExitStatus {
async fn process_messages(&mut self) -> (ActorExitStatus, ActorExitPhase) {
loop {
if let Err(exit_status) = self.process_all_available_messages().await {
return exit_status;
if let Err((exit_status, exit_phase)) = self.process_all_available_messages().await {
return (exit_status, exit_phase);
}
}
}

async fn process_one_message(
&mut self,
mut envelope: Envelope<A>,
) -> Result<(), ActorExitStatus> {
) -> Result<(), (ActorExitStatus, ActorExitPhase)> {
self.yield_and_check_if_killed().await?;
envelope
.handle_message(self.actor.get_mut(), &self.ctx)
.await?;
.await
.map_err(|(exit_status, message)| {
(exit_status, ActorExitPhase::Handling { message })
})?;
Ok(())
}

async fn yield_and_check_if_killed(&mut self) -> Result<(), ActorExitStatus> {
async fn yield_and_check_if_killed(&mut self) -> Result<(), (ActorExitStatus, ActorExitPhase)> {
if self.ctx.kill_switch().is_dead() {
return Err(ActorExitStatus::Killed);
return Err((ActorExitStatus::Killed, ActorExitPhase::Running));
}
if self.actor.get_mut().yield_after_each_message() {
self.ctx.yield_now().await;
if self.ctx.kill_switch().is_dead() {
return Err(ActorExitStatus::Killed);
return Err((ActorExitStatus::Killed, ActorExitPhase::Running));
}
} else {
self.ctx.record_progress();
}
Ok(())
}

async fn process_all_available_messages(&mut self) -> Result<(), ActorExitStatus> {
async fn process_all_available_messages(
&mut self,
) -> Result<(), (ActorExitStatus, ActorExitPhase)> {
self.yield_and_check_if_killed().await?;
let envelope = recv_envelope(&mut self.inbox, &self.ctx).await;
self.process_one_message(envelope).await?;
Expand All @@ -304,7 +330,11 @@ impl<A: Actor> ActorExecutionEnv<A> {
break;
}
}
self.actor.get_mut().on_drained_messages(&self.ctx).await?;
self.actor
.get_mut()
.on_drained_messages(&self.ctx)
.await
.map_err(|exit_status| (exit_status, ActorExitPhase::OnDrainedMessaged))?;
}
if self.ctx.mailbox().is_last_mailbox() {
// We double check here that the mailbox does not contain any messages,
Expand All @@ -314,8 +344,7 @@ impl<A: Actor> ActorExecutionEnv<A> {
if self.inbox.is_empty() {
// No one will be able to send us more messages.
// We can exit the actor.
info!(actor = self.ctx.actor_instance_id(), "no more messages");
return Err(ActorExitStatus::Success);
return Err((ActorExitStatus::Success, ActorExitPhase::Completed));
}
}

Expand All @@ -340,23 +369,6 @@ impl<A: Actor> ActorExecutionEnv<A> {
}
exit_status
}

fn process_exit_status(&self, exit_status: &ActorExitStatus) {
match &exit_status {
ActorExitStatus::Success
| ActorExitStatus::Quit
| ActorExitStatus::DownstreamClosed
| ActorExitStatus::Killed => {}
ActorExitStatus::Failure(err) => {
error!(cause=?err, exit_status=?exit_status, "actor-failure");
}
ActorExitStatus::Panicked => {
error!(exit_status=?exit_status, "actor-failure");
}
}
info!(actor_id = %self.ctx.actor_instance_id(), exit_status = %exit_status, "actor-exit");
self.ctx.exit(exit_status);
}
}

impl<A: Actor> Drop for ActorExecutionEnv<A> {
Expand All @@ -382,19 +394,32 @@ async fn actor_loop<A: Actor>(
let initialize_exit_status_res: Result<(), ActorExitStatus> = actor_env.initialize().await;
drop(no_advance_time_guard);

let after_process_exit_status = if let Err(initialize_exit_status) = initialize_exit_status_res
{
// We do not process messages if initialize yield an error.
// We still call finalize however!
initialize_exit_status
} else {
actor_env.process_messages().await
let (after_process_exit_status, exit_phase) =
if let Err(initialize_exit_status) = initialize_exit_status_res {
// We do not process messages if initialize yield an error.
// We still call finalize however!
(initialize_exit_status, ActorExitPhase::Initializing)
} else {
actor_env.process_messages().await
};

let actor_id = actor_env.ctx.actor_instance_id();
match after_process_exit_status {
ActorExitStatus::Success
| ActorExitStatus::Quit
| ActorExitStatus::DownstreamClosed
| ActorExitStatus::Killed => {
info!(actor_id, phase = ?exit_phase, exit_status = ?after_process_exit_status, "actor-exit");
}
ActorExitStatus::Failure(_) | ActorExitStatus::Panicked => {
error!(actor_id, phase = ?exit_phase, exit_status = ?after_process_exit_status, "actor-exit");
}
};

// TODO the no advance time guard for finalize has a race condition. Ideally we would
// like to have the guard before we drop the last envelope.
let final_exit_status = actor_env.finalize(after_process_exit_status).await;
// The last observation is collected on `ActorExecutionEnv::Drop`.
actor_env.process_exit_status(&final_exit_status);
actor_env.ctx.exit(&final_exit_status);
final_exit_status
}