Compare commits
2 Commits
master
...
2969ad73ea
| Author | SHA1 | Date | |
|---|---|---|---|
| 2969ad73ea | |||
| a34633eaee |
353
README.md
353
README.md
@@ -1,2 +1,353 @@
|
|||||||
# Cariddi
|
# Cariddi - MCP Client and Server
|
||||||
|
|
||||||
|
A complete MCP (Model Context Protocol) solution consisting of:
|
||||||
|
- **Cariddi Server**: A FastMCP server with filesystem tools for file operations and command execution
|
||||||
|
- **Cariddi Client**: A Python client that uses Ollama models for inference and connects to MCP servers, specialized as a Crypto Solver Agent for CTF challenges
|
||||||
|
|
||||||
|
## Project Structure
|
||||||
|
|
||||||
|
```
|
||||||
|
Cariddi/
|
||||||
|
├── mcpServer/ # MCP Server implementation
|
||||||
|
│ ├── main.py # FastMCP server entry point
|
||||||
|
│ ├── modules/
|
||||||
|
│ │ └── filesystem.py # Filesystem operation implementations
|
||||||
|
│ ├── requirements.txt
|
||||||
|
│ ├── Dockerfile
|
||||||
|
│ ├── docker-compose.yml
|
||||||
|
│ └── mcp.json # MCP server configuration
|
||||||
|
├── mcpClient/ # MCP Client with Ollama
|
||||||
|
│ ├── mcpClient.py # Main client implementation
|
||||||
|
│ └── requirements.txt
|
||||||
|
└── challs/ # CTF challenges
|
||||||
|
└── cryptoEasy/
|
||||||
|
├── challenge.py
|
||||||
|
└── cryptoeasy.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Cariddi Server
|
||||||
|
|
||||||
|
A FastMCP server that provides filesystem tools for file operations, command execution, and Python file writing with proper handling of escape characters.
|
||||||
|
|
||||||
|
### Server Setup
|
||||||
|
|
||||||
|
1. Navigate to the server directory:
|
||||||
|
```bash
|
||||||
|
cd Cariddi
|
||||||
|
```
|
||||||
|
|
||||||
|
2. Create and activate virtual environment:
|
||||||
|
```bash
|
||||||
|
python3 -m venv venv
|
||||||
|
source venv/bin/activate # On Windows: venv\Scripts\activate
|
||||||
|
```
|
||||||
|
|
||||||
|
3. Install dependencies:
|
||||||
|
```bash
|
||||||
|
pip install -r requirements.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
### Running the Server
|
||||||
|
|
||||||
|
```bash
|
||||||
|
source venv/bin/activate # On Windows: venv\Scripts\activate
|
||||||
|
python main.py
|
||||||
|
```
|
||||||
|
|
||||||
|
The server will start on `http://0.0.0.0:8000/mcp` with streamable-http transport.
|
||||||
|
|
||||||
|
### Environment Variables
|
||||||
|
|
||||||
|
- `FASTMCP_HOST` or `MCP_HOST`: Server host (default: `0.0.0.0`)
|
||||||
|
- `FASTMCP_PORT` or `MCP_PORT`: Server port (default: `8000`)
|
||||||
|
|
||||||
|
### Using MCP Inspector
|
||||||
|
|
||||||
|
The MCP Inspector is a visual tool for testing and debugging MCP servers.
|
||||||
|
|
||||||
|
#### Install and Run Inspector
|
||||||
|
|
||||||
|
1. Make sure your server is running (see above)
|
||||||
|
|
||||||
|
2. Run the inspector to connect to your server:
|
||||||
|
```bash
|
||||||
|
npx @modelcontextprotocol/inspector --url http://localhost:8000/mcp
|
||||||
|
```
|
||||||
|
|
||||||
|
The inspector will open in your browser (typically at `http://localhost:5173`).
|
||||||
|
|
||||||
|
#### Alternative: Run Inspector with Server
|
||||||
|
|
||||||
|
You can also run the inspector and server together:
|
||||||
|
```bash
|
||||||
|
npx @modelcontextprotocol/inspector python main.py
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Setup MCP Inspector
|
||||||
|
|
||||||
|
Use "Streamable HTTP"
|
||||||
|
|
||||||
|
URL: `http://localhost:8000/mcp`
|
||||||
|
|
||||||
|
and press connect.
|
||||||
|
|
||||||
|
### Docker Deployment
|
||||||
|
|
||||||
|
#### Build and Run with Docker
|
||||||
|
|
||||||
|
1. Navigate to the server directory:
|
||||||
|
```bash
|
||||||
|
cd Cariddi
|
||||||
|
```
|
||||||
|
|
||||||
|
2. Build the Docker image:
|
||||||
|
```bash
|
||||||
|
docker build -t cariddi-mcp-server .
|
||||||
|
```
|
||||||
|
|
||||||
|
3. Run the container:
|
||||||
|
```bash
|
||||||
|
docker run -d -p 8000:8000 --name cariddi-mcp cariddi-mcp-server
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Using Docker Compose
|
||||||
|
|
||||||
|
1. Navigate to the server directory:
|
||||||
|
```bash
|
||||||
|
cd Cariddi
|
||||||
|
```
|
||||||
|
|
||||||
|
2. Start the server:
|
||||||
|
```bash
|
||||||
|
docker-compose up -d
|
||||||
|
```
|
||||||
|
|
||||||
|
3. View logs:
|
||||||
|
```bash
|
||||||
|
docker-compose logs -f
|
||||||
|
```
|
||||||
|
|
||||||
|
4. Stop the server:
|
||||||
|
```bash
|
||||||
|
docker-compose down
|
||||||
|
```
|
||||||
|
|
||||||
|
The server will be accessible at `http://localhost:8000/mcp` from your host machine.
|
||||||
|
|
||||||
|
### Server Tools
|
||||||
|
|
||||||
|
The server provides the following tools:
|
||||||
|
|
||||||
|
- **`listFiles(path: str)`** - List all files in the given path
|
||||||
|
- **`readFile(path: str)`** - Read the contents of a file
|
||||||
|
- **`writeFile(path: str, content: str)`** - Write contents to a file
|
||||||
|
- **`executeCommand(command: str)`** - Execute a shell command and return stdout, stderr, and return code
|
||||||
|
- **`writePythonFile(path: str, content: str)`** - Write a Python file handling streaming and escape characters correctly (handles code blocks and unicode escapes)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Cariddi Client
|
||||||
|
|
||||||
|
A Python MCP client that uses Ollama models for inference. The client is specialized as a **Crypto Solver Agent** for CTF (Capture The Flag) challenges, capable of identifying, analyzing, and solving cryptographic challenges.
|
||||||
|
|
||||||
|
### Client Requirements
|
||||||
|
|
||||||
|
- Python 3.7+
|
||||||
|
- Ollama installed and running (see https://ollama.ai/)
|
||||||
|
|
||||||
|
### Client Installation
|
||||||
|
|
||||||
|
1. Navigate to the client directory:
|
||||||
|
```bash
|
||||||
|
cd CariddiClient
|
||||||
|
```
|
||||||
|
|
||||||
|
2. Install Python dependencies:
|
||||||
|
```bash
|
||||||
|
pip install -r requirements.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
3. Make sure Ollama is running:
|
||||||
|
```bash
|
||||||
|
ollama serve
|
||||||
|
```
|
||||||
|
|
||||||
|
4. Pull a model (if you haven't already):
|
||||||
|
```bash
|
||||||
|
ollama pull ministral-3
|
||||||
|
# or
|
||||||
|
ollama pull llama3.2
|
||||||
|
```
|
||||||
|
|
||||||
|
### Client Usage
|
||||||
|
|
||||||
|
#### List available models
|
||||||
|
```bash
|
||||||
|
python mcpClient.py --list-models
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Send a single prompt
|
||||||
|
```bash
|
||||||
|
python mcpClient.py --prompt "What is the capital of France?"
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Interactive mode
|
||||||
|
```bash
|
||||||
|
python mcpClient.py --interactive
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Custom Ollama URL and model
|
||||||
|
```bash
|
||||||
|
python mcpClient.py --base-url http://localhost:11434 --model ministral-3 --prompt "Hello!"
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Connect to MCP server (streamable HTTP)
|
||||||
|
```bash
|
||||||
|
# Connect to MCP server via streamable HTTP
|
||||||
|
python mcpClient.py --mcp-server "http://localhost:8000/mcp" --prompt "Use tools to help me"
|
||||||
|
|
||||||
|
# With authentication headers
|
||||||
|
python mcpClient.py --mcp-server "http://localhost:8000/mcp" --mcp-headers '{"Authorization": "Bearer token"}' --interactive
|
||||||
|
```
|
||||||
|
|
||||||
|
### Client Examples
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Simple question
|
||||||
|
python mcpClient.py --prompt "Explain quantum computing in simple terms"
|
||||||
|
|
||||||
|
# Interactive chat
|
||||||
|
python mcpClient.py -i
|
||||||
|
|
||||||
|
# Use a different model
|
||||||
|
python mcpClient.py --model mistral --prompt "Write a haiku about coding"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Client Features
|
||||||
|
|
||||||
|
- Connects to local or remote Ollama instances
|
||||||
|
- Supports chat and generation modes
|
||||||
|
- **Connect to MCP servers** and use their tools automatically
|
||||||
|
- Tool registration for extensibility
|
||||||
|
- Interactive and non-interactive modes
|
||||||
|
- Health checking for Ollama server
|
||||||
|
- Automatic tool calling from MCP server tools
|
||||||
|
- **Specialized Crypto Solver Agent** with built-in knowledge for CTF challenges
|
||||||
|
|
||||||
|
### Crypto Solver Agent
|
||||||
|
|
||||||
|
The client is configured as a specialized Crypto Solver Agent that:
|
||||||
|
|
||||||
|
1. **Exploration**: Lists files in `/tmp` directory to identify relevant challenge files
|
||||||
|
2. **Analysis**: Identifies cryptographic schemes (RSA, AES, DES, XOR, etc.) and vulnerabilities
|
||||||
|
3. **Execution**: Writes and executes Python scripts to solve challenges
|
||||||
|
4. **Validation**: Searches for flags in the format `flag{...}`
|
||||||
|
|
||||||
|
The agent can handle:
|
||||||
|
- **RSA**: Small modulus factorization, low public exponent attacks, Wiener attack, Hastad attack, common modulus attacks
|
||||||
|
- **Symmetric Encryption**: AES/DES with various modes (ECB, CBC), IV vulnerabilities, key reuse
|
||||||
|
- **Classical Ciphers**: Frequency analysis, fixed-key attacks
|
||||||
|
- **Encoding**: Base64, Hex, Big-Endian/Little-Endian conversions
|
||||||
|
|
||||||
|
### Connecting to an MCP Server
|
||||||
|
|
||||||
|
The client uses **FastMCP** to connect to an existing MCP server via **streamable HTTP**. Once connected, the client:
|
||||||
|
|
||||||
|
1. Automatically loads available tools from the MCP server
|
||||||
|
2. Passes them to Ollama as usable tools
|
||||||
|
3. Executes tools when requested by the model
|
||||||
|
4. Returns results to the model to continue the conversation
|
||||||
|
|
||||||
|
#### Example with MCP Server
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Connect to an MCP server via streamable HTTP
|
||||||
|
python mcpClient.py --mcp-server "http://localhost:8000/mcp" --interactive
|
||||||
|
|
||||||
|
# With authentication headers
|
||||||
|
python mcpClient.py --mcp-server "http://localhost:8000/mcp" --mcp-headers '{"Authorization": "Bearer your-token"}' --prompt "Use your tools"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Default Configuration
|
||||||
|
|
||||||
|
- **Default Ollama URL**: `http://localhost:11434`
|
||||||
|
- **Default Model**: `ministral-3`
|
||||||
|
- **Default MCP Server**: `http://localhost:8000/mcp`
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Complete Workflow Example
|
||||||
|
|
||||||
|
### 1. Start the MCP Server
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd Cariddi
|
||||||
|
python main.py
|
||||||
|
```
|
||||||
|
|
||||||
|
The server will start on `http://localhost:8000/mcp`.
|
||||||
|
|
||||||
|
### 2. Run the Client and Connect to the Server
|
||||||
|
|
||||||
|
In another terminal:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd CariddiClient
|
||||||
|
python mcpClient.py --mcp-server "http://localhost:8000/mcp" --interactive
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3. Use the Crypto Solver Agent
|
||||||
|
|
||||||
|
The client will automatically discover and use the server's tools (like `listFiles`, `readFile`, `writeFile`, `executeCommand`, `writePythonFile`) through Ollama. You can ask it to solve CTF challenges:
|
||||||
|
|
||||||
|
```
|
||||||
|
You: Analyze the files in /tmp and solve the crypto challenge
|
||||||
|
```
|
||||||
|
|
||||||
|
The agent will:
|
||||||
|
- List files in `/tmp`
|
||||||
|
- Read relevant files
|
||||||
|
- Analyze the cryptographic scheme
|
||||||
|
- Write and execute Python scripts to solve the challenge
|
||||||
|
- Return the flag
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## CTF Challenges
|
||||||
|
|
||||||
|
The `challs/` directory contains CTF challenges for testing the Crypto Solver Agent:
|
||||||
|
|
||||||
|
- **cryptoEasy**: A Diffie-Hellman based challenge with AES encryption
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Development
|
||||||
|
|
||||||
|
### Server Development
|
||||||
|
|
||||||
|
The server is built using FastMCP and provides filesystem operations. To add new tools:
|
||||||
|
|
||||||
|
1. Implement the tool function in `modules/filesystem.py`
|
||||||
|
2. Register it as an MCP tool in `main.py` using `@mcpServer.tool()`
|
||||||
|
|
||||||
|
### Client Development
|
||||||
|
|
||||||
|
The client uses FastMCP for server communication and Ollama for inference. To modify the agent's behavior:
|
||||||
|
|
||||||
|
1. Edit the system prompt in `mcpClient.py` (line 248)
|
||||||
|
2. Add custom tools using `registerTool()` method
|
||||||
|
3. Modify the tool execution logic in `_executeTool()` method
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## License
|
||||||
|
|
||||||
|
[Add your license information here]
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Contributing
|
||||||
|
|
||||||
|
[Add contributing guidelines here]
|
||||||
|
|||||||
63
challs/cryptoEasy/challenge.py
Normal file
63
challs/cryptoEasy/challenge.py
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from Crypto.Cipher import AES
|
||||||
|
from Crypto.Util.Padding import pad, unpad
|
||||||
|
import os
|
||||||
|
|
||||||
|
def factorial(x):
|
||||||
|
prod = 1
|
||||||
|
for i in range (1,x+1):
|
||||||
|
prod = prod * i
|
||||||
|
return prod
|
||||||
|
|
||||||
|
a = 3
|
||||||
|
b = 8
|
||||||
|
p = 159043501668831001976189741401919059600158436023339250375247150721773143712698491956718970846959154624950002991005143073475212844582380943612898306056733646147380223572684106846684017427300415826606628398091756029258247836173822579694289151452726958472153473864316673552015163436466970719494284188245853583109
|
||||||
|
g = factorial(p-1)
|
||||||
|
|
||||||
|
flag = "flag{...}"
|
||||||
|
|
||||||
|
def getDHkey():
|
||||||
|
A = pow(g,a,p)
|
||||||
|
B = pow(g,b,p)
|
||||||
|
K = pow(B,a,p)
|
||||||
|
|
||||||
|
return K
|
||||||
|
|
||||||
|
def handle():
|
||||||
|
keyExchanged = str(getDHkey())
|
||||||
|
encryptedFlag = encrypt(flag.encode("utf-8"),keyExchanged)
|
||||||
|
print("Il messaggio crittografato è: {0}".format(encryptedFlag.hex()))
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
def fakePadding(k):
|
||||||
|
if (len(k) > 16):
|
||||||
|
raise ValueError('La tua chiave è più lunga di 16 byte')
|
||||||
|
else:
|
||||||
|
if len(k) == 16:
|
||||||
|
return k
|
||||||
|
else:
|
||||||
|
missingBytes = 16 - len(k)
|
||||||
|
for i in range(missingBytes):
|
||||||
|
k = ''.join([k,"0"])
|
||||||
|
return k
|
||||||
|
|
||||||
|
def encrypt(f,k):
|
||||||
|
key = bytes(fakePadding(k),"utf-8")
|
||||||
|
|
||||||
|
cipher = AES.new(key, AES.MODE_ECB)
|
||||||
|
encryptedFlag = cipher.encrypt(pad(f, AES.block_size))
|
||||||
|
return encryptedFlag
|
||||||
|
|
||||||
|
def decrypt(f, k):
|
||||||
|
|
||||||
|
key = fakePadding(str(k))
|
||||||
|
|
||||||
|
chiave = bytes(key, "utf-8")
|
||||||
|
cipher = AES.new(chiave, AES.MODE_ECB)
|
||||||
|
decryptedFlag = cipher.decrypt(f)
|
||||||
|
return decryptedFlag
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
handle()
|
||||||
3
challs/cryptoEasy/cryptoeasy.txt
Normal file
3
challs/cryptoEasy/cryptoeasy.txt
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
Diffie Hellman è così costoso computazionalmente se si usano valori particolari, come venirne fuori?
|
||||||
|
|
||||||
|
Ciphertext: b5609cfbad99f1b20ec3a93b97f379d8426f934ffcb77d83ea9161fefa78d243
|
||||||
510
mcpClient/mcpClient.py
Normal file
510
mcpClient/mcpClient.py
Normal file
@@ -0,0 +1,510 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Simple MCP client that uses Ollama models for inference.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from fastmcp.client.transports import NodeStdioTransport, PythonStdioTransport, SSETransport, StreamableHttpTransport
|
||||||
|
|
||||||
|
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import asyncio
|
||||||
|
from typing import Optional, Dict, Any, List
|
||||||
|
import requests
|
||||||
|
from fastmcp import Client as FastMcpClient
|
||||||
|
|
||||||
|
|
||||||
|
class OllamaClient:
|
||||||
|
"""Client for interacting with Ollama API."""
|
||||||
|
|
||||||
|
def __init__(self, baseUrl: str = "http://localhost:11434", model: str = "gpt-oss:20b"):
|
||||||
|
self.baseUrl = baseUrl
|
||||||
|
self.model = model
|
||||||
|
|
||||||
|
def listModels(self) -> List[str]:
|
||||||
|
"""List available Ollama models."""
|
||||||
|
try:
|
||||||
|
response = requests.get(f"{self.baseUrl}/api/tags", timeout=10)
|
||||||
|
response.raise_for_status()
|
||||||
|
data = response.json()
|
||||||
|
return [model["name"] for model in data.get("models", [])]
|
||||||
|
except requests.RequestException as e:
|
||||||
|
print(f"Error listing models: {e}", file=sys.stderr)
|
||||||
|
return []
|
||||||
|
|
||||||
|
def chat(self, messages: List[Dict[str, str]], options: Optional[Dict[str, Any]] = None) -> str:
|
||||||
|
"""Send chat messages to Ollama and get response."""
|
||||||
|
payload = {
|
||||||
|
"model": self.model,
|
||||||
|
"messages": messages,
|
||||||
|
"stream": False,
|
||||||
|
}
|
||||||
|
|
||||||
|
if options:
|
||||||
|
payload["options"] = options
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = requests.post(
|
||||||
|
f"{self.baseUrl}/api/chat",
|
||||||
|
json=payload,
|
||||||
|
timeout=60*60
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
data = response.json()
|
||||||
|
return data.get("message", {}).get("content", "")
|
||||||
|
except requests.RequestException as e:
|
||||||
|
print(f"Error in chat request: {e}", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
def generate(self, prompt: str, options: Optional[Dict[str, Any]] = None) -> str:
|
||||||
|
"""Generate text from a prompt using Ollama."""
|
||||||
|
payload = {
|
||||||
|
"model": self.model,
|
||||||
|
"prompt": prompt,
|
||||||
|
"stream": False,
|
||||||
|
}
|
||||||
|
|
||||||
|
if options:
|
||||||
|
payload["options"] = options
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = requests.post(
|
||||||
|
f"{self.baseUrl}/api/generate",
|
||||||
|
json=payload,
|
||||||
|
timeout=120
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
data = response.json()
|
||||||
|
return data.get("response", "")
|
||||||
|
except requests.RequestException as e:
|
||||||
|
print(f"Error in generate request: {e}", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
def checkHealth(self) -> bool:
|
||||||
|
"""Check if Ollama server is accessible."""
|
||||||
|
try:
|
||||||
|
response = requests.get(f"{self.baseUrl}/api/tags", timeout=5)
|
||||||
|
return response.status_code == 200
|
||||||
|
except requests.RequestException:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class McpServerWrapper:
|
||||||
|
"""Wrapper around FastMCP Client for easier use."""
|
||||||
|
|
||||||
|
def __init__(self, httpUrl: str, headers: Optional[Dict[str, str]] = None):
|
||||||
|
self.httpUrl = httpUrl.rstrip("/")
|
||||||
|
self.headers = headers or {}
|
||||||
|
self.client: Optional[FastMcpClient] = None
|
||||||
|
self.serverTools: List[Dict[str, Any]] = []
|
||||||
|
|
||||||
|
async def connect(self) -> bool:
|
||||||
|
"""Connect and initialize with MCP server via HTTP."""
|
||||||
|
try:
|
||||||
|
# FastMcpClient doesn't support headers parameter directly
|
||||||
|
# Headers would need to be passed via custom transport or auth
|
||||||
|
# For now, we initialize without headers
|
||||||
|
self.client = FastMcpClient(self.httpUrl)
|
||||||
|
await self.client.__aenter__()
|
||||||
|
# Load tools after connection
|
||||||
|
tools = await self.listServerTools()
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error connecting to MCP server: {e}", file=sys.stderr)
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def disconnect(self):
|
||||||
|
"""Disconnect from MCP server."""
|
||||||
|
if self.client:
|
||||||
|
await self.client.__aexit__(None, None, None)
|
||||||
|
self.client = None
|
||||||
|
|
||||||
|
async def listServerTools(self) -> List[Dict[str, Any]]:
|
||||||
|
"""List tools available from MCP server."""
|
||||||
|
if not self.client:
|
||||||
|
return []
|
||||||
|
|
||||||
|
try:
|
||||||
|
tools = await self.client.list_tools()
|
||||||
|
self.serverTools = tools
|
||||||
|
return tools
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error listing tools: {e}", file=sys.stderr)
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def callServerTool(self, name: str, arguments: Dict[str, Any]) -> Any:
|
||||||
|
"""Call a tool on the MCP server."""
|
||||||
|
if not self.client:
|
||||||
|
raise RuntimeError("Not connected to MCP server")
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = await self.client.call_tool(name, arguments)
|
||||||
|
# FastMCP call_tool returns a result object with .content
|
||||||
|
if hasattr(result, 'content'):
|
||||||
|
content = result.content
|
||||||
|
# If content is a list, return it as is (will be serialized later)
|
||||||
|
if isinstance(content, list):
|
||||||
|
return content
|
||||||
|
return content
|
||||||
|
elif isinstance(result, list):
|
||||||
|
# Handle list of results
|
||||||
|
if len(result) > 0:
|
||||||
|
# Extract content from each item if it exists
|
||||||
|
contents = []
|
||||||
|
for item in result:
|
||||||
|
if hasattr(item, 'content'):
|
||||||
|
contents.append(item.content)
|
||||||
|
else:
|
||||||
|
contents.append(item)
|
||||||
|
return contents if len(contents) > 1 else contents[0] if contents else None
|
||||||
|
return result
|
||||||
|
return result
|
||||||
|
except Exception as e:
|
||||||
|
raise RuntimeError(f"Tool call failed: {str(e)}")
|
||||||
|
|
||||||
|
async def listServerResources(self) -> List[Dict[str, Any]]:
|
||||||
|
"""List resources available from MCP server."""
|
||||||
|
if not self.client:
|
||||||
|
return []
|
||||||
|
|
||||||
|
try:
|
||||||
|
resources = await self.client.list_resources()
|
||||||
|
return resources
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error listing resources: {e}", file=sys.stderr)
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
class OllamaMcpClient:
|
||||||
|
"""Simple MCP client that uses Ollama for inference."""
|
||||||
|
|
||||||
|
def __init__(self, ollamaClient: OllamaClient, mcpServer: Optional[McpServerWrapper] = None):
|
||||||
|
self.ollamaClient = ollamaClient
|
||||||
|
self.mcpServer = mcpServer
|
||||||
|
self.tools: List[Dict[str, Any]] = []
|
||||||
|
self.resources: List[Dict[str, Any]] = []
|
||||||
|
|
||||||
|
def _serializeToolResult(self, result: Any) -> Any:
|
||||||
|
"""Serialize tool result to JSON-serializable format."""
|
||||||
|
# Handle TextContent and other content objects
|
||||||
|
if hasattr(result, 'text'):
|
||||||
|
return result.text
|
||||||
|
if hasattr(result, 'content'):
|
||||||
|
content = result.content
|
||||||
|
if hasattr(content, 'text'):
|
||||||
|
return content.text
|
||||||
|
return content
|
||||||
|
# Handle lists of content objects
|
||||||
|
if isinstance(result, list):
|
||||||
|
return [self._serializeToolResult(item) for item in result]
|
||||||
|
# Handle dicts
|
||||||
|
if isinstance(result, dict):
|
||||||
|
return {k: self._serializeToolResult(v) for k, v in result.items()}
|
||||||
|
# Already serializable (str, int, float, bool, None)
|
||||||
|
return result
|
||||||
|
|
||||||
|
async def _loadServerTools(self):
|
||||||
|
"""Load tools from connected MCP server."""
|
||||||
|
if self.mcpServer:
|
||||||
|
serverTools = await self.mcpServer.listServerTools()
|
||||||
|
for tool in serverTools:
|
||||||
|
# Handle both Pydantic Tool objects and dicts
|
||||||
|
if hasattr(tool, "name"):
|
||||||
|
# Pydantic Tool object - access attributes directly
|
||||||
|
name = getattr(tool, "name", "")
|
||||||
|
description = getattr(tool, "description", "")
|
||||||
|
# Try both camelCase and snake_case for inputSchema
|
||||||
|
inputSchema = getattr(tool, "inputSchema", getattr(tool, "input_schema", {}))
|
||||||
|
else:
|
||||||
|
# Dict - use .get()
|
||||||
|
name = tool.get("name", "")
|
||||||
|
description = tool.get("description", "")
|
||||||
|
inputSchema = tool.get("inputSchema", tool.get("input_schema", {}))
|
||||||
|
|
||||||
|
self.tools.append({
|
||||||
|
"name": name,
|
||||||
|
"description": description,
|
||||||
|
"inputSchema": inputSchema
|
||||||
|
})
|
||||||
|
|
||||||
|
def registerTool(self, name: str, description: str, parameters: Dict[str, Any]):
|
||||||
|
"""Register a tool that can be used by the model."""
|
||||||
|
self.tools.append({
|
||||||
|
"name": name,
|
||||||
|
"description": description,
|
||||||
|
"inputSchema": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": parameters,
|
||||||
|
"required": list(parameters.keys())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
async def processRequest(self, prompt: str, context: Optional[List[str]] = None, maxIterations: int = 5) -> str:
|
||||||
|
"""Process a request using Ollama with optional context and tool support."""
|
||||||
|
messages = [
|
||||||
|
{
|
||||||
|
"role": "system",
|
||||||
|
"content": """Sei un Crypto Solver Agent specializzato in sfide CTF (Capture The Flag). Il tuo obiettivo primario è identificare, analizzare e risolvere sfide crittografiche memorizzate nella directory /tmp per recuperare la flag. REGOLE OPERATIVE: Esplorazione: Inizia sempre elencando i file presenti in /tmp. Identifica i file rilevanti come sorgenti Python (.py), output di testo (.txt), file cifrati o chiavi pubbliche/private (.pem, .pub). Analisi: Leggi i file trovati. Determina il tipo di crittografia coinvolta. Casi comuni: RSA: analizza parametri come n, e, c. Verifica se n è piccolo (fattorizzabile), se e è basso (attacco radice e-esima) o se ci sono vulnerabilità note (Wiener, Hastad, moduli comuni). Simmetrica (AES/DES): cerca la modalità (ECB, CBC), vulnerabilità nel IV, o riutilizzo della chiave. XOR/Cifrari Classici: esegui analisi delle frequenze o attacchi a chiave fissa. Encoding: gestisci correttamente Base64, Hex, Big-Endian/Little-Endian. Esecuzione: Scrivi ed esegui script Python per risolvere la sfida. Utilizza librerie come pycryptodome, gmpy2 o sympy se disponibili nell'ambiente. Non limitarti a spiegare la teoria: scrivi il codice necessario a produrre il plaintext. Validazione: Una volta decifrato il contenuto, cerca stringhe nel formato flag{...}. Se il risultato non è leggibile, rivaluta l'approccio e prova una strategia alternativa. REQUISITI DI OUTPUT: Fornisci una breve spiegazione della vulnerabilità trovata. Mostra il codice Python risolutivo che hai generato. Restituisci la flag finale in modo chiaramente visibile. LIMITI: Opera esclusivamente all'interno della directory /tmp. Non tentare di forzare la password di sistema; concentrati sulla logica crittografica. Se mancano dati (es. un file citato nel codice non è presente), chiedi esplicitamente o cercalo nelle sottocartelle di /tmp. Inizia ora analizzando il contenuto di /tmp."""
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
if context:
|
||||||
|
messages.append({
|
||||||
|
"role": "system",
|
||||||
|
"content": f"Context:\n{'\n\n'.join(context)}"
|
||||||
|
})
|
||||||
|
|
||||||
|
if self.tools:
|
||||||
|
toolDescriptions = json.dumps(self.tools, indent=2)
|
||||||
|
messages.append({
|
||||||
|
"role": "system",
|
||||||
|
"content": f"Available tools:\n{toolDescriptions}\n\nTo use a tool, respond with JSON: {{\"tool_name\": \"name\", \"tool_args\": {{...}}}}"
|
||||||
|
})
|
||||||
|
|
||||||
|
messages.append({
|
||||||
|
"role": "user",
|
||||||
|
"content": prompt
|
||||||
|
})
|
||||||
|
|
||||||
|
iteration = 0
|
||||||
|
while iteration < maxIterations:
|
||||||
|
response = self.ollamaClient.chat(messages)
|
||||||
|
|
||||||
|
# Check if response contains tool call
|
||||||
|
toolCall = self._parseToolCall(response)
|
||||||
|
if toolCall:
|
||||||
|
toolName = toolCall.get("tool_name")
|
||||||
|
toolArgs = toolCall.get("tool_args", {})
|
||||||
|
|
||||||
|
# Print agent intent (response before tool call)
|
||||||
|
print(f"\n[Agent Intent]: {response}", file=sys.stderr)
|
||||||
|
print(f"[Tool Call Detected]: {toolName} with arguments: {toolArgs}", file=sys.stderr)
|
||||||
|
|
||||||
|
# Try to call the tool
|
||||||
|
try:
|
||||||
|
print(f"[Executing Tool]: {toolName} with arguments: {toolArgs}", file=sys.stderr)
|
||||||
|
toolResult = await self._executeTool(toolName, toolArgs)
|
||||||
|
# Serialize tool result to JSON-serializable format
|
||||||
|
serializedResult = self._serializeToolResult(toolResult)
|
||||||
|
print(f"[Tool Output]: {json.dumps(serializedResult, indent=2)}", file=sys.stderr)
|
||||||
|
messages.append({
|
||||||
|
"role": "assistant",
|
||||||
|
"content": response
|
||||||
|
})
|
||||||
|
messages.append({
|
||||||
|
"role": "user",
|
||||||
|
"content": f"Tool result: {json.dumps(serializedResult)}"
|
||||||
|
})
|
||||||
|
iteration += 1
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[Tool Error]: {str(e)}", file=sys.stderr)
|
||||||
|
messages.append({
|
||||||
|
"role": "assistant",
|
||||||
|
"content": response
|
||||||
|
})
|
||||||
|
messages.append({
|
||||||
|
"role": "user",
|
||||||
|
"content": f"Tool error: {str(e)}"
|
||||||
|
})
|
||||||
|
iteration += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# No tool call, return response
|
||||||
|
print(f"\n[Agent Response (Final)]: {response}", file=sys.stderr)
|
||||||
|
return response
|
||||||
|
|
||||||
|
return messages[-1].get("content", "Max iterations reached")
|
||||||
|
|
||||||
|
def _parseToolCall(self, response: str) -> Optional[Dict[str, Any]]:
|
||||||
|
"""Try to parse tool call from response."""
|
||||||
|
# Try to find JSON object in response
|
||||||
|
try:
|
||||||
|
# Look for JSON in response
|
||||||
|
startIdx = response.find("{")
|
||||||
|
endIdx = response.rfind("}") + 1
|
||||||
|
if startIdx >= 0 and endIdx > startIdx:
|
||||||
|
jsonStr = response[startIdx:endIdx]
|
||||||
|
parsed = json.loads(jsonStr)
|
||||||
|
if "tool_name" in parsed:
|
||||||
|
return parsed
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def _executeTool(self, toolName: str, toolArgs: Dict[str, Any]) -> Any:
|
||||||
|
"""Execute a tool - either from server or local."""
|
||||||
|
# First check if it's a server tool
|
||||||
|
if self.mcpServer:
|
||||||
|
# Check if tool exists in server tools
|
||||||
|
for tool in self.mcpServer.serverTools:
|
||||||
|
# Handle both Pydantic Tool objects and dicts
|
||||||
|
tool_name = getattr(tool, "name", None) if hasattr(tool, "name") else tool.get("name") if isinstance(tool, dict) else None
|
||||||
|
if tool_name == toolName:
|
||||||
|
return await self.mcpServer.callServerTool(toolName, toolArgs)
|
||||||
|
|
||||||
|
# Check local tools
|
||||||
|
if toolName == "get_time":
|
||||||
|
from datetime import datetime
|
||||||
|
return datetime.now().isoformat()
|
||||||
|
elif toolName == "count_words":
|
||||||
|
text = toolArgs.get("text", "")
|
||||||
|
return len(text.split())
|
||||||
|
|
||||||
|
raise ValueError(f"Tool '{toolName}' not found")
|
||||||
|
|
||||||
|
def listTools(self) -> List[Dict[str, Any]]:
|
||||||
|
"""List all registered tools."""
|
||||||
|
return self.tools
|
||||||
|
|
||||||
|
def listResources(self) -> List[Dict[str, Any]]:
|
||||||
|
"""List all available resources."""
|
||||||
|
return self.resources
|
||||||
|
|
||||||
|
|
||||||
|
async def async_main(args, ollamaClient: OllamaClient):
|
||||||
|
"""Async main function."""
|
||||||
|
# Connect to MCP server if specified
|
||||||
|
mcpServerWrapper = None
|
||||||
|
if args.mcp_server:
|
||||||
|
headers = {}
|
||||||
|
if args.mcp_headers:
|
||||||
|
try:
|
||||||
|
headers = json.loads(args.mcp_headers)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
print("Warning: Invalid JSON in --mcp-headers, ignoring", file=sys.stderr)
|
||||||
|
|
||||||
|
mcpServerWrapper = McpServerWrapper(httpUrl=args.mcp_server, headers=headers)
|
||||||
|
if not await mcpServerWrapper.connect():
|
||||||
|
print("Error: Failed to connect to MCP server", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
print("Connected to MCP server via streamable HTTP", file=sys.stderr)
|
||||||
|
|
||||||
|
# Initialize MCP client
|
||||||
|
mcpClient = OllamaMcpClient(ollamaClient, mcpServerWrapper)
|
||||||
|
|
||||||
|
# Load server tools
|
||||||
|
if mcpServerWrapper:
|
||||||
|
await mcpClient._loadServerTools()
|
||||||
|
serverTools = await mcpServerWrapper.listServerTools()
|
||||||
|
if serverTools:
|
||||||
|
# Handle both Pydantic Tool objects and dicts
|
||||||
|
tool_names = [
|
||||||
|
getattr(t, "name", "") if hasattr(t, "name") else t.get("name", "") if isinstance(t, dict) else ""
|
||||||
|
for t in serverTools
|
||||||
|
]
|
||||||
|
print(f"Available MCP server tools: {tool_names}", file=sys.stderr)
|
||||||
|
|
||||||
|
# Register some example tools
|
||||||
|
mcpClient.registerTool(
|
||||||
|
name="get_time",
|
||||||
|
description="Get the current time",
|
||||||
|
parameters={}
|
||||||
|
)
|
||||||
|
mcpClient.registerTool(
|
||||||
|
name="count_words",
|
||||||
|
description="Count words in a text",
|
||||||
|
parameters={
|
||||||
|
"text": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "The text to count words in"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Process prompt or run interactively
|
||||||
|
if args.prompt:
|
||||||
|
response = await mcpClient.processRequest(args.prompt)
|
||||||
|
print(response)
|
||||||
|
elif args.interactive:
|
||||||
|
print("MCP Client with Ollama - Interactive Mode")
|
||||||
|
print("Type 'quit' or 'exit' to exit\n")
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
prompt = input("You: ").strip()
|
||||||
|
if prompt.lower() in ["quit", "exit"]:
|
||||||
|
break
|
||||||
|
if not prompt:
|
||||||
|
continue
|
||||||
|
response = await mcpClient.processRequest(prompt)
|
||||||
|
print(f"Assistant: {response}\n")
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("\nGoodbye!")
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error: {e}", file=sys.stderr)
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
if mcpServerWrapper:
|
||||||
|
await mcpServerWrapper.disconnect()
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Main function to run the MCP client."""
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description="MCP client using Ollama")
|
||||||
|
parser.add_argument(
|
||||||
|
"--base-url",
|
||||||
|
default="http://localhost:11434",
|
||||||
|
help="Ollama base URL (default: http://localhost:11434)"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--model",
|
||||||
|
default="ministral-3",
|
||||||
|
help="Ollama model to use (default: ministral-3)"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--list-models",
|
||||||
|
action="store_true",
|
||||||
|
help="List available Ollama models and exit"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--prompt",
|
||||||
|
help="Prompt to send to the model"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--interactive",
|
||||||
|
"-i",
|
||||||
|
action="store_true",
|
||||||
|
help="Run in interactive mode"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--mcp-server",
|
||||||
|
help="HTTP URL for MCP server (e.g., 'http://localhost:8000/mcp')",
|
||||||
|
default="http://localhost:8000/mcp"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--mcp-headers",
|
||||||
|
help="Additional headers for MCP server as JSON string (e.g., '{\"Authorization\": \"Bearer token\"}')"
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
# Initialize Ollama client
|
||||||
|
ollamaClient = OllamaClient(baseUrl=args.base_url, model=args.model)
|
||||||
|
|
||||||
|
# Check health
|
||||||
|
if not ollamaClient.checkHealth():
|
||||||
|
print(f"Error: Cannot connect to Ollama at {args.base_url}", file=sys.stderr)
|
||||||
|
print("Make sure Ollama is running and accessible.", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# List models if requested
|
||||||
|
if args.list_models:
|
||||||
|
models = ollamaClient.listModels()
|
||||||
|
print("Available models:")
|
||||||
|
for model in models:
|
||||||
|
print(f" - {model}")
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
# Run async main
|
||||||
|
asyncio.run(async_main(args, ollamaClient))
|
||||||
|
|
||||||
|
if not args.prompt and not args.interactive:
|
||||||
|
parser.print_help()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
2
mcpClient/requirements.txt
Normal file
2
mcpClient/requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
requests>=2.31.0
|
||||||
|
fastmcp>=0.9.0
|
||||||
39
mcpServer/.dockerignore
Normal file
39
mcpServer/.dockerignore
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
# Python
|
||||||
|
__pycache__/
|
||||||
|
*.py[cod]
|
||||||
|
*$py.class
|
||||||
|
*.so
|
||||||
|
.Python
|
||||||
|
*.egg-info/
|
||||||
|
dist/
|
||||||
|
build/
|
||||||
|
wheels/
|
||||||
|
|
||||||
|
# Virtual environments
|
||||||
|
venv/
|
||||||
|
.venv/
|
||||||
|
env/
|
||||||
|
ENV/
|
||||||
|
|
||||||
|
# Git
|
||||||
|
.git/
|
||||||
|
.gitignore
|
||||||
|
|
||||||
|
# IDE
|
||||||
|
.vscode/
|
||||||
|
.idea/
|
||||||
|
*.swp
|
||||||
|
*.swo
|
||||||
|
*~
|
||||||
|
|
||||||
|
# OS
|
||||||
|
.DS_Store
|
||||||
|
Thumbs.db
|
||||||
|
|
||||||
|
# Docker
|
||||||
|
Dockerfile
|
||||||
|
.dockerignore
|
||||||
|
docker-compose.yml
|
||||||
|
|
||||||
|
# Documentation
|
||||||
|
README.md
|
||||||
27
mcpServer/Dockerfile
Normal file
27
mcpServer/Dockerfile
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
# Use Python 3.12 slim image
|
||||||
|
FROM python:3.12-slim
|
||||||
|
|
||||||
|
# Set working directory
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Set environment variables
|
||||||
|
ENV PYTHONUNBUFFERED=1 \
|
||||||
|
PYTHONDONTWRITEBYTECODE=1 \
|
||||||
|
FASTMCP_HOST=0.0.0.0 \
|
||||||
|
FASTMCP_PORT=8000
|
||||||
|
|
||||||
|
# Copy requirements first for better caching
|
||||||
|
COPY requirements.txt .
|
||||||
|
|
||||||
|
# Install dependencies
|
||||||
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
|
|
||||||
|
# Copy application code
|
||||||
|
COPY modules/ ./modules/
|
||||||
|
COPY main.py .
|
||||||
|
|
||||||
|
# Expose port 8000 for the MCP server
|
||||||
|
EXPOSE 8000
|
||||||
|
|
||||||
|
# Run the MCP server
|
||||||
|
CMD ["python", "main.py"]
|
||||||
11
mcpServer/docker-compose.yml
Normal file
11
mcpServer/docker-compose.yml
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
services:
|
||||||
|
mcp-server:
|
||||||
|
build: .
|
||||||
|
container_name: cariddi-mcp-server
|
||||||
|
ports:
|
||||||
|
- "8000:8000"
|
||||||
|
environment:
|
||||||
|
- PYTHONUNBUFFERED=1
|
||||||
|
- FASTMCP_HOST=0.0.0.0
|
||||||
|
- FASTMCP_PORT=8000
|
||||||
|
restart: unless-stopped
|
||||||
41
mcpServer/main.py
Normal file
41
mcpServer/main.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
import os
|
||||||
|
from mcp.server.fastmcp import FastMCP
|
||||||
|
from modules.filesystem import internal_listFiles, internal_readFile, internal_writeFile, internal_executeCommand, internal_writePythonFile
|
||||||
|
|
||||||
|
mcpServer = FastMCP(
|
||||||
|
"Cariddi",
|
||||||
|
host=os.getenv("FASTMCP_HOST", os.getenv("MCP_HOST", "0.0.0.0")),
|
||||||
|
port=int(os.getenv("FASTMCP_PORT", os.getenv("MCP_PORT", "8000"))),
|
||||||
|
)
|
||||||
|
|
||||||
|
@mcpServer.tool()
|
||||||
|
def listFiles(path: str) -> list[str]:
|
||||||
|
"""List all files in the given path"""
|
||||||
|
return internal_listFiles(path)
|
||||||
|
|
||||||
|
@mcpServer.tool()
|
||||||
|
def readFile(path: str) -> str:
|
||||||
|
"""Read the contents of a file"""
|
||||||
|
return internal_readFile(path)
|
||||||
|
|
||||||
|
@mcpServer.tool()
|
||||||
|
def writeFile(path: str, content: str) -> bool:
|
||||||
|
"""Write the contents of a file"""
|
||||||
|
return internal_writeFile(path, content)
|
||||||
|
|
||||||
|
@mcpServer.tool()
|
||||||
|
def executeCommand(command: str) -> dict:
|
||||||
|
"""Execute a command"""
|
||||||
|
return internal_executeCommand(command)
|
||||||
|
|
||||||
|
@mcpServer.tool()
|
||||||
|
def writePythonFile(path: str, content: str) -> str:
|
||||||
|
"""Write a Python file handling streaming and escape characters correctly."""
|
||||||
|
return internal_writePythonFile(path, content)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
mcpServer.run(transport="streamable-http")
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("Server stopped by user")
|
||||||
7
mcpServer/mcp.json
Normal file
7
mcpServer/mcp.json
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
{
|
||||||
|
"mcpServers": {
|
||||||
|
"cariddi": {
|
||||||
|
"url": "http://localhost:8000/mcp"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
61
mcpServer/modules/filesystem.py
Normal file
61
mcpServer/modules/filesystem.py
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
import os
|
||||||
|
from typing import List
|
||||||
|
import subprocess
|
||||||
|
def internal_listFiles(path: str) -> List[str]:
|
||||||
|
"""List all files in the given path"""
|
||||||
|
if os.path.exists(path) and os.path.isdir(path):
|
||||||
|
result = os.listdir(path)
|
||||||
|
return result
|
||||||
|
print(f"Path does not exist or is not a directory: {path}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
def internal_readFile(path: str) -> str:
|
||||||
|
"""Read the contents of a file"""
|
||||||
|
if os.path.exists(path) and os.path.isfile(path):
|
||||||
|
try:
|
||||||
|
with open(path, "r", encoding="utf-8") as f:
|
||||||
|
content = f.read()
|
||||||
|
return content
|
||||||
|
except Exception as e:
|
||||||
|
error_msg = f"Error reading file: {str(e)}"
|
||||||
|
return error_msg
|
||||||
|
return ""
|
||||||
|
|
||||||
|
def internal_writeFile(path: str, content: str) -> bool:
|
||||||
|
"""Write the contents of a file"""
|
||||||
|
try:
|
||||||
|
with open(path, "w", encoding="utf-8") as f:
|
||||||
|
f.write(content)
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error writing file: {str(e)}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def internal_executeCommand(command: str) -> str:
|
||||||
|
"""Execute a command"""
|
||||||
|
try:
|
||||||
|
result = subprocess.run(command, shell=True, capture_output=True, text=True)
|
||||||
|
output = {
|
||||||
|
"stderr": result.stderr,
|
||||||
|
"stdout": result.stdout,
|
||||||
|
"returncode": result.returncode
|
||||||
|
}
|
||||||
|
return output
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error executing command: {str(e)}")
|
||||||
|
return ""
|
||||||
|
|
||||||
|
def internal_writePythonFile(path: str, content: str) -> str:
|
||||||
|
"""Write a Python file handling streaming and escape characters correctly."""
|
||||||
|
content = content.encode('utf-8').decode('unicode_escape') if '\\n' in content else content
|
||||||
|
if "```python" in content:
|
||||||
|
content = content.split("```python")[1].split("```")[0].strip()
|
||||||
|
elif "```" in content:
|
||||||
|
content = content.split("```")[1].split("```")[0].strip()
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(path, "w", encoding="utf-8") as f:
|
||||||
|
f.write(content)
|
||||||
|
return f"File saved correctly in {path}"
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error: {str(e)}"
|
||||||
1
mcpServer/requirements.txt
Normal file
1
mcpServer/requirements.txt
Normal file
@@ -0,0 +1 @@
|
|||||||
|
mcp[cli]>=1.25.0
|
||||||
Reference in New Issue
Block a user