Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions apps/rowboat/app/actions/uploaded-images.actions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"use server";
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import crypto from 'crypto';
import { authCheck } from '@/app/actions/auth.actions';
import { USE_AUTH } from '@/app/lib/feature_flags';
import { GoogleGenerativeAI } from '@google/generative-ai';
import { UsageTracker } from '@/app/lib/billing';
import { logUsage } from '@/app/actions/billing.actions';

/**
 * Creates a presigned S3 PUT URL that a client can use to upload a single image.
 *
 * The object key is sharded by the last two characters of a freshly generated
 * UUID: `uploaded_images/<a>/<b>/<uuid><ext>`. The readers of these objects
 * rebuild the key from the returned `id` using the same rule.
 *
 * @param mimeType MIME type of the image the client is about to upload. It is
 *   signed into the request as the object's Content-Type, so it must be an
 *   `image/*` type; anything else is rejected.
 * @returns `id` (UUID plus extension), the S3 `key`, the presigned `uploadUrl`
 *   (valid for 600 seconds), the application `url` the image is served from,
 *   and the `mimeType` echoed back unchanged.
 * @throws Error when `mimeType` is missing or not an image type, or when the
 *   upload bucket is not configured.
 */
export async function getUploadUrlForImage(mimeType: string): Promise<{ id: string; key: string; uploadUrl: string; url: string; mimeType: string }> {
    // Enforce auth in server action context (supports guest mode when auth disabled)
    if (USE_AUTH) {
        await authCheck();
    }

    if (!mimeType || typeof mimeType !== 'string') {
        throw new Error('mimeType is required');
    }

    // The value becomes the stored Content-Type of the object, which is later
    // replayed to browsers. Only accept bare image types (no parameters).
    if (!/^image\/[a-z0-9][a-z0-9.+-]*$/i.test(mimeType)) {
        throw new Error('mimeType must be an image type');
    }

    const bucket = process.env.RAG_UPLOADS_S3_BUCKET || '';
    if (!bucket) {
        throw new Error('S3 bucket not configured');
    }

    // Known image types get a friendly extension; every other image type keeps '.bin'.
    const extensionByMimeType: Record<string, string> = {
        'image/jpeg': '.jpg',
        'image/webp': '.webp',
        'image/png': '.png',
    };
    const ext = extensionByMimeType[mimeType] ?? '.bin';

    const id = crypto.randomUUID();
    const idWithExt = `${id}${ext}`;
    // Two-level directory sharding from the last two characters of the UUID.
    const last2 = id.slice(-2).padStart(2, '0');
    const key = `uploaded_images/${last2.charAt(0)}/${last2.charAt(1)}/${idWithExt}`;

    const region = process.env.RAG_UPLOADS_S3_REGION || 'us-east-1';
    const accessKeyId = process.env.AWS_ACCESS_KEY_ID;
    const secretAccessKey = process.env.AWS_SECRET_ACCESS_KEY;
    const s3 = new S3Client({
        region,
        // Explicit credentials only when both are set; otherwise the SDK's default resolution applies.
        credentials: accessKeyId && secretAccessKey ? { accessKeyId, secretAccessKey } : undefined,
    });

    const command = new PutObjectCommand({ Bucket: bucket, Key: key, ContentType: mimeType });
    const uploadUrl = await getSignedUrl(s3, command, { expiresIn: 600 });

    return { id: idWithExt, key, uploadUrl, url: `/api/uploaded-images/${idWithExt}`, mimeType };
}

/**
 * Loads a previously uploaded image from S3 and asks Gemini to describe it in Markdown.
 *
 * The S3 key is rebuilt from `id` with the sharding rule
 * `uploaded_images/<a>/<b>/<id>`, where `<a><b>` are the last two characters of
 * the id without its extension. Description is best-effort: when no Gemini API
 * key is configured, or the Gemini call fails, `description` is `null`.
 * Token usage, when available, is tracked and passed to `logUsage`; failures
 * there are logged and do not affect the result.
 *
 * @param id Image id including extension (e.g. `<uuid>.png`).
 * @returns The same `id` and the generated Markdown description, or `null`.
 * @throws Error when `id` is missing or contains a path separator, when the
 *   bucket is not configured, or when the S3 object has no body. Errors from
 *   the S3 request itself are not caught here.
 */
