Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,5 @@ export const MAIN_GROUP_FOLDER = 'main';

export const CONTAINER_IMAGE = process.env.CONTAINER_IMAGE || 'nanoclaw-agent:latest';
export const CONTAINER_TIMEOUT = parseInt(process.env.CONTAINER_TIMEOUT || '300000', 10);
export const IPC_POLL_INTERVAL = 1000;

export const TRIGGER_PATTERN = new RegExp(`^@${ASSISTANT_NAME}\\b`, 'i');
125 changes: 80 additions & 45 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import {
STORE_DIR,
DATA_DIR,
TRIGGER_PATTERN,
MAIN_GROUP_FOLDER,
IPC_POLL_INTERVAL
MAIN_GROUP_FOLDER
} from './config.js';
import { promises as fsp, watch, FSWatcher } from 'fs';
import { RegisteredGroup, Session, NewMessage } from './types.js';
import { initDatabase, storeMessage, storeChatMetadata, getNewMessages, getMessagesSince, getAllTasks, getTaskById } from './db.js';
import { startSchedulerLoop } from './task-scheduler.js';
Expand Down Expand Up @@ -144,60 +144,95 @@ async function sendMessage(jid: string, text: string): Promise<void> {
}
}

async function processIpcDirectory<T>(
dir: string,
processor: (data: T) => Promise<void>,
label: string
): Promise<void> {
try {
const files = (await fsp.readdir(dir)).filter(f => f.endsWith('.json'));
for (const file of files) {
const filePath = path.join(dir, file);
try {
const content = await fsp.readFile(filePath, 'utf-8');
const data = JSON.parse(content);
await processor(data);
await fsp.unlink(filePath);
} catch (err) {
logger.error({ file, err }, `Error processing IPC ${label}`);
const errorDir = path.join(DATA_DIR, 'ipc', 'errors');
await fsp.mkdir(errorDir, { recursive: true });
await fsp.rename(filePath, path.join(errorDir, file)).catch(() => {});
}
}
} catch (err) {
logger.error({ err }, `Error reading IPC ${label} directory`);
}
}

function startIpcWatcher(): void {
const messagesDir = path.join(DATA_DIR, 'ipc', 'messages');
const tasksDir = path.join(DATA_DIR, 'ipc', 'tasks');

fs.mkdirSync(messagesDir, { recursive: true });
fs.mkdirSync(tasksDir, { recursive: true });

const processIpcFiles = async () => {
try {
const messageFiles = fs.readdirSync(messagesDir).filter(f => f.endsWith('.json'));
for (const file of messageFiles) {
const filePath = path.join(messagesDir, file);
try {
const data = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
if (data.type === 'message' && data.chatJid && data.text) {
await sendMessage(data.chatJid, `${ASSISTANT_NAME}: ${data.text}`);
logger.info({ chatJid: data.chatJid }, 'IPC message sent');
}
fs.unlinkSync(filePath);
} catch (err) {
logger.error({ file, err }, 'Error processing IPC message');
// Move to error directory instead of deleting
const errorDir = path.join(DATA_DIR, 'ipc', 'errors');
fs.mkdirSync(errorDir, { recursive: true });
fs.renameSync(filePath, path.join(errorDir, file));
}
}
} catch (err) {
logger.error({ err }, 'Error reading IPC messages directory');
}

try {
const taskFiles = fs.readdirSync(tasksDir).filter(f => f.endsWith('.json'));
for (const file of taskFiles) {
const filePath = path.join(tasksDir, file);
try {
const data = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
await processTaskIpc(data);
fs.unlinkSync(filePath);
} catch (err) {
logger.error({ file, err }, 'Error processing IPC task');
const errorDir = path.join(DATA_DIR, 'ipc', 'errors');
fs.mkdirSync(errorDir, { recursive: true });
fs.renameSync(filePath, path.join(errorDir, file));
const watchers: FSWatcher[] = [];
let messagesPending = false;
let tasksPending = false;

const processMessages = async () => {
if (messagesPending) return;
messagesPending = true;
await processIpcDirectory<{ type?: string; chatJid?: string; text?: string }>(
messagesDir,
async (data) => {
if (data.type === 'message' && data.chatJid && data.text) {
await sendMessage(data.chatJid, `${ASSISTANT_NAME}: ${data.text}`);
logger.info({ chatJid: data.chatJid }, 'IPC message sent');
}
}
} catch (err) {
logger.error({ err }, 'Error reading IPC tasks directory');
}
},
'message'
);
messagesPending = false;
};

setTimeout(processIpcFiles, IPC_POLL_INTERVAL);
const processTasks = async () => {
if (tasksPending) return;
tasksPending = true;
await processIpcDirectory(tasksDir, processTaskIpc, 'task');
tasksPending = false;
};

processIpcFiles();
// Process any existing files on startup
processMessages();
processTasks();

// Watch for new files
try {
const messagesWatcher = watch(messagesDir, { persistent: false }, (event, filename) => {
if (filename?.endsWith('.json')) processMessages();
});
watchers.push(messagesWatcher);
messagesWatcher.on('error', (err) => {
logger.error({ err }, 'Messages watcher error');
});
} catch (err) {
logger.error({ err }, 'Failed to watch messages directory');
}

try {
const tasksWatcher = watch(tasksDir, { persistent: false }, (event, filename) => {
if (filename?.endsWith('.json')) processTasks();
});
watchers.push(tasksWatcher);
tasksWatcher.on('error', (err) => {
logger.error({ err }, 'Tasks watcher error');
});
} catch (err) {
logger.error({ err }, 'Failed to watch tasks directory');
}

logger.info('IPC watcher started');
}

Expand Down