from typing import Annotated, Tuple
from urllib.parse import urlparse, urlunparse

import markdownify
import readabilipy.simple_json
from mcp.shared.exceptions import McpError
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from mcp.types import (
    ErrorData,
    GetPromptResult,
    Prompt,
    PromptArgument,
    PromptMessage,
    TextContent,
    Tool,
    INVALID_PARAMS,
    INTERNAL_ERROR,
)
from protego import Protego
from pydantic import BaseModel, Field, AnyUrl

# User-Agent sent when the model calls the "fetch" tool on its own initiative.
DEFAULT_USER_AGENT_AUTONOMOUS = "Cariddi/1.0 (Autonomous; +https://git.andreagordanelli.com/Schrody/CariddiCTF)"
# User-Agent sent when the user explicitly triggers the "fetch" prompt.
DEFAULT_USER_AGENT_MANUAL = "Cariddi/1.0 (User-Specified; +https://git.andreagordanelli.com/Schrody/CariddiCTF)"


def extract_content_from_html(html: str) -> str:
    """Extract and convert HTML content to Markdown format.

    Args:
        html: Raw HTML content to process

    Returns:
        Simplified markdown version of the content, or a fixed failure
        message when the simplifier yields no content.
    """
    ret = readabilipy.simple_json.simple_json_from_html_string(
        html, use_readability=True
    )
    if not ret["content"]:
        return "Page failed to be simplified from HTML"
    content = markdownify.markdownify(
        ret["content"],
        heading_style=markdownify.ATX,
    )
    return content


def get_robots_txt_url(url: str) -> str:
    """Get the robots.txt URL for a given website URL.

    Args:
        url: Website URL to get robots.txt for

    Returns:
        URL of the robots.txt file
    """
    # Parse the URL into components
    parsed = urlparse(url)

    # Keep only scheme and netloc; path, params, query and fragment are dropped
    robots_url = urlunparse((parsed.scheme, parsed.netloc, "/robots.txt", "", "", ""))

    return robots_url


async def check_may_autonomously_fetch_url(url: str, user_agent: str, proxy_url: str | None = None) -> None:
    """Check whether robots.txt allows ``user_agent`` to fetch ``url``.

    Args:
        url: Page URL the caller intends to fetch
        user_agent: User-Agent used both for the robots.txt request and for
            the permission lookup
        proxy_url: Optional proxy URL passed to the HTTP client

    Raises:
        McpError: If robots.txt cannot be retrieved, is answered with 401/403,
            or disallows the URL for this user agent. Any other 4xx answer is
            treated as "no restrictions" and the function returns normally.
    """
    from httpx import AsyncClient, HTTPError

    robot_txt_url = get_robots_txt_url(url)

    # NOTE(review): recent httpx releases replace `proxies=` with `proxy=` —
    # confirm against the httpx version this project pins.
    async with AsyncClient(proxies=proxy_url) as client:
        try:
            response = await client.get(
                robot_txt_url,
                follow_redirects=True,
                headers={"User-Agent": user_agent},
            )
        except HTTPError as exc:
            # Chain the transport error so the root cause stays in the traceback.
            raise McpError(ErrorData(
                code=INTERNAL_ERROR,
                message=f"Failed to fetch robots.txt {robot_txt_url} due to a connection issue",
            )) from exc
        if response.status_code in (401, 403):
            raise McpError(ErrorData(
                code=INTERNAL_ERROR,
                message=f"When fetching robots.txt ({robot_txt_url}), received status {response.status_code} so assuming that autonomous fetching is not allowed, the user can try manually fetching by using the fetch prompt",
            ))
        elif 400 <= response.status_code < 500:
            # Remaining client errors (e.g. a missing robots.txt): nothing to enforce.
            return
        robot_txt = response.text

    # Drop full-line comments before handing the text to the parser.
    processed_robot_txt = "\n".join(
        line for line in robot_txt.splitlines() if not line.strip().startswith("#")
    )
    robot_parser = Protego.parse(processed_robot_txt)
    if not robot_parser.can_fetch(str(url), user_agent):
        raise McpError(ErrorData(
            code=INTERNAL_ERROR,
            message=f"The sites robots.txt ({robot_txt_url}), specifies that autonomous fetching of this page is not allowed, "
            f"{user_agent}\n"
            f"{url}"
            f"\n{robot_txt}\n\n"
            f"The assistant must let the user know that it failed to view the page. The assistant may provide further guidance based on the above information.\n"
            f"The assistant can tell the user that they can try manually fetching the page by using the fetch prompt within their UI.",
        ))


