Skip to content

[ML] Add resource monitoring in CBucketGatherer::addEventData #2848

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
=== Enhancements

* Track memory used in the hierarchical results normalizer. (See {ml-pull}2831[#2831].)
* Improve adherence to memory limits for the bucket gatherer. (See {ml-pull}2848[#2848].)

=== Bug Fixes

Expand Down
7 changes: 4 additions & 3 deletions include/model/CBucketGatherer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include <core/CCompressedDictionary.h>
#include <core/CHashing.h>
#include <core/CLogger.h>
#include <core/CMemoryUsage.h>
#include <core/CoreTypes.h>

Expand All @@ -28,7 +27,6 @@

#include <any>
#include <cstdint>
#include <functional>
#include <map>
#include <optional>
#include <string>
Expand Down Expand Up @@ -172,6 +170,9 @@ class MODEL_EXPORT CBucketGatherer {
//! redundant except to create a signature that will not be mistaken for
//! a general purpose copy constructor.
CBucketGatherer(bool isForPersistence, const CBucketGatherer& other);
//! Check whether \p data is missing its person identifier, its
//! attribute identifier or its count.
//!
//! \return True if any of those three fields is unset.
static bool isRecordIncomplete(const CEventData& data);
//! Check that person \p pid is still active in the data gatherer and,
//! when the data gatherer is a population gatherer, that attribute
//! \p cid is still active too. A debug message is logged for the
//! first check that fails.
//!
//! \return False if the person or (for population) the attribute is
//! no longer active, true otherwise.
bool hasValidPersonAndAttributeIds(std::size_t pid, std::size_t cid) const;
//! If \p data is an explicit null, insert \p pidCid into the set of
//! explicit nulls retrieved for \p time; otherwise do nothing.
//!
//! \return True if \p data was an explicit null, false otherwise.
bool handleExplicitNull(const CEventData& data, core_t::TTime time, TSizeSizePr pidCid);

virtual ~CBucketGatherer() = default;
//@}
Expand Down Expand Up @@ -238,7 +239,7 @@ class MODEL_EXPORT CBucketGatherer {
CResourceMonitor& resourceMonitor) = 0;

//! Record the arrival of \p data at \p time.
bool addEventData(CEventData& data);
bool addEventData(const CEventData& data, const CResourceMonitor& resourceMonitor);

//! Roll time forwards to \p time.
void timeNow(core_t::TTime time);
Expand Down
124 changes: 73 additions & 51 deletions lib/model/CBucketGatherer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <maths/common/COrderings.h>

#include <model/CDataGatherer.h>
#include <model/CResourceMonitor.h>

#include <boost/tuple/tuple.hpp>

Expand Down Expand Up @@ -233,7 +234,36 @@ CBucketGatherer::CBucketGatherer(bool isForPersistence, const CBucketGatherer& o
}
}

bool CBucketGatherer::addEventData(CEventData& data) {
bool CBucketGatherer::isRecordIncomplete(const CEventData& data) {
    // A record can only be gathered when the person identifier, the
    // attribute identifier and the count are all set.
    bool const hasAllFields = data.personId() && data.attributeId() && data.count();
    return hasAllFields == false;
}
bool CBucketGatherer::hasValidPersonAndAttributeIds(std::size_t const pid,
                                                    std::size_t const cid) const {
    // Reject values for a person which is no longer active in the gatherer.
    if (m_DataGatherer.isPersonActive(pid) == false) {
        LOG_DEBUG(<< "Not adding value for deleted person " << pid);
        return false;
    }

    // The attribute is only checked when gathering population data.
    if (m_DataGatherer.isPopulation() == false) {
        return true;
    }
    if (m_DataGatherer.isAttributeActive(cid)) {
        return true;
    }

    LOG_DEBUG(<< "Not adding value for deleted attribute " << cid);
    return false;
}
bool CBucketGatherer::handleExplicitNull(const CEventData& data,
                                         core_t::TTime const time,
                                         CBucketGatherer::TSizeSizePr const pidCid) {
    // Only explicit null records are handled here.
    if (data.isExplicitNull() == false) {
        return false;
    }

    // Remember that a null record was seen for this (pid, cid) pair in
    // the set of explicit nulls retrieved for the record's time.
    m_PersonAttributeExplicitNulls.get(time).insert(pidCid);
    return true;
}
bool CBucketGatherer::addEventData(const CEventData& data,
const CResourceMonitor& resourceMonitor) {
core_t::TTime const time = data.time();

if (time < this->earliestBucketStartTime()) {
Expand All @@ -245,70 +275,62 @@ bool CBucketGatherer::addEventData(CEventData& data) {

this->timeNow(time);

if (!data.personId() || !data.attributeId() || !data.count()) {
// The record was incomplete.
if (isRecordIncomplete(data)) {
return false;
}

std::size_t const pid = *data.personId();
std::size_t const cid = *data.attributeId();
std::size_t const count = *data.count();
if ((pid != CDynamicStringIdRegistry::INVALID_ID) &&
(cid != CDynamicStringIdRegistry::INVALID_ID)) {
// Has the person/attribute been deleted from the gatherer?
if (!m_DataGatherer.isPersonActive(pid)) {
LOG_DEBUG(<< "Not adding value for deleted person " << pid);
return false;
}
if (m_DataGatherer.isPopulation() && !m_DataGatherer.isAttributeActive(cid)) {
LOG_DEBUG(<< "Not adding value for deleted attribute " << cid);
return false;
}

TSizeSizePr const pidCid = std::make_pair(pid, cid);
if ((pid == CDynamicStringIdRegistry::INVALID_ID) ||
(cid == CDynamicStringIdRegistry::INVALID_ID)) {
return true;
}

if (hasValidPersonAndAttributeIds(pid, cid) == false) {
return false;
}

// If record is explicit null just note that a null record has been seen
// for the given (pid, cid) pair.
if (data.isExplicitNull()) {
TSizeSizePrUSet& bucketExplicitNulls =
m_PersonAttributeExplicitNulls.get(time);
bucketExplicitNulls.insert(pidCid);
return true;
}
TSizeSizePr const pidCid = std::make_pair(pid, cid);

TSizeSizePrUInt64UMap& bucketCounts = m_PersonAttributeCounts.get(time);
if (count > 0) {
bucketCounts[pidCid] += count;
}
if (handleExplicitNull(data, time, pidCid)) {
return true;
}

const CEventData::TOptionalStrVec& influences = data.influences();
auto& influencerCounts = m_InfluencerCounts.get(time);
if (influences.size() != influencerCounts.size()) {
LOG_ERROR(<< "Unexpected influences: " << influences << " expected "
<< core::CContainerPrinter::print(this->beginInfluencers(),
this->endInfluencers()));
return false;
}
TSizeSizePrUInt64UMap& bucketCounts = m_PersonAttributeCounts.get(time);
if (count > 0) {
bucketCounts[pidCid] += count;
}

TOptionalStrVec canonicalInfluences(influencerCounts.size());
for (std::size_t i = 0; i < influences.size(); ++i) {
const CEventData::TOptionalStr& influence = influences[i];
if (influence) {
const std::string& inf = *influence;
canonicalInfluences[i] = inf;
if (count > 0) {
influencerCounts[i]
.emplace(boost::unordered::piecewise_construct,
boost::make_tuple(pidCid, inf),
boost::make_tuple(static_cast<std::uint64_t>(0)))
.first->second += count;
}
const CEventData::TOptionalStrVec& influences = data.influences();
auto& influencerCounts = m_InfluencerCounts.get(time);
if (influences.size() != influencerCounts.size()) {
LOG_ERROR(<< "Unexpected influences: " << influences << " expected "
<< core::CContainerPrinter::print(this->beginInfluencers(),
this->endInfluencers()));
return false;
}

TOptionalStrVec canonicalInfluences(influencerCounts.size());
auto updateInfluencer = [&](std::size_t i) {
if (const CEventData::TOptionalStr& influence = influences[i]) {
const std::string& inf = *influence;
canonicalInfluences[i] = inf;
if (count > 0 && resourceMonitor.areAllocationsAllowed()) {
influencerCounts[i]
.emplace(boost::unordered::piecewise_construct,
boost::make_tuple(pidCid, inf),
boost::make_tuple(static_cast<std::uint64_t>(0)))
.first->second += count;
}
}

this->addValue(pid, cid, time, data.values(), count, data.stringValue(),
canonicalInfluences);
};
for (std::size_t i = 0; i < influences.size(); ++i) {
updateInfluencer(i);
}

this->addValue(pid, cid, time, data.values(), count, data.stringValue(), canonicalInfluences);
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/model/CDataGatherer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ bool CDataGatherer::addArrival(const TStrCPtrVec& fieldValues,
return false;
}

return m_BucketGatherer->addEventData(data);
return m_BucketGatherer->addEventData(data, resourceMonitor);
}

void CDataGatherer::sampleNow(core_t::TTime sampleBucketStart) {
Expand Down