Skip to content
7 changes: 7 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
"eslint": "^8.56.0",
"eslint-plugin-prefer-arrow-functions": "^3.2.4",
"mocha": "^10.3.0",
"mockdate": "^3.0.5",
"msw": "^2.1.2",
"nodemon": "^3.0.3",
"ts-essentials": "^9.1.2",
Expand Down
27 changes: 27 additions & 0 deletions src/backend/common/infrastructure/config/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,32 @@ export interface PollingOptions {
* @examples [30]
* */
maxInterval?: number

/**
* Number of seconds after which A Player is considered Stale
*
* When Polling the source does not recieve data about a specific Player after X seconds it becomes Stale. When the Player becomes Stale:
*
* * The current listening session is ended. If the Player becomes active again a new listening session is started (Player will miss `interval` seconds of listening)
* * If the player has an existing session w/ track then MS attempts to scrobble it
*
* This option DOES NOT need to be set. It is automatically calculated as (`interval` * 3) when not defined.
*/
staleAfter?: number

/**
* Number of seconds after which A Player is considered Orphaned
*
* When Polling the source does not recieve data about a specific Player after X seconds it becomes Orphaned. When the Player becomes Orphaned:
*
* * The current Player session is ended and the Player is removed from MS
* * MS attempts to scrobble, if the Player has an existing session w/ track
*
* A Player should become Orphaned EQUAL TO OR AFTER it becomes Stale.
*
* * This option DOES NOT need to be set. It is automatically calculated as (`interval` * 5) when not defined.
* * If it is set it must be equal to or larger than `staleAfter` or (`interval * 3`)
*/
orphanedAfter?: number
}

10 changes: 6 additions & 4 deletions src/backend/sources/AbstractSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import {
sortByNewestPlayDate,
sortByOldestPlayDate,
} from "../utils.js";
import { comparePlayTemporally, temporalAccuracyIsAtLeast, todayAwareFormat } from "../utils/TimeUtils.js";
import { comparePlayTemporally, temporalAccuracyIsAtLeast, timeToHumanTimestamp, todayAwareFormat } from "../utils/TimeUtils.js";
import { getRoot } from '../ioc.js';
import { componentFileLogger } from '../common/logging.js';
import { WebhookPayload } from '../common/infrastructure/config/health/webhooks.js';
Expand Down Expand Up @@ -440,6 +440,8 @@ export default abstract class AbstractSource extends AbstractComponent implement