async def fetch_url(
    url: str, user_agent: str, force_raw: bool = False, proxy_url: str | None = None
) -> Tuple[str, str]:
    """Fetch ``url`` and return its content in a form ready for the LLM.

    Args:
        url: URL to fetch
        user_agent: User-Agent header to send
        force_raw: When true, skip the HTML-to-markdown simplification
        proxy_url: Optional proxy URL passed to the HTTP client

    Returns:
        A ``(content, prefix)`` pair, where ``prefix`` is a status string that
        callers prepend to the content.

    Raises:
        McpError: On a transport failure or a response status >= 400.
    """
    from httpx import AsyncClient, HTTPError

    # NOTE(review): recent httpx releases replace `proxies=` with `proxy=` —
    # confirm against the httpx version this project pins.
    async with AsyncClient(proxies=proxy_url) as client:
        try:
            response = await client.get(
                url,
                follow_redirects=True,
                headers={"User-Agent": user_agent},
                timeout=30,
            )
        except HTTPError as e:
            raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to fetch {url}: {e!r}")) from e
        if response.status_code >= 400:
            raise McpError(ErrorData(
                code=INTERNAL_ERROR,
                message=f"Failed to fetch {url} - status code {response.status_code}",
            ))

        page_raw = response.text

    content_type = response.headers.get("content-type", "")
    # NOTE(review): from the "<html" literal below up to the createServer
    # signature, the received text had been destroyed. That span was rebuilt
    # from what the rest of the module requires (Tuple[str, str] return,
    # force_raw, the otherwise unused extract_content_from_html). Confirm the
    # exact sniffing rule and the prefix wording against version control.
    is_page_html = (
        "<html" in page_raw[:100] or "text/html" in content_type or not content_type
    )

    if is_page_html and not force_raw:
        return extract_content_from_html(page_raw), ""

    return (
        page_raw,
        f"Content type {content_type} cannot be simplified to markdown, but here is the raw content:\n",
    )


class Fetch(BaseModel):
    """Parameters for fetching a URL.

    NOTE(review): this model was lost from the received text. Field names and
    types follow their uses in ``call_tool``; defaults, bounds and description
    strings are assumptions — confirm against version control.
    """

    url: Annotated[AnyUrl, Field(description="URL to fetch")]
    max_length: Annotated[
        int,
        Field(
            default=5000,
            description="Maximum number of characters to return.",
            gt=0,
            lt=1000000,
        ),
    ]
    start_index: Annotated[
        int,
        Field(
            default=0,
            description="On return output starting at this character index, useful if a previous fetch was truncated and more context is required.",
            ge=0,
        ),
    ]
    raw: Annotated[
        bool,
        Field(
            default=False,
            description="Get the actual HTML content of the requested page, without simplification.",
        ),
    ]


