Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 84 additions & 42 deletions src/common/memory-manager/src/guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@

use std::{fmt, mem};

use common_telemetry::debug;
use snafu::ensure;
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};

use crate::error::{
MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result,
};
use crate::manager::{MemoryMetrics, MemoryQuota};
use crate::manager::{MemoryMetrics, MemoryQuota, UnlimitedMemoryQuota};
use crate::policy::OnExhaustedPolicy;

/// Guard representing a slice of reserved memory.
Expand All @@ -30,31 +29,53 @@ pub struct MemoryGuard<M: MemoryMetrics> {
}

pub(crate) enum GuardState<M: MemoryMetrics> {
Unlimited,
Released,
Unlimited {
quota: UnlimitedMemoryQuota<M>,
granted_bytes: u64,
},
Limited {
permit: OwnedSemaphorePermit,
quota: MemoryQuota<M>,
permit: OwnedSemaphorePermit,
},
}

impl<M: MemoryMetrics> GuardState<M> {
fn release(self) {
match self {
GuardState::Released => None,
GuardState::Unlimited {
quota,
granted_bytes,
} => Some(quota.release_bytes(granted_bytes)),
GuardState::Limited { quota, permit } => Some(quota.release_permit(permit)),
};
}
Comment thread
fengjiachun marked this conversation as resolved.
}

impl<M: MemoryMetrics> MemoryGuard<M> {
pub(crate) fn unlimited() -> Self {
pub(crate) fn unlimited(quota: UnlimitedMemoryQuota<M>, bytes: u64) -> Self {
quota.add_in_use(bytes);
Self {
state: GuardState::Unlimited,
state: GuardState::Unlimited {
quota,
granted_bytes: bytes,
},
}
}

pub(crate) fn limited(permit: OwnedSemaphorePermit, quota: MemoryQuota<M>) -> Self {
pub(crate) fn limited(quota: MemoryQuota<M>, permit: OwnedSemaphorePermit) -> Self {
Self {
state: GuardState::Limited { permit, quota },
state: GuardState::Limited { quota, permit },
}
}

/// Returns granted quota in bytes.
pub fn granted_bytes(&self) -> u64 {
match &self.state {
GuardState::Unlimited => 0,
GuardState::Limited { permit, quota } => {
GuardState::Released => 0,
GuardState::Unlimited { granted_bytes, .. } => *granted_bytes,
GuardState::Limited { quota, permit } => {
quota.permits_to_bytes(permit.num_permits() as u32)
}
}
Expand All @@ -68,13 +89,24 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
/// - Returns error if requested bytes would exceed the manager's total limit
/// - Returns error if the semaphore is unexpectedly closed
pub async fn acquire_additional(&mut self, bytes: u64) -> Result<()> {
match &mut self.state {
GuardState::Unlimited => Ok(()),
GuardState::Limited { permit, quota } => {
if bytes == 0 {
return Ok(());
}
if bytes == 0 {
return Ok(());
}

match &mut self.state {
GuardState::Released => {
debug_assert!(false, "released memory guard state should not be reused");
Ok(())
}
GuardState::Unlimited {
quota,
granted_bytes,
} => {
quota.add_in_use(bytes);
*granted_bytes = granted_bytes.saturating_add(bytes);
Ok(())
}
GuardState::Limited { quota, permit } => {
let additional_permits = quota.bytes_to_permits(bytes);
let current_permits = permit.num_permits() as u32;

Expand All @@ -95,7 +127,6 @@ impl<M: MemoryMetrics> MemoryGuard<M> {

permit.merge(additional_permit);
quota.update_in_use_metric();
debug!("Acquired additional {} bytes", bytes);
Ok(())
}
}
Expand All @@ -106,13 +137,24 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
/// On success, merges the new memory into this guard and returns true.
/// On failure, returns false and leaves this guard unchanged.
pub fn try_acquire_additional(&mut self, bytes: u64) -> bool {
match &mut self.state {
GuardState::Unlimited => true,
GuardState::Limited { permit, quota } => {
if bytes == 0 {
return true;
}
if bytes == 0 {
return true;
}

match &mut self.state {
GuardState::Released => {
debug_assert!(false, "released memory guard state should not be reused");
false
}
GuardState::Unlimited {
quota,
granted_bytes,
} => {
quota.add_in_use(bytes);
*granted_bytes = granted_bytes.saturating_add(bytes);
true
}
GuardState::Limited { quota, permit } => {
let additional_permits = quota.bytes_to_permits(bytes);

match quota
Expand All @@ -123,7 +165,6 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
Ok(additional_permit) => {
permit.merge(additional_permit);
quota.update_in_use_metric();
debug!("Acquired additional {} bytes", bytes);
true
}
Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
Expand Down Expand Up @@ -168,7 +209,8 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
MemoryLimitExceededSnafu {
requested_bytes: bytes,
limit_bytes: match &self.state {
GuardState::Unlimited => 0, // unreachable: unlimited mode always succeeds
GuardState::Released => 0,
GuardState::Unlimited { .. } => 0, // unreachable: unlimited mode always succeeds
GuardState::Limited { quota, .. } => {
quota.permits_to_bytes(quota.limit_permits)
}
Expand All @@ -184,22 +226,29 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
///
/// Returns true if the release succeeds or is a no-op; false if the request exceeds granted.
pub fn release_partial(&mut self, bytes: u64) -> bool {
if bytes == 0 {
return true;
}

match &mut self.state {
GuardState::Unlimited => true,
GuardState::Limited { permit, quota } => {
if bytes == 0 {
return true;
GuardState::Released => true,
GuardState::Unlimited {
quota,
granted_bytes,
} => {
if bytes > *granted_bytes {
return false;
}

*granted_bytes -= quota.release_bytes(bytes);
Comment thread
fengjiachun marked this conversation as resolved.
Outdated
true
}
GuardState::Limited { quota, permit } => {
let release_permits = quota.bytes_to_permits(bytes);

match permit.split(release_permits as usize) {
Some(released_permit) => {
let released_bytes =
quota.permits_to_bytes(released_permit.num_permits() as u32);
drop(released_permit);
quota.update_in_use_metric();
debug!("Released {} bytes from memory guard", released_bytes);
quota.release_permit(released_permit);
true
}
None => false,
Expand All @@ -211,14 +260,7 @@ impl<M: MemoryMetrics> MemoryGuard<M> {

impl<M: MemoryMetrics> Drop for MemoryGuard<M> {
fn drop(&mut self) {
if let GuardState::Limited { permit, quota } =
mem::replace(&mut self.state, GuardState::Unlimited)
{
let bytes = quota.permits_to_bytes(permit.num_permits() as u32);
drop(permit);
quota.update_in_use_metric();
debug!("Released memory: {} bytes", bytes);
}
mem::replace(&mut self.state, GuardState::Released).release();
}
}

Expand Down
Loading
Loading