const activeThreshold = this.lastActivityAt.add(checkActiveFor, 's');
const inactiveFor = dayjs.duration(Math.abs(activeThreshold.diff(dayjs(), 'millisecond'))).humanize(false);
const relativeActivity = dayjs.duration(this.lastActivityAt.diff(dayjs(), 'ms'));
const humanRelativeActivity = relativeActivity.asSeconds() > -3 ? '' : ` (${timeToHumanTimestamp(relativeActivity)} ago)`;
let friendlyInterval = '';
const friendlyLastFormat = todayAwareFormat(this.lastActivityAt);
if (activeThreshold.isBefore(dayjs())) {
Expand All @@ -452,12 +454,12 @@ export default abstract class AbstractSource extends AbstractComponent implement
sleepTime = interval + backoff;
}
if(isDebugMode()) {
debugMsgs.push(`Last activity ${friendlyLastFormat} is ${inactiveFor} outside of polling period (last activity + ${checkActiveFor}s)`);
debugMsgs.push(`Last activity ${friendlyLastFormat}${humanRelativeActivity} is ${inactiveFor} outside of polling period (last activity + ${checkActiveFor}s)`);
} else {
debugMsgs.push(`Last activity was at ${friendlyLastFormat}`);
debugMsgs.push(`Last activity was at ${friendlyLastFormat}${humanRelativeActivity}`);
}
} else {
debugMsgs.push(`Last activity was at ${friendlyLastFormat}`);
debugMsgs.push(`Last activity was at ${friendlyLastFormat}${humanRelativeActivity}`);
friendlyInterval = `${formatNumber(sleepTime)}s`;
}
debugMsgs.push(`Next check in ${friendlyInterval}`);
Expand Down
2 changes: 1 addition & 1 deletion src/backend/sources/JellyfinSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import {
doubleReturnNewline,
isDebugMode,
parseBool,
parseDurationFromTimestamp,
playObjDataMatch,
} from "../utils.js";
import { parseDurationFromTimestamp } from '../utils/TimeUtils.js';
import {
comparePlayTemporally,
temporalAccuracyIsAtLeast,
Expand Down
20 changes: 14 additions & 6 deletions src/backend/sources/MemorySource.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Logger } from "@foxxmd/logging";
import dayjs from "dayjs";
import dayjs, { Dayjs } from "dayjs";
import { EventEmitter } from "events";
import objectHash from 'object-hash';
import { SimpleIntervalJob, Task, ToadScheduler } from "toad-scheduler";
Expand Down Expand Up @@ -27,7 +27,7 @@ import {
playObjDataMatch,
thresholdResultSummary,
} from "../utils.js";
import { timePassesScrobbleThreshold } from "../utils/TimeUtils.js";
import { timePassesScrobbleThreshold, timeToHumanTimestamp } from "../utils/TimeUtils.js";
import AbstractSource from "./AbstractSource.js";
import { AbstractPlayerState, createPlayerOptions, PlayerStateOptions } from "./PlayerState/AbstractPlayerState.js";
import { GenericPlayerState } from "./PlayerState/GenericPlayerState.js";
Expand Down Expand Up @@ -116,11 +116,19 @@ export default class MemorySource extends AbstractSource {
}
if(discoverable) {
discoveredCleanupPlay = cleanupPlay;
// we are discovering/scrobbling play
// and since player is now stale we should treat this "session" as ended
// -- so if user resumes a stale play later its a new session (new real time period of them listening)
// basically this is the same as if the player was orphaned and removed
//
// so we remove listen ranges so the old accumulated listen time can't be used for the "new" listening session
player.listenRanges = [];
player.currentListenRange = undefined;
}
}
}
if(deletePlayer) {
this.deletePlayer(player.platformIdStr, `Removed after being orphaned for ${dayjs.duration(player.stateIntervalOptions.orphanedInterval, 'seconds').asMinutes()} minutes`);
this.deletePlayer(player.platformIdStr, `Removed after being orphaned for ${timeToHumanTimestamp(dayjs.duration(player.stateIntervalOptions.orphanedInterval, 'seconds'))}`);
}

