-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmcp_image.py
376 lines (308 loc) · 15.8 KB
/
mcp_image.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
#!/usr/bin/env python3
import os
import sys
import asyncio
import httpx
import logging
from io import BytesIO
from datetime import datetime
from PIL import Image as PILImage
from urllib.parse import urlparse
from mcp.server.fastmcp import FastMCP, Image, Context
from typing import List, Dict, Any, Union, Optional
MAX_IMAGE_SIZE = 1024 # Maximum dimension size in pixels
TEMP_DIR = "./Temp"
DATA_DIR = "./data"
# Ensure directories exist
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)
# Configure logging: first disable other loggers
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("mcp").setLevel(logging.WARNING)
# Configure our logger
log_filename = os.path.join(DATA_DIR, datetime.now().strftime("%d-%m-%y.log"))
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Create handlers
file_handler = logging.FileHandler(log_filename)
file_handler.setFormatter(formatter)
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setFormatter(formatter)
# Set up our logger
logger = logging.getLogger("image-mcp")
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# Prevent double logging
logger.propagate = False
# Create a FastMCP server instance
mcp = FastMCP("image-service")
async def process_image_data(data: bytes, content_type: str, image_source: str, ctx: Context) -> Image | None:
"""Process image data and return an MCP Image object."""
try:
# If image is not large, try to log dimensions without processing
if len(data) <= 1048576:
try:
with PILImage.open(BytesIO(data)) as img:
width, height = img.size
logger.debug(f"Original image dimensions from {image_source}: {width}x{height}")
logger.debug(f"Image format from PIL: {img.format}, mode: {img.mode}")
except Exception as e:
logger.debug(f"Could not determine dimensions for {image_source}: {e}")
# Ensure content_type is valid and doesn't include 'image/'
if content_type.startswith('image/'):
content_type = content_type.split('/')[-1]
logger.debug(f"Creating Image object with format: {content_type}")
return Image(data=data, format=content_type)
# For large images, save to temp file and process
temp_path = os.path.join(TEMP_DIR, f"temp_image_{hash(image_source)}." + content_type.split('/')[-1])
with open(temp_path, "wb") as f:
f.write(data)
try:
with PILImage.open(temp_path) as img:
orig_width, orig_height = img.size
logger.debug(f"Original image dimensions from {image_source}: {orig_width}x{orig_height}")
logger.debug(f"Large image format from PIL: {img.format}, mode: {img.mode}")
if img.mode in ('RGBA', 'P'):
img = img.convert('RGB')
width, height = img.size
new_img = img
quality = 85
while True:
img_byte_arr = BytesIO()
new_img.save(img_byte_arr, format='JPEG', quality=quality)
if len(img_byte_arr.getvalue()) <= 1048576:
try:
with PILImage.open(BytesIO(img_byte_arr.getvalue())) as processed_img:
new_width, new_height = processed_img.size
logger.debug(f"Processed image dimensions from {image_source}: {new_width}x{new_height} (quality={quality})")
except Exception as e:
logger.debug(f"Could not determine processed dimensions for {image_source}: {e}")
logger.debug(f"Returning processed image with format: jpeg")
return Image(data=img_byte_arr.getvalue(), format='jpeg')
if quality > 30:
quality -= 10
else:
width = int(width * 0.8)
height = int(height * 0.8)
if width < 200 or height < 200:
ctx.error("Unable to compress image to acceptable size while maintaining quality")
logger.error(f"Failed processing image from {image_source}: dimensions {width}x{height} too small")
return None
new_img = img.resize((width, height), PILImage.LANCZOS)
quality = 85
except Exception as e:
ctx.error(f"Image processing error: {str(e)}")
logger.exception(f"Exception processing image from {image_source}")
return None
finally:
if os.path.exists(temp_path):
os.remove(temp_path)
except Exception as e:
ctx.error(f"Error processing image: {str(e)}")
logger.exception(f"Unexpected error processing {image_source}")
return None
async def process_local_image(file_path: str, ctx: Context) -> Dict[str, Any]:
"""Processes a local image file and returns a dictionary with the result."""
try:
if not os.path.exists(file_path):
error_msg = f"File not found: {file_path}"
ctx.error(error_msg)
logger.error(error_msg)
return {"path": file_path, "error": error_msg}
# Determine content type based on file extension
_, ext = os.path.splitext(file_path)
ext = ext[1:].lower() if ext else "jpeg" # Default to jpeg if no extension
# Map extension to proper MIME type
mime_type_map = {
"jpg": "jpeg",
"jpeg": "jpeg",
"png": "png",
"gif": "gif",
"bmp": "bmp",
"webp": "webp",
"tiff": "tiff",
"tif": "tiff"
}
content_type = mime_type_map.get(ext, "jpeg") # Default to jpeg if unknown extension
logger.debug(f"Local image {file_path} has extension '{ext}', mapped to content type '{content_type}'")
# For large files, read and process directly without loading entire file into memory
file_size = os.path.getsize(file_path)
if file_size > 1048576:
logger.debug(f"Large local image detected: {file_path} ({file_size} bytes)")
# Process the image directly using the same logic as for URL images
return await process_large_local_image(file_path, content_type, ctx)
# For smaller files, read the entire content
with open(file_path, "rb") as f:
file_data = f.read()
logger.debug(f"Read local image from {file_path} with {len(file_data)} bytes")
processed_image = await process_image_data(file_data, content_type, file_path, ctx)
if processed_image is None:
return {"path": file_path, "error": "Failed to process image"}
return {"path": file_path, "image": processed_image}
except Exception as e:
error_msg = f"Error processing local image {file_path}: {str(e)}"
ctx.error(error_msg)
logger.exception(error_msg)
return {"path": file_path, "error": error_msg}
async def process_large_local_image(file_path: str, content_type: str, ctx: Context) -> Dict[str, Any]:
"""Process a large local image file directly without loading it entirely into memory."""
temp_path = None
try:
# Create a temporary file path for processing
temp_path = os.path.join(TEMP_DIR, f"temp_local_{os.path.basename(file_path)}")
# Open the original image with PIL directly
with PILImage.open(file_path) as img:
orig_width, orig_height = img.size
logger.debug(f"Original large local image dimensions from {file_path}: {orig_width}x{orig_height}")
logger.debug(f"Original image format: {img.format}, mode: {img.mode}")
if img.mode in ('RGBA', 'P'):
img = img.convert('RGB')
width, height = img.size
new_img = img
quality = 85
while True:
# Save the processed image to a temporary BytesIO
img_byte_arr = BytesIO()
new_img.save(img_byte_arr, format='JPEG', quality=quality)
processed_data = img_byte_arr.getvalue()
if len(processed_data) <= 1048576:
logger.debug(f"Successfully compressed large local image {file_path} to {len(processed_data)} bytes (quality={quality})")
return {"path": file_path, "image": Image(data=processed_data, format='jpeg')}
if quality > 30:
quality -= 10
logger.debug(f"Reducing quality to {quality} for {file_path}")
else:
width = int(width * 0.8)
height = int(height * 0.8)
if width < 200 or height < 200:
error_msg = f"Unable to compress large local image {file_path} to acceptable size while maintaining quality"
ctx.error(error_msg)
logger.error(error_msg)
return {"path": file_path, "error": error_msg}
logger.debug(f"Resizing large local image {file_path} to {width}x{height}")
new_img = img.resize((width, height), PILImage.LANCZOS)
quality = 85
except Exception as e:
error_msg = f"Error processing large local image {file_path}: {str(e)}"
ctx.error(error_msg)
logger.exception(error_msg)
return {"path": file_path, "error": error_msg}
finally:
# Clean up temporary file if it exists
if temp_path and os.path.exists(temp_path):
os.remove(temp_path)
async def fetch_single_image(url: str, client: httpx.AsyncClient, ctx: Context) -> Dict[str, Any]:
"""Fetches and processes a single image asynchronously."""
try:
parsed = urlparse(url)
if not all([parsed.scheme in ['http', 'https'], parsed.netloc]):
error_msg = f"Invalid URL: {url}"
ctx.error(error_msg)
return {"url": url, "error": error_msg}
response = await client.get(url)
response.raise_for_status()
content_type = response.headers.get('content-type', '')
if not content_type.startswith('image/'):
error_msg = f"Not an image (got {content_type})"
ctx.error(error_msg)
return {"url": url, "error": error_msg}
logger.debug(f"Fetched image from {url} with {len(response.content)} bytes")
logger.debug(f"Content-Type from server: {content_type}")
# Extract the format from content-type
format_type = content_type.split('/')[-1]
logger.debug(f"Extracted format type: {format_type}")
processed_image = await process_image_data(response.content, format_type, url, ctx)
if processed_image is None:
return {"url": url, "error": "Failed to process image"}
return {"url": url, "image": processed_image}
except httpx.HTTPError as e:
error_msg = f"HTTP error: {str(e)}"
ctx.error(error_msg)
return {"url": url, "error": error_msg}
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
ctx.error(error_msg)
return {"url": url, "error": error_msg}
def is_url(path_or_url: str) -> bool:
"""Determine if the given string is a URL or a local file path."""
parsed = urlparse(path_or_url)
return bool(parsed.scheme and parsed.netloc)
async def process_images_async(image_sources: List[str], ctx: Context) -> List[Dict[str, Any]]:
"""Process multiple images (URLs or local files) concurrently."""
if not image_sources:
raise ValueError("No image sources provided")
# Separate URLs from local file paths
urls = [src for src in image_sources if is_url(src)]
local_paths = [src for src in image_sources if not is_url(src)]
results = []
# Process URLs if any
if urls:
logger.debug(f"Processing {len(urls)} URLs")
async with httpx.AsyncClient() as client:
url_tasks = [fetch_single_image(url, client, ctx) for url in urls]
url_results = await asyncio.gather(*url_tasks)
results.extend(url_results)
# Process local files if any
if local_paths:
logger.debug(f"Processing {len(local_paths)} local files")
local_tasks = [process_local_image(path, ctx) for path in local_paths]
local_results = await asyncio.gather(*local_tasks)
results.extend(local_results)
# Ensure results are in the same order as input sources
ordered_results = []
for src in image_sources:
for result in results:
if (src == result.get("url", None)) or (src == result.get("path", None)):
ordered_results.append(result)
break
return ordered_results
@mcp.tool()
async def fetch_images(image_sources: List[str], ctx: Context) -> List[Image | None]:
"""
Fetch and process images from URLs or local file paths, returning them in a format suitable for LLMs.
This tool accepts a list of image sources which can be either:
1. URLs pointing to web-hosted images (http:// or https://)
2. Local file paths pointing to images stored on the local filesystem (e.g., "C:/images/photo1.jpg")
For a single image, provide a one-element list. The function will process images in parallel
when multiple sources are provided. Images that exceed the size limit (1MB) will be automatically
compressed while maintaining aspect ratio and reasonable quality.
Args:
image_sources: A list of image URLs or local file paths. For a single image, provide a one-element list.
Returns:
A list of Image objects or None values (if processing failed) in the same order as the input sources.
"""
try:
start_time = asyncio.get_event_loop().time()
# Validate input
if not image_sources:
ctx.error("No image sources provided")
logger.error("fetch_images called with empty source list")
return []
# Log the types of sources we're processing
url_count = sum(1 for src in image_sources if is_url(src))
local_count = len(image_sources) - url_count
logger.debug(f"Processing {len(image_sources)} image sources: {url_count} URLs and {local_count} local files")
# Process all images
results = await process_images_async(image_sources, ctx)
# Extract just the Image objects or None values
image_results = []
for result in results:
if "image" in result:
image_results.append(result["image"])
else:
image_results.append(None)
elapsed = asyncio.get_event_loop().time() - start_time
success_count = sum(1 for r in image_results if r is not None)
logger.debug(
f"Processed {len(image_sources)} images in {elapsed:.2f} seconds. "
f"Success: {success_count}, Failed: {len(image_sources) - success_count}"
)
return image_results
except Exception as e:
logger.exception("Error in fetch_images")
ctx.error(f"Failed to process images: {str(e)}")
return [None] * len(image_sources)
if __name__ == "__main__":
mcp.run(transport='stdio')