export async function describeUploadedImage(id: string): Promise<{ id: string; description: string | null }> {
    if (USE_AUTH) {
        await authCheck();
    }

    if (!id || typeof id !== 'string') {
        throw new Error('id is required');
    }
    // The id is interpolated into the S3 key; keep it to a single path segment.
    if (/[\\/]/.test(id)) {
        throw new Error('id is invalid');
    }

    const bucket = process.env.RAG_UPLOADS_S3_BUCKET || '';
    if (!bucket) {
        throw new Error('S3 bucket not configured');
    }

    const region = process.env.RAG_UPLOADS_S3_REGION || 'us-east-1';
    const accessKeyId = process.env.AWS_ACCESS_KEY_ID;
    const secretAccessKey = process.env.AWS_SECRET_ACCESS_KEY;
    const s3 = new S3Client({
        region,
        // Explicit credentials only when both are set; otherwise the SDK's default resolution applies.
        credentials: accessKeyId && secretAccessKey ? { accessKeyId, secretAccessKey } : undefined,
    });

    const lastDot = id.lastIndexOf('.');
    const idWithoutExt = lastDot > 0 ? id.slice(0, lastDot) : id;
    const last2 = idWithoutExt.slice(-2).padStart(2, '0');
    const key = `uploaded_images/${last2.charAt(0)}/${last2.charAt(1)}/${id}`;

    // Fetch object bytes from S3
    const { GetObjectCommand } = await import('@aws-sdk/client-s3');
    const resp = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
    const contentType = resp.ContentType || 'application/octet-stream';
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- Body is a runtime-dependent stream type
    const body = resp.Body as any;
    if (!body) {
        throw new Error('Uploaded image has no content');
    }

    let buf: Buffer;
    if (typeof body.transformToByteArray === 'function') {
        // Preferred: the SDK's stream helper collects the whole payload.
        buf = Buffer.from(await body.transformToByteArray());
    } else {
        // Fallback for a plain Node readable stream.
        const chunks: Uint8Array[] = [];
        await new Promise<void>((resolve, reject) => {
            body.on('data', (c: Uint8Array) => chunks.push(c));
            body.on('end', () => resolve());
            body.on('error', reject);
        });
        buf = Buffer.concat(chunks);
    }

    let descriptionMarkdown: string | null = null;
    const usageTracker = new UsageTracker();
    try {
        const apiKey = process.env.GOOGLE_API_KEY || process.env.GEMINI_API_KEY || '';
        if (apiKey) {
            const genAI = new GoogleGenerativeAI(apiKey);
            const model = genAI.getGenerativeModel({ model: 'gemini-2.5-flash' });
            const prompt = 'Describe this image in concise, high-quality Markdown. Focus on key objects, text, layout, style, colors, and any notable details. Do not include extra commentary or instructions.';
            const result = await model.generateContent([
                { inlineData: { data: buf.toString('base64'), mimeType: contentType } },
                prompt,
            ]);
            const response: any = result.response as any;
            descriptionMarkdown = response?.text?.() || null;

            // Track usage; a tracking failure must not discard the description.
            try {
                const inputTokens = response?.usageMetadata?.promptTokenCount || 0;
                const outputTokens = response?.usageMetadata?.candidatesTokenCount || 0;
                usageTracker.track({
                    type: 'LLM_USAGE',
                    modelName: 'gemini-2.5-flash',
                    inputTokens,
                    outputTokens,
                    context: 'uploaded_images.describe',
                });
            } catch (e) {
                console.warn('Usage tracking failed for image description', e);
            }
        }
    } catch (e) {
        console.warn('Gemini description failed', e);
    }

    // Log usage to billing; best-effort, but leave a trace when it fails.
    try {
        const items = usageTracker.flush();
        if (items.length > 0) {
            await logUsage({ items });
        }
    } catch (e) {
        console.warn('Billing usage logging failed for image description', e);
    }

    return { id, description: descriptionMarkdown };
}
99 changes: 99 additions & 0 deletions apps/rowboat/app/api/tmp-images/upload/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { NextRequest, NextResponse } from 'next/server';
import { requireAuth } from '@/app/lib/auth';
import { tempBinaryCache } from '@/src/application/services/temp-binary-cache';
import { GoogleGenerativeAI } from '@google/generative-ai';
import { UsageTracker, getCustomerForUserId, logUsage as libLogUsage } from '@/app/lib/billing';
import { USE_AUTH, USE_BILLING } from '@/app/lib/feature_flags';