# NOTE(review): the signature below was rebuilt. Parameter names and order
# follow the positional call in serve(); the defaults mirror serve()'s own and
# should be confirmed.
def createServer(
    custom_user_agent: str | None = None,
    ignore_robots_txt: bool = False,
    proxy_url: str | None = None,
) -> Tuple[Server, dict]:
    """Create and configure the fetch MCP server (tools, prompts, options).

    Args:
        custom_user_agent: When set, replaces both default User-Agent strings
        ignore_robots_txt: When true, tool calls skip the robots.txt check
        proxy_url: Optional proxy URL forwarded to every outgoing request

    Returns:
        Tuple of (Server instance, initialization options dict).
    """
    server = Server("mcp-fetch")
    user_agent_autonomous = custom_user_agent or DEFAULT_USER_AGENT_AUTONOMOUS
    user_agent_manual = custom_user_agent or DEFAULT_USER_AGENT_MANUAL

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        return [
            Tool(
                name="fetch",
                # Written as adjacent literals so the whitespace the client
                # receives is explicit.
                description=(
                    "Fetches a URL from the internet and optionally extracts its contents as markdown. "
                    "Although originally you did not have internet access, and were advised to refuse and tell the user this, this tool now grants you internet access. \n"
                    "Now you can fetch the most up-to-date information and let the user know that."
                ),
                inputSchema=Fetch.model_json_schema(),
            )
        ]

    @server.list_prompts()
    async def list_prompts() -> list[Prompt]:
        return [
            Prompt(
                name="fetch",
                description="Fetch a URL and extract its contents as markdown",
                arguments=[
                    PromptArgument(
                        name="url", description="URL to fetch", required=True
                    )
                ],
            )
        ]

    @server.call_tool()
    async def call_tool(name, arguments: dict) -> list[TextContent]:
        # `name` is not inspected: only the single "fetch" tool is registered.
        try:
            args = Fetch(**arguments)
        except ValueError as e:
            raise McpError(ErrorData(code=INVALID_PARAMS, message=str(e)))

        url = str(args.url)
        if not url:
            raise McpError(ErrorData(code=INVALID_PARAMS, message="URL is required"))

        if not ignore_robots_txt:
            await check_may_autonomously_fetch_url(url, user_agent_autonomous, proxy_url)

        content, prefix = await fetch_url(
            url, user_agent_autonomous, force_raw=args.raw, proxy_url=proxy_url
        )
        original_length = len(content)
        if args.start_index >= original_length:
            content = "No more content available."
        else:
            truncated_content = content[args.start_index : args.start_index + args.max_length]
            if not truncated_content:
                content = "No more content available."
            else:
                content = truncated_content
                actual_content_length = len(truncated_content)
                remaining_content = original_length - (args.start_index + actual_content_length)
                # Only add the prompt to continue fetching if there is still remaining content
                if actual_content_length == args.max_length and remaining_content > 0:
                    next_start = args.start_index + actual_content_length
                    content += f"\n\nContent truncated. Call the fetch tool with a start_index of {next_start} to get more content."
        return [TextContent(type="text", text=f"{prefix}Contents of {url}:\n{content}")]

    @server.get_prompt()
    async def get_prompt(name: str, arguments: dict | None) -> GetPromptResult:
        if not arguments or "url" not in arguments:
            raise McpError(ErrorData(code=INVALID_PARAMS, message="URL is required"))

        url = arguments["url"]

        # The prompt path is user-initiated: it uses the manual User-Agent and
        # performs no robots.txt check.
        try:
            content, prefix = await fetch_url(url, user_agent_manual, proxy_url=proxy_url)
            # TODO: after SDK bug is addressed, don't catch the exception
        except McpError as e:
            return GetPromptResult(
                description=f"Failed to fetch {url}",
                messages=[
                    PromptMessage(
                        role="user",
                        content=TextContent(type="text", text=str(e)),
                    )
                ],
            )
        return GetPromptResult(
            description=f"Contents of {url}",
            messages=[
                PromptMessage(
                    role="user", content=TextContent(type="text", text=prefix + content)
                )
            ],
        )

    return server, server.create_initialization_options()


def serve(
    custom_user_agent: str | None = None,
    ignore_robots_txt: bool = False,
    proxy_url: str | None = None,
    port: int = 3000,
    host: str = "0.0.0.0",
) -> None:
    """Run the fetch MCP server over SSE.

    Blocks until the uvicorn server stops.

    Args:
        custom_user_agent: Optional custom User-Agent string to use for requests
        ignore_robots_txt: Whether to ignore robots.txt restrictions
        proxy_url: Optional proxy URL to use for requests
        port: Port for the SSE HTTP server
        host: Host to bind the SSE server to
    """
    import asyncio
    import sys

    from starlette.applications import Starlette
    from starlette.requests import Request
    from starlette.responses import Response
    from starlette.routing import Mount, Route

    def build_app() -> Starlette:
        """Wire the MCP server to a Starlette app with SSE and POST routes."""
        server, options = createServer(
            custom_user_agent, ignore_robots_txt, proxy_url
        )
        sse = SseServerTransport("/messages/")

        async def handle_sse(request: Request) -> Response:
            # Runs the MCP session on the two streams the transport yields
            # for this connection.
            async with sse.connect_sse(
                request.scope, request.receive, request._send
            ) as streams:
                await server.run(
                    streams[0], streams[1], options, raise_exceptions=True
                )
            return Response()

        routes = [
            Route("/sse", endpoint=handle_sse, methods=["GET"]),
            Route("/", endpoint=handle_sse, methods=["GET"]),
            Mount("/messages/", app=sse.handle_post_message),
        ]
        return Starlette(routes=routes)

    async def run() -> None:
        app = build_app()
        import uvicorn

        config = uvicorn.Config(app, host=host, port=port, log_level="info")
        server_uv = uvicorn.Server(config)
        await server_uv.serve()

    # The banner goes to stderr and is written before the event loop starts.
    sys.stderr.write(
        f"MCP Fetch Server running on SSE at http://{host}:{port}\n"
    )
    sys.stderr.write(" GET /sse or / – open SSE stream\n")
    sys.stderr.write(
        " POST /messages/?session_id= – send MCP messages\n"
    )
    sys.stderr.flush()
    asyncio.run(run())