return discoveredCleanupPlay;
Expand All @@ -141,7 +149,7 @@ export default class MemorySource extends AbstractSource {

setNewPlayer = (idStr: string, logger: Logger, id: PlayPlatformId, opts: PlayerStateOptions = {}) => {
this.players.set(idStr, this.getNewPlayer(this.logger, id, {
...createPlayerOptions(this.config.data as Partial<PollingOptions>),
...createPlayerOptions(this.config.data as Partial<PollingOptions>, this.playerSourceOfTruth, this.logger),
...opts
}));
this.playerState.set(idStr, '');
Expand Down Expand Up @@ -176,7 +184,7 @@ export default class MemorySource extends AbstractSource {
return sessions[0];
}

processRecentPlays = (datas: (PlayObject | PlayerStateDataMaybePlay)[]) => {
processRecentPlays = (datas: (PlayObject | PlayerStateDataMaybePlay)[], reportedTS?: Dayjs) => {

const {
options: {
Expand Down Expand Up @@ -231,7 +239,7 @@ export default class MemorySource extends AbstractSource {
playerState.position = playerState.play.meta?.trackProgressPosition;
}

const [currPlay, prevPlay] = player.update(playerState);
const [currPlay, prevPlay] = player.update(playerState, reportedTS);
const candidate = prevPlay !== undefined ? prevPlay : currPlay;
const playChanged = prevPlay !== undefined;

Expand Down
52 changes: 34 additions & 18 deletions src/backend/sources/PlayerState/AbstractPlayerState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { PollingOptions } from "../../common/infrastructure/config/common.js";
import { formatNumber, genGroupIdStr, playObjDataMatch, progressBar } from "../../utils.js";
import { ListenProgress } from "./ListenProgress.js";
import { ListenRange, ListenRangePositional } from "./ListenRange.js";
import { todayAwareFormat } from "../../utils/TimeUtils.js";
import { timeToHumanTimestamp, todayAwareFormat } from "../../utils/TimeUtils.js";

export interface PlayerStateIntervals {
staleInterval?: number
Expand All @@ -30,22 +30,38 @@ export interface PlayerStateOptions extends PlayerStateIntervals {

export const DefaultPlayerStateOptions: PlayerStateOptions = {};

export const createPlayerOptions = (pollingOpts?: Partial<PollingOptions>, sot: SOURCE_SOT_TYPES = SOURCE_SOT.PLAYER): PlayerStateOptions => {
export const createPlayerOptions = (pollingOpts?: Partial<PollingOptions>, sot: SOURCE_SOT_TYPES = SOURCE_SOT.PLAYER, logger?: Logger): PlayerStateOptions => {
const {
interval = 30,
maxInterval = 60,
staleAfter,
orphanedAfter
} = pollingOpts || {};
if(sot === SOURCE_SOT.PLAYER) {
return {
staleInterval: interval * 3,
orphanedInterval: interval * 5
}
}

let sa = staleAfter,
oa = orphanedAfter;

// if this player is not the source of truth we don't care about waiting around to see if the state comes back
// in fact, we probably want to get rid of it as fast as possible since its superficial and more of an ephemeral "Now Playing" status than something we are actually tracking
const staleAfterDefault = sot === SOURCE_SOT.PLAYER ? interval * 3 : interval;
const orphanedAfterDefault = sot === SOURCE_SOT.PLAYER ? interval * 5 : maxInterval;

if(sa === undefined) {
sa = staleAfterDefault;
}
if(oa === undefined) {
oa = orphanedAfterDefault;
}
if(oa < sa) {
oa = sa;
if(logger !== undefined) {
logger.warn(`'orhanedAfter' (${oa}s) was less than 'staleAfter' (${sa}s) which is not allowed! 'orhanedAfter' has been set to equal 'staleAfter'`);
}
}

return {
staleInterval: interval,
orphanedInterval: maxInterval
staleInterval: sa,
orphanedInterval: oa
}
}

Expand Down Expand Up @@ -86,18 +102,18 @@ export abstract class AbstractPlayerState {
return this.platformId[0] === candidateId[0] && this.platformId[1] === candidateId[1];
}

isUpdateStale() {
isUpdateStale(reportedTS?: Dayjs) {
if (this.currentPlay !== undefined) {
return Math.abs(dayjs().diff(this.playLastUpdatedAt, 'seconds')) > this.stateIntervalOptions.staleInterval;
return Math.abs((reportedTS ?? dayjs()).diff(this.playLastUpdatedAt, 'seconds')) > this.stateIntervalOptions.staleInterval;
}
return false;
}

checkStale() {
const isStale = this.isUpdateStale();
checkStale(reportedTS?: Dayjs) {
const isStale = this.isUpdateStale(reportedTS);
if (isStale && ![CALCULATED_PLAYER_STATUSES.stale, CALCULATED_PLAYER_STATUSES.orphaned].includes(this.calculatedStatus)) {
this.calculatedStatus = CALCULATED_PLAYER_STATUSES.stale;
this.logger.debug(`Stale after no Play updates for ${Math.abs(dayjs().diff(this.playLastUpdatedAt, 'seconds'))} seconds`);
this.logger.debug(`Stale after no Play updates for ${timeToHumanTimestamp(Math.abs((reportedTS ?? dayjs()).diff(this.playLastUpdatedAt, 'ms')))} (staleAfter ${this.stateIntervalOptions.staleInterval}s)`);
// end current listening sessions
this.currentListenSessionEnd();
}
Expand All @@ -116,7 +132,7 @@ export abstract class AbstractPlayerState {
const isOrphaned = this.isOrphaned();
if (isOrphaned && this.calculatedStatus !== CALCULATED_PLAYER_STATUSES.orphaned) {
this.calculatedStatus = CALCULATED_PLAYER_STATUSES.orphaned;
this.logger.debug(`Orphaned after no player updates for ${Math.abs(dayjs().diff(this.stateLastUpdatedAt, 'minutes'))} minutes`);
this.logger.debug(`Orphaned after no Player updates for ${timeToHumanTimestamp(Math.abs(dayjs().diff(this.stateLastUpdatedAt, 'ms')))} ${Math.abs(dayjs().diff(this.stateLastUpdatedAt, 'minutes'))} (orhanedAfter ${this.stateIntervalOptions.orphanedInterval}s)`);
}
return isOrphaned;
}
Expand Down Expand Up @@ -154,7 +170,7 @@ export abstract class AbstractPlayerState {

protected setPlay(state: PlayerStateData, reportedTS?: Dayjs): [PlayObject, PlayObject?] {
const {play, status, sessionId} = state;
this.playLastUpdatedAt = dayjs();
this.playLastUpdatedAt = reportedTS ?? dayjs();
if (status !== undefined) {
this.reportedStatus = status;
}
Expand Down Expand Up @@ -328,7 +344,7 @@ export abstract class AbstractPlayerState {
const {play, position} = state;

this.currentPlay = play;
this.playFirstSeenAt = dayjs();
this.playFirstSeenAt = reportedTS ?? dayjs();
this.listenRanges = [];
this.currentListenRange = undefined;

Expand Down
12 changes: 12 additions & 0 deletions src/backend/sources/PlayerState/PositionalPlayerState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,21 @@ export class PositionalPlayerState extends AbstractPlayerState {
// likely the track was listened to until it ended
// but polling interval or network delays caused MS to not get data on the very end
// also...within 3 seconds of ending is close enough to call this complete IMO
//
// -- STALE is included in this because there are some Sources (jellyfin or plex with 3rd party apps?)
// that will created a new "player" if the queue is cleared or stopping the player,
// from the user's perspective its all the same but then MS sees it as different
// so the player will eventually be pruned but we want to treat the play "going stale" it as if it finished to account for this behavior
this.logger.debug(`Listen duration was within ${this.gracefulEndBuffer}s of Play duration, bumping duration to 100% ${this.calculatedStatus === CALCULATED_PLAYER_STATUSES.stale ? 'because stale player probably finished Play before going dark.' : ' because we probably just missed Source reporting 100% before changing Play.'}`)
finalPosition = duration;
//this.currentListenRange.end.position = duration;

} else if(this.calculatedStatus === CALCULATED_PLAYER_STATUSES.stale && this.currentListenRange.isOverDrifted(this.currentListenRange.end.position)) {
// if player uses realtime but source went stale and WAS NOT close to the end its likely the RT is way overdrifted
// in which case we definitely do want to use RT as final position
// so use last known position before stale instead
this.logger.debug(`Player became Stale and realtime position overdrifted before session ended! Using last known position instead of RT position so listen duration stays accurate.`);
finalPosition = this.currentListenRange.end.position;
}
}
this.currentListenRange.finalize(finalPosition);
Expand Down
18 changes: 16 additions & 2 deletions src/backend/sources/PlayerState/RealtimePlayer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { childLogger, Logger } from "@foxxmd/logging";
import dayjs, { Dayjs } from "dayjs";
import { SimpleIntervalJob, Task, ToadScheduler } from "toad-scheduler";

const RT_TICK = 500;
Expand All @@ -9,13 +10,21 @@ export abstract class RealtimePlayer {
scheduler: ToadScheduler = new ToadScheduler();

protected position: number = 0;
private clockTS: Dayjs = dayjs();

protected constructor(/* logger: Logger */) {
//this.logger = childLogger(logger, `RT`);
const job = new SimpleIntervalJob({
milliseconds: RT_TICK,
runImmediately: true
}, new Task('updatePos', () => this.position += RT_TICK), { id: 'rt' });
}, new Task('updatePos', () => {
// in production RT_TICK and the diff between now and clockTS should always be the same
// but in order to mock for testing (where we manipulate Date now()) the source of truth
// needs to come from TS rather than simple TICK increase
this.setPosition()
//this.position += Math.abs(dayjs().diff(this.clockTS, 'ms')); // RT_TICK

}), { id: 'rt' });
this.scheduler.addSimpleIntervalJob(job);
this.scheduler.stop();
this.position = 0;
Expand All @@ -25,6 +34,7 @@ export abstract class RealtimePlayer {
if (position !== undefined) {
this.position = position;
}
this.clockTS = dayjs();
this.scheduler.startById('rt');
}

Expand All @@ -45,8 +55,12 @@ export abstract class RealtimePlayer {
return !asSeconds ? this.position : this.position / 1000;
}

public setPosition(time: number) {
public setPosition(time?: number) {
if(time === undefined) {
this.position += Math.abs(dayjs().diff(this.clockTS, 'ms'));
}
this.position = time;
this.clockTS = dayjs();
}
}

Expand Down
27 changes: 25 additions & 2 deletions src/backend/tests/player/player.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { loggerTest } from "@foxxmd/logging";
import { assert } from 'chai';
import { assert, expect } from 'chai';
import clone from "clone";
import dayjs, { Dayjs } from "dayjs";
import { describe, it } from 'mocha';
Expand All @@ -26,7 +26,7 @@ const testState = (data: Omit<PlayerStateDataMaybePlay, 'platformId'>): PlayerSt

class TestPositionalPlayerState extends PositionalPlayerState {
protected newListenRange(start?: ListenProgressPositional, end?: ListenProgressPositional, options: object = {}): ListenRangePositional {
const range = super.newListenRange(start, end, {rtImmediate: false, ...options});
const range = super.newListenRange(start, end, {allowedDrift: this.allowedDrift, rtImmediate: false, rtTruth: this.rtTruth, ...options});
return range;
}
public testSessionRepeat(position: number, reportedTS?: Dayjs) {
Expand Down Expand Up @@ -151,6 +151,29 @@ describe('Player status', function () {
assert.equal(CALCULATED_PLAYER_STATUSES.paused, player.calculatedStatus);
});

it('Uses last known position for final range when cleaning up stale player', function () {
const player = new TestPositionalPlayerState(logger, [NO_DEVICE, NO_USER], {staleInterval: 20, rtTruth: true});

const positioned = clone(newPlay);
positioned.meta.trackProgressPosition = 3;

player.update(testState({play: positioned, position: 3}));

player.currentListenRange.rtPlayer.setPosition(13000);
player.update(testState({play: positioned, position: 13}), dayjs().add(10, 'seconds'));

player.currentListenRange.rtPlayer.setPosition(23000);
player.update(testState({play: positioned, position: 23}), dayjs().add(20, 'seconds'));

const staleDate = dayjs().add(41, 'seconds')
player.currentListenRange.rtPlayer.setPosition(44000);
expect(player.currentListenRange.isOverDrifted(23)).to.be.true;

expect(player.checkStale(staleDate)).to.be.true;
expect(player.listenRanges[player.listenRanges.length - 1].end.position).to.eq(23);
expect(player.getListenDuration()).to.eq(20);
});

// TODO playback position reported and conflicts with player reported status
});
});
Expand Down
Loading
Loading