// POST /api/tmp-images/upload
// Accepts an image file (multipart/form-data, field name: "file")
// Stores it in the in-memory temp cache and returns a temporary URL.
/**
 * Handles a multipart image upload: buffers the file in memory, optionally asks
 * Gemini for a Markdown description, stores the bytes in the temp cache and
 * responds with a temporary URL.
 *
 * Responses produced by this handler:
 * - 401 `{ error: 'Unauthorized' }` when USE_AUTH is on and `requireAuth()` throws.
 * - 400 when the request is not multipart/form-data or has no "file" field.
 * - 200 `{ url, storage: 'temp', id, mimeType, expiresInSec, description }` on success.
 * - 500 `{ error: 'Upload failed' }` for any other error.
 */
export async function POST(request: NextRequest) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming we get a presigned upload url from s3, why do we need this route to exist at all? Wouldn't the frontend directly upload to the presigned url?

try {
// Require authentication if enabled
let currentUser: any | null = null;
if (USE_AUTH) {
try {
currentUser = await requireAuth();
} catch {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
}
}

const contentType = request.headers.get('content-type') || '';
if (!contentType.includes('multipart/form-data')) {
return NextResponse.json({ error: 'Expected multipart/form-data' }, { status: 400 });
}

const form = await request.formData();
// NOTE(review): FormData.get can also return a string; the cast below does not check that — confirm callers always send a file.
const file = form.get('file') as File | null;
if (!file) {
return NextResponse.json({ error: 'Missing file' }, { status: 400 });
}

// The entire upload is read into memory; no size limit is applied in this handler.
const arrayBuf = await file.arrayBuffer();
const buf = Buffer.from(arrayBuf);
// The MIME type is taken from the uploaded part as-is, with a generic fallback.
const mime = file.type || 'application/octet-stream';

// Optionally describe image with Gemini
// Runs only when GOOGLE_API_KEY or GEMINI_API_KEY is set; on failure the error
// is logged and the description stays null, so the upload itself still succeeds.
let descriptionMarkdown: string | null = null;
const usageTracker = new UsageTracker();
try {
const apiKey = process.env.GOOGLE_API_KEY || process.env.GEMINI_API_KEY || '';
if (apiKey) {
const genAI = new GoogleGenerativeAI(apiKey);
const model = genAI.getGenerativeModel({ model: 'gemini-2.5-flash' });
const prompt = 'Describe this image in concise, high-quality Markdown. Focus on key objects, text, layout, style, colors, and any notable details. Do not include extra commentary or instructions.';
const result = await model.generateContent([
{ inlineData: { data: buf.toString('base64'), mimeType: mime } },
prompt,
]);
const response: any = result.response as any;
descriptionMarkdown = response?.text?.() || null;

// Track usage similar to rag-worker
// Token counts default to 0 when the response carries no usage metadata.
try {
const inputTokens = response?.usageMetadata?.promptTokenCount || 0;
const outputTokens = response?.usageMetadata?.candidatesTokenCount || 0;
usageTracker.track({
type: 'LLM_USAGE',
modelName: 'gemini-2.5-flash',
inputTokens,
outputTokens,
context: 'tmp_images.upload_with_description',
});
} catch {
// ignore usage tracking errors
}
}
} catch (e) {
console.warn('Gemini description failed', e);
}

// Store in temp cache and return temp URL
// The cache is given the TTL in milliseconds; the response reports it in seconds.
const ttlSec = 10 * 60; // 10 minutes
const id = tempBinaryCache.put(buf, mime, ttlSec * 1000);
const url = `/api/tmp-images/${id}`;

// Log usage to billing similar to rag-worker
// Skipped unless billing is enabled, a user was authenticated above, and a customer is found for that user.
try {
if (USE_BILLING && currentUser) {
const customer = await getCustomerForUserId(currentUser.id);
if (customer) {
const items = usageTracker.flush();
if (items.length > 0) {
await libLogUsage(customer.id, { items });
}
}
}
} catch {
// ignore billing logging errors
}

return NextResponse.json({ url, storage: 'temp', id, mimeType: mime, expiresInSec: ttlSec, description: descriptionMarkdown });
} catch (e) {
// Catch-all for anything not handled above (e.g. form parsing or cache errors).
console.error('tmp image upload error', e);
return NextResponse.json({ error: 'Upload failed' }, { status: 500 });
}
}

63 changes: 63 additions & 0 deletions apps/rowboat/app/api/uploaded-images/[id]/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { NextRequest, NextResponse } from 'next/server';
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import { Readable } from 'stream';
import { requireAuth } from '@/app/lib/auth';

// Serves uploaded images from S3 at path: /api/uploaded-images/{uuid}.{ext}
// Reconstructs the S3 key using the same sharding logic as image upload.
/**
 * Streams an uploaded image from S3 for `/api/uploaded-images/{uuid}.{ext}`.
 *
 * The S3 key is rebuilt as `uploaded_images/<a>/<b>/<id>`, where `<a><b>` are
 * the last two characters of the id without its extension.
 *
 * @param request Incoming request (not inspected by this handler).
 * @param props Route props; `params.id` is the image id including extension.
 * @returns 200 with the object body and its stored Content-Type; 400 when the
 *   id is missing or contains path separators or control characters; 500 when
 *   the bucket is not configured; 404 when the S3 read fails for any reason.
 */
export async function GET(request: NextRequest, props: { params: Promise<{ id: string }> }) {
    // Require authentication (handles guest mode internally when USE_AUTH is disabled)
    await requireAuth();

    const params = await props.params;
    const id = params.id;
    if (!id) {
        return NextResponse.json({ error: 'Missing id' }, { status: 400 });
    }
    // The id ends up in both the S3 key and a response header, so restrict it
    // to a single path segment without control characters.
    if (/[\\/\u0000-\u001f\u007f]/.test(id)) {
        return NextResponse.json({ error: 'Invalid id' }, { status: 400 });
    }

    const bucket = process.env.RAG_UPLOADS_S3_BUCKET || '';
    if (!bucket) {
        return NextResponse.json({ error: 'S3 bucket not configured' }, { status: 500 });
    }

    const region = process.env.RAG_UPLOADS_S3_REGION || 'us-east-1';
    const accessKeyId = process.env.AWS_ACCESS_KEY_ID;
    const secretAccessKey = process.env.AWS_SECRET_ACCESS_KEY;
    const s3 = new S3Client({
        region,
        // Explicit credentials only when both are set; otherwise the SDK's default resolution applies.
        credentials: accessKeyId && secretAccessKey ? { accessKeyId, secretAccessKey } : undefined,
    });

    // Expect id to include extension (e.g., "<uuid>.png")
    const lastDot = id.lastIndexOf('.');
    const idWithoutExt = lastDot > 0 ? id.slice(0, lastDot) : id;
    // Quotes or other special characters would corrupt the quoted filename parameter.
    const filename = id.replace(/[^\w.-]/g, '_');

    // Reconstruct directory sharding from last two characters of UUID (without extension)
    const last2 = idWithoutExt.slice(-2).padStart(2, '0');
    const key = `uploaded_images/${last2.charAt(0)}/${last2.charAt(1)}/${id}`;
    try {
        const resp = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
        const contentType = resp.ContentType || 'application/octet-stream';
        // eslint-disable-next-line @typescript-eslint/no-explicit-any -- Body is a runtime-dependent stream type
        const body = resp.Body as any;
        // Prefer the SDK's own conversion, then Node's Readable.toWeb, else pass the body through.
        const webStream = body?.transformToWebStream
            ? body.transformToWebStream()
            : (Readable as any)?.toWeb
                ? (Readable as any).toWeb(body)
                : body;
        return new NextResponse(webStream, {
            status: 200,
            headers: {
                'Content-Type': contentType,
                // Objects are immutable (UUID-named), but the route is auth-gated,
                // so only the requesting user's own cache may store the response.
                'Cache-Control': 'private, max-age=31536000, immutable',
                'Content-Disposition': `inline; filename="${filename}"`,
                // The stored Content-Type was chosen by the uploader; do not let browsers second-guess it.
                'X-Content-Type-Options': 'nosniff',
            },
        });
    } catch (e) {
        console.error('S3 get error', e);
        return NextResponse.json({ error: 'Not found' }, { status: 404 });
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,24 @@ export function Chat({
}
}, []);

function handleUserMessage(prompt: string) {
const updatedMessages: z.infer<typeof Message>[] = [...messages, {
role: 'user',
content: prompt,
}];
function handleUserMessage(prompt: string, imageDebug?: { url: string; description?: string | null }) {
// Insert an internal-only debug message with image URL/markdown (if provided),
// then the actual user message last so streaming triggers correctly.
const debugMessages: z.infer<typeof Message>[] = imageDebug ? [{
role: 'assistant',
content: `Image Description\n\nURL: ${imageDebug.url}\n\n${imageDebug.description ? imageDebug.description : ''}`.trim(),
agentName: 'Image Description',
responseType: 'internal',
} as any] : [];

const updatedMessages: z.infer<typeof Message>[] = [
...messages,
...debugMessages,
{
role: 'user',
content: prompt,
} as any,
];
setMessages(updatedMessages);
setError(null);
setIsLastInteracted(true);
Expand Down Expand Up @@ -229,9 +242,46 @@ export function Chat({
}

// set up a cached turn
// Merge-at-send: if the immediately preceding message is our internal
// Image Description debug message, append its details (URL/markdown)
// to the outgoing user message content, without changing the UI.
const last = messages[messages.length - 1];
let mergedContent = (typeof last?.content === 'string' ? last.content : '') || '';
if (messages.length >= 2) {
const prev = messages[messages.length - 2] as any;
const isImageDebug = prev && prev.role === 'assistant' && prev.responseType === 'internal' && prev.agentName === 'Image Description' && typeof prev.content === 'string';
if (isImageDebug) {
// Expect prev.content to have: "Image Description\n\nURL: <url>\n\n<markdown>"
// Extract URL and markdown blocks for a clean append
const content = prev.content as string;
let url: string | undefined;
let markdown: string | undefined;
const urlMatch = content.match(/URL:\s*(\S+)/i);
if (urlMatch) url = urlMatch[1];
// markdown is whatever comes after the blank line following URL
const parts = content.split(/\n\n/);
if (parts.length >= 3) {
markdown = parts.slice(2).join('\n\n').trim();
}
const appendSections: string[] = [];
if (url) appendSections.push(`The user uploaded an image. URL: ${url}`);
if (markdown) appendSections.push(`Image description (markdown):\n\n${markdown}`);
if (appendSections.length > 0) {
mergedContent = [mergedContent, appendSections.join('\n\n')]
.filter(Boolean)
.join('\n\n');
}
}
}

const messagesToSend: z.infer<typeof Message>[] = [{
role: 'user',
content: mergedContent,
} as any];

const response = await createCachedTurn({
conversationId: conversationId.current,
messages: messages.slice(-1), // only send the last message
messages: messagesToSend, // send merged content only
});
if (ignore) {
return;
Expand Down Expand Up @@ -500,4 +550,4 @@ export function Chat({
/>
</div>
);
}
}
Loading