Compare commits

5 Commits

Author SHA1 Message Date
1909bc3954 Updated readme 2026-02-12 22:53:34 +01:00
19fae10e5f Added chall and some stuff 2026-02-12 22:52:30 +01:00
fe21c6b790 Modified client 2026-02-12 22:00:08 +01:00
58d09ebc35 Add heartbeat 2026-02-12 21:59:47 +01:00
Schrody
28169f48bc Changed readme 2026-02-12 17:51:42 +01:00
10 changed files with 951 additions and 516 deletions

README.md

@@ -1,353 +1,136 @@
# Cariddi MCP Client and Server
Complete MCP (Model Context Protocol) stack for CTF and reverse engineering workflows:
- **Cariddi Server**: A FastMCP server with filesystem tools for file operations and command execution
- **Cariddi Client**: A Python client that uses Ollama models for inference and connects to MCP servers, specialized as a Crypto Solver Agent for CTF challenges
## Project Structure
```
Cariddi/
├── mcpServer/ # MCP Server implementation
│ ├── main.py # FastMCP server entry point
│ ├── modules/
│ │ └── filesystem.py # Filesystem operation implementations
│ ├── requirements.txt
│ ├── Dockerfile
│ ├── docker-compose.yml
│ └── mcp.json # MCP server configuration
├── mcpClient/ # MCP Client with Ollama
│ ├── mcpClient.py # Main client implementation
│ └── requirements.txt
└── challs/ # CTF challenges
└── cryptoEasy/
├── challenge.py
└── cryptoeasy.txt
```
---
## Cariddi Server
FastMCP server exposing filesystem and execution tools, with correct handling of escape characters when writing Python files.
### Setup
1. Navigate to the server directory:
```bash
cd Cariddi
```
2. Create and activate virtual environment:
```bash
python3 -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
```
3. Install dependencies:
```bash
pip install -r requirements.txt
```
### Run
```bash
source venv/bin/activate
python main.py
```
The server listens on `http://0.0.0.0:8000/mcp` (streamable HTTP).
### Environment
- `FASTMCP_HOST` / `MCP_HOST`: host (default `0.0.0.0`)
- `FASTMCP_PORT` / `MCP_PORT`: port (default `8000`)
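A minimal sketch of how these variables might be resolved at startup; the exact lookup order in `main.py` is an assumption:

```python
import os

# Assumed precedence: FASTMCP_* overrides MCP_*, which overrides the built-in default.
host = os.environ.get("FASTMCP_HOST") or os.environ.get("MCP_HOST") or "0.0.0.0"
port = int(os.environ.get("FASTMCP_PORT") or os.environ.get("MCP_PORT") or "8000")
print(host, port)
```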
### MCP Inspector
The MCP Inspector is a visual tool for testing and debugging MCP servers. With the server running, connect the inspector to it:
```bash
npx @modelcontextprotocol/inspector --url http://localhost:8000/mcp
```
The inspector will open in your browser (typically at `http://localhost:5173`).
#### Alternative: Run Inspector with Server
You can also run the inspector and server together:
```bash
npx @modelcontextprotocol/inspector python main.py
```
#### Setup MCP Inspector
In the inspector, select the **Streamable HTTP** transport, set the URL to `http://localhost:8000/mcp`, and press Connect.
### Docker Deployment
#### Build and Run with Docker
1. Navigate to the server directory:
```bash
cd Cariddi
```
2. Build the Docker image:
```bash
docker build -t cariddi-mcp-server .
```
3. Run the container:
```bash
docker run -d -p 8000:8000 --name cariddi-mcp cariddi-mcp-server
```
#### Using Docker Compose
1. Navigate to the server directory:
```bash
cd Cariddi
```
2. Start the server:
```bash
docker-compose up -d
```
3. View logs:
```bash
docker-compose logs -f
```
4. Stop the server:
```bash
docker-compose down
```
The server will be accessible at `http://localhost:8000/mcp` from your host machine.
### Server Tools
The server provides the following tools:
- **`listFiles(path: str)`** - List all files in the given path
- **`readFile(path: str)`** - Read the contents of a file
- **`writeFile(path: str, content: str)`** - Write contents to a file
- **`executeCommand(command: str)`** - Execute a shell command and return stdout, stderr, and return code
- **`writePythonFile(path: str, content: str)`** - Write a Python file handling streaming and escape characters correctly (handles code blocks and unicode escapes)
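The `executeCommand` contract above can be sketched with `subprocess`; this is a hypothetical server-side implementation, not the repo's actual code:

```python
import subprocess

def executeCommand(command: str) -> dict:
    # Run the command in a shell and capture stdout, stderr, and the
    # return code, matching the tool description above.
    proc = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=60)
    return {"stdout": proc.stdout, "stderr": proc.stderr, "returncode": proc.returncode}

print(executeCommand("echo hello"))
```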
---
## Cariddi Client
Python MCP client that talks to Ollama for inference and connects to MCP servers. It is configured as a **Crypto Solver Agent** for CTF crypto challenges.
### Requirements
- Python 3.7+
- [Ollama](https://ollama.ai/) installed and running
### Install
1. Navigate to the client directory:
```bash
cd CariddiClient
```
2. Install Python dependencies:
```bash
pip install -r requirements.txt
```
3. Make sure Ollama is running:
```bash
ollama serve
ollama pull ministral-3  # or llama3.2
```
### Client Usage
#### List available models
```bash
python mcpClient.py --list-models
```
#### Send a single prompt
```bash
python mcpClient.py --prompt "What is the capital of France?"
```
#### Interactive mode
```bash
python mcpClient.py --interactive
```
#### Custom Ollama URL and model
```bash
python mcpClient.py --base-url http://localhost:11434 --model ministral-3 --prompt "Hello!"
```
#### Connect to MCP server (streamable HTTP)
```bash
# Connect to MCP server via streamable HTTP
python mcpClient.py --mcp-server "http://localhost:8000/mcp" --prompt "Use tools to help me"
python mcpClient.py --mcp-server "http://localhost:8000/mcp" --interactive
# With authentication headers
python mcpClient.py --mcp-server "http://localhost:8000/mcp" --mcp-headers '{"Authorization": "Bearer token"}' --interactive
```
### Client Examples
```bash
# Simple question
python mcpClient.py --prompt "Explain quantum computing in simple terms"

# Interactive chat
python mcpClient.py -i

# Use a different model
python mcpClient.py --model mistral --prompt "Write a haiku about coding"
```
### Client Features
- Connects to local or remote Ollama instances
- Supports chat and generation modes
- **Connect to MCP servers** and use their tools automatically
- Tool registration for extensibility
- Interactive and non-interactive modes
- Health checking for Ollama server
- Automatic tool calling from MCP server tools
- **Specialized Crypto Solver Agent** with built-in knowledge for CTF challenges
### Crypto Solver Agent
The client is configured as a specialized Crypto Solver Agent that:
1. **Exploration**: lists files in the `/tmp` directory to identify relevant challenge files
2. **Analysis**: identifies cryptographic schemes (RSA, AES, DES, XOR, etc.) and vulnerabilities
3. **Execution**: writes and executes Python scripts to solve challenges
4. **Validation**: searches for flags in the format `flag{...}`
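The validation step usually reduces to a regex scan over tool output; a minimal sketch:

```python
import re

def findFlags(text: str) -> list:
    # Match flags in the flag{...} format described above.
    return re.findall(r"flag\{[^}]*\}", text)

print(findFlags("noise flag{example} noise"))
```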
The agent can handle:
- **RSA**: Small modulus factorization, low public exponent attacks, Wiener attack, Hastad attack, common modulus attacks
- **Symmetric Encryption**: AES/DES with various modes (ECB, CBC), IV vulnerabilities, key reuse
- **Classical Ciphers**: Frequency analysis, fixed-key attacks
- **Encoding**: Base64, Hex, Big-Endian/Little-Endian conversions
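As one concrete example, the low-public-exponent RSA case from the list above: when e = 3 and m^3 < n, the ciphertext equals m^3 over the integers, so an integer cube root recovers the message. The values below are purely illustrative:

```python
def icbrt(n: int) -> int:
    # Integer cube root via binary search.
    lo, hi = 0, 1 << ((n.bit_length() + 2) // 3 + 1)
    while lo < hi:
        mid = (lo + hi + 1) // 2
        if mid ** 3 <= n:
            lo = mid
        else:
            hi = mid - 1
    return lo

# Toy setup: e = 3 and m**3 < n, so c = m**3 over the integers.
m = int.from_bytes(b"flag{tiny}", "big")
n = 1 << 2048              # hypothetical large modulus
c = pow(m, 3, n)           # equals m**3 because m**3 < n
recovered = icbrt(c)
print(recovered.to_bytes((recovered.bit_length() + 7) // 8, "big"))
```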
### Connecting to an MCP Server
The client uses **FastMCP** to connect to an existing MCP server via **streamable HTTP**. Once connected, the client:
1. Automatically loads available tools from the MCP server
2. Passes them to Ollama as usable tools
3. Executes tools when requested by the model
4. Returns results to the model to continue the conversation
#### Example with MCP Server
```bash
# Connect to an MCP server via streamable HTTP
python mcpClient.py --mcp-server "http://localhost:8000/mcp" --interactive
# With authentication headers
python mcpClient.py --mcp-server "http://localhost:8000/mcp" --mcp-headers '{"Authorization": "Bearer your-token"}' --prompt "Use your tools"
```
### Default Configuration
- **Default Ollama URL**: `http://localhost:11434`
- **Default Model**: `ministral-3`
- **Default MCP Server**: `http://localhost:8000/mcp`
---
## Complete Workflow Example
### 1. Start the MCP Server
```bash
cd Cariddi
python main.py
```
The server will start on `http://localhost:8000/mcp`.
### 2. Run the Client and Connect to the Server
In another terminal:
```bash
cd CariddiClient
python mcpClient.py --mcp-server "http://localhost:8000/mcp" --interactive
```
### 3. Use the Crypto Solver Agent
The client will automatically discover and use the server's tools (like `listFiles`, `readFile`, `writeFile`, `executeCommand`, `writePythonFile`) through Ollama. You can ask it to solve CTF challenges:
```
You: Analyze the files in /tmp and solve the crypto challenge
```
The agent will:
- List files in `/tmp`
- Read relevant files
- Analyze the cryptographic scheme
- Write and execute Python scripts to solve the challenge
- Return the flag
---
## CTF Challenges
The `challs/` directory contains CTF challenges for testing the Crypto Solver Agent:
- **cryptoEasy**: a Diffie-Hellman based challenge with AES encryption (in `challs/cryptoEasy/`)
- **cryptoMedium**: a reduced-round AES challenge (in `challs/cryptoMedium/`)
---
## Development
The server is built using FastMCP and provides filesystem operations. To add new tools:
1. Implement the tool function in `modules/filesystem.py`
2. Register it as an MCP tool in `main.py` using `@mcpServer.tool()`
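The registration pattern can be illustrated with a plain-Python stand-in; FastMCP's real `@mcpServer.tool()` decorator additionally generates the tool's input schema, so this sketch only shows the registry idea:

```python
import os

# Stand-in for @mcpServer.tool(): a decorator factory that records tools by name.
TOOLS = {}

def tool():
    def register(fn):
        TOOLS[fn.__name__] = fn
        return fn
    return register

@tool()
def listFiles(path: str) -> list:
    """List all files in the given path."""
    return os.listdir(path)

print(sorted(TOOLS))
```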
### Client Development
The client uses FastMCP for server communication and Ollama for inference. To modify the agent's behavior:
1. Edit the system prompt in `mcpClient.py` (line 248)
2. Add custom tools using `registerTool()` method
3. Modify the tool execution logic in `_executeTool()` method
---
## Candidate MCP Servers
Other MCP servers you can combine with Cariddi or use in similar workflows (reverse engineering, binary analysis, malware analysis, shell execution):
| Project | Description |
|--------|-------------|
| [radare2-mcp](https://github.com/radareorg/radare2-mcp) | MCP stdio server for radare2 binary analysis, with r2, r2pipe, and optional raw r2 commands. |
| [headless-ida-mcp-server](https://github.com/cnitlrt/headless-ida-mcp-server) | Headless IDA Pro MCP server; analyzes binaries via IDA's headless mode (idat). |
| [MalwareAnalyzerMCP](https://github.com/abdessamad-elamrani/malwareanalyzermcp) | MCP server for malware analysis: `file`, `strings`, `hexdump`, `objdump`, `xxd`, and shell commands with timeouts. |
| [GhidrAssistMCP](https://github.com/jtang613/ghidrassistmcp) | Ghidra MCP extension with 34 tools, resources, and prompts for reverse engineering (decompile, xrefs, structs, etc.). |
| [shell-exec-mcp](https://github.com/domdomegg/shell-exec-mcp) | MCP server for shell command execution; runs bash commands with optional timeout and background jobs. |
| [ida-pro-mcp](https://github.com/mrexodia/ida-pro-mcp) | IDA Pro MCP bridge for AI-assisted reversing in IDA (decompile, disasm, xrefs, types, debugger extension). |
| [desktop-commander](https://github.com/wonderwhy-er/DesktopCommanderMCP) | TBD |
---
## License
[Add your license information here]
---
## Contributing
[Add contributing guidelines here]

challs/cryptoMedium/aes.py

@@ -0,0 +1,249 @@
"""
This is a slightly modified version of BoppreH's AES implementation found at https://github.com/boppreh/AES
Follow the original disclaimer
__________________________________
This is an exercise in secure symmetric-key encryption, implemented in pure
Python (no external libraries needed).
Original AES-128 implementation by Bo Zhu (http://about.bozhu.me) at
https://github.com/bozhu/AES-Python . PKCS#7 padding, CBC mode, PBKDF2, HMAC,
byte array and string support added by me at https://github.com/boppreh/aes.
Other block modes contributed by @righthandabacus.
Although this is an exercise, the `encrypt` and `decrypt` functions should
provide reasonable security to encrypted messages.
"""
s_box = (
0x63, 0x7C, 0x77, 0x7B, 0xF2, 0x6B, 0x6F, 0xC5, 0x30, 0x01, 0x67, 0x2B, 0xFE, 0xD7, 0xAB, 0x76,
0xCA, 0x82, 0xC9, 0x7D, 0xFA, 0x59, 0x47, 0xF0, 0xAD, 0xD4, 0xA2, 0xAF, 0x9C, 0xA4, 0x72, 0xC0,
0xB7, 0xFD, 0x93, 0x26, 0x36, 0x3F, 0xF7, 0xCC, 0x34, 0xA5, 0xE5, 0xF1, 0x71, 0xD8, 0x31, 0x15,
0x04, 0xC7, 0x23, 0xC3, 0x18, 0x96, 0x05, 0x9A, 0x07, 0x12, 0x80, 0xE2, 0xEB, 0x27, 0xB2, 0x75,
0x09, 0x83, 0x2C, 0x1A, 0x1B, 0x6E, 0x5A, 0xA0, 0x52, 0x3B, 0xD6, 0xB3, 0x29, 0xE3, 0x2F, 0x84,
0x53, 0xD1, 0x00, 0xED, 0x20, 0xFC, 0xB1, 0x5B, 0x6A, 0xCB, 0xBE, 0x39, 0x4A, 0x4C, 0x58, 0xCF,
0xD0, 0xEF, 0xAA, 0xFB, 0x43, 0x4D, 0x33, 0x85, 0x45, 0xF9, 0x02, 0x7F, 0x50, 0x3C, 0x9F, 0xA8,
0x51, 0xA3, 0x40, 0x8F, 0x92, 0x9D, 0x38, 0xF5, 0xBC, 0xB6, 0xDA, 0x21, 0x10, 0xFF, 0xF3, 0xD2,
0xCD, 0x0C, 0x13, 0xEC, 0x5F, 0x97, 0x44, 0x17, 0xC4, 0xA7, 0x7E, 0x3D, 0x64, 0x5D, 0x19, 0x73,
0x60, 0x81, 0x4F, 0xDC, 0x22, 0x2A, 0x90, 0x88, 0x46, 0xEE, 0xB8, 0x14, 0xDE, 0x5E, 0x0B, 0xDB,
0xE0, 0x32, 0x3A, 0x0A, 0x49, 0x06, 0x24, 0x5C, 0xC2, 0xD3, 0xAC, 0x62, 0x91, 0x95, 0xE4, 0x79,
0xE7, 0xC8, 0x37, 0x6D, 0x8D, 0xD5, 0x4E, 0xA9, 0x6C, 0x56, 0xF4, 0xEA, 0x65, 0x7A, 0xAE, 0x08,
0xBA, 0x78, 0x25, 0x2E, 0x1C, 0xA6, 0xB4, 0xC6, 0xE8, 0xDD, 0x74, 0x1F, 0x4B, 0xBD, 0x8B, 0x8A,
0x70, 0x3E, 0xB5, 0x66, 0x48, 0x03, 0xF6, 0x0E, 0x61, 0x35, 0x57, 0xB9, 0x86, 0xC1, 0x1D, 0x9E,
0xE1, 0xF8, 0x98, 0x11, 0x69, 0xD9, 0x8E, 0x94, 0x9B, 0x1E, 0x87, 0xE9, 0xCE, 0x55, 0x28, 0xDF,
0x8C, 0xA1, 0x89, 0x0D, 0xBF, 0xE6, 0x42, 0x68, 0x41, 0x99, 0x2D, 0x0F, 0xB0, 0x54, 0xBB, 0x16,
)
inv_s_box = (
0x52, 0x09, 0x6A, 0xD5, 0x30, 0x36, 0xA5, 0x38, 0xBF, 0x40, 0xA3, 0x9E, 0x81, 0xF3, 0xD7, 0xFB,
0x7C, 0xE3, 0x39, 0x82, 0x9B, 0x2F, 0xFF, 0x87, 0x34, 0x8E, 0x43, 0x44, 0xC4, 0xDE, 0xE9, 0xCB,
0x54, 0x7B, 0x94, 0x32, 0xA6, 0xC2, 0x23, 0x3D, 0xEE, 0x4C, 0x95, 0x0B, 0x42, 0xFA, 0xC3, 0x4E,
0x08, 0x2E, 0xA1, 0x66, 0x28, 0xD9, 0x24, 0xB2, 0x76, 0x5B, 0xA2, 0x49, 0x6D, 0x8B, 0xD1, 0x25,
0x72, 0xF8, 0xF6, 0x64, 0x86, 0x68, 0x98, 0x16, 0xD4, 0xA4, 0x5C, 0xCC, 0x5D, 0x65, 0xB6, 0x92,
0x6C, 0x70, 0x48, 0x50, 0xFD, 0xED, 0xB9, 0xDA, 0x5E, 0x15, 0x46, 0x57, 0xA7, 0x8D, 0x9D, 0x84,
0x90, 0xD8, 0xAB, 0x00, 0x8C, 0xBC, 0xD3, 0x0A, 0xF7, 0xE4, 0x58, 0x05, 0xB8, 0xB3, 0x45, 0x06,
0xD0, 0x2C, 0x1E, 0x8F, 0xCA, 0x3F, 0x0F, 0x02, 0xC1, 0xAF, 0xBD, 0x03, 0x01, 0x13, 0x8A, 0x6B,
0x3A, 0x91, 0x11, 0x41, 0x4F, 0x67, 0xDC, 0xEA, 0x97, 0xF2, 0xCF, 0xCE, 0xF0, 0xB4, 0xE6, 0x73,
0x96, 0xAC, 0x74, 0x22, 0xE7, 0xAD, 0x35, 0x85, 0xE2, 0xF9, 0x37, 0xE8, 0x1C, 0x75, 0xDF, 0x6E,
0x47, 0xF1, 0x1A, 0x71, 0x1D, 0x29, 0xC5, 0x89, 0x6F, 0xB7, 0x62, 0x0E, 0xAA, 0x18, 0xBE, 0x1B,
0xFC, 0x56, 0x3E, 0x4B, 0xC6, 0xD2, 0x79, 0x20, 0x9A, 0xDB, 0xC0, 0xFE, 0x78, 0xCD, 0x5A, 0xF4,
0x1F, 0xDD, 0xA8, 0x33, 0x88, 0x07, 0xC7, 0x31, 0xB1, 0x12, 0x10, 0x59, 0x27, 0x80, 0xEC, 0x5F,
0x60, 0x51, 0x7F, 0xA9, 0x19, 0xB5, 0x4A, 0x0D, 0x2D, 0xE5, 0x7A, 0x9F, 0x93, 0xC9, 0x9C, 0xEF,
0xA0, 0xE0, 0x3B, 0x4D, 0xAE, 0x2A, 0xF5, 0xB0, 0xC8, 0xEB, 0xBB, 0x3C, 0x83, 0x53, 0x99, 0x61,
0x17, 0x2B, 0x04, 0x7E, 0xBA, 0x77, 0xD6, 0x26, 0xE1, 0x69, 0x14, 0x63, 0x55, 0x21, 0x0C, 0x7D,
)
def sub_bytes(s):
    for i in range(4):
        for j in range(4):
            s[i][j] = s_box[s[i][j]]

def inv_sub_bytes(s):
    for i in range(4):
        for j in range(4):
            s[i][j] = inv_s_box[s[i][j]]
def shift_rows(s):
    s[0][1], s[1][1], s[2][1], s[3][1] = s[1][1], s[2][1], s[3][1], s[0][1]
    s[0][2], s[1][2], s[2][2], s[3][2] = s[2][2], s[3][2], s[0][2], s[1][2]
    s[0][3], s[1][3], s[2][3], s[3][3] = s[3][3], s[0][3], s[1][3], s[2][3]

def inv_shift_rows(s):
    s[0][1], s[1][1], s[2][1], s[3][1] = s[3][1], s[0][1], s[1][1], s[2][1]
    s[0][2], s[1][2], s[2][2], s[3][2] = s[2][2], s[3][2], s[0][2], s[1][2]
    s[0][3], s[1][3], s[2][3], s[3][3] = s[1][3], s[2][3], s[3][3], s[0][3]

def add_round_key(s, k):
    for i in range(4):
        for j in range(4):
            s[i][j] ^= k[i][j]
# learned from http://cs.ucsb.edu/~koc/cs178/projects/JT/aes.c
xtime = lambda a: (((a << 1) ^ 0x1B) & 0xFF) if (a & 0x80) else (a << 1)

def mix_single_column(a):
    # see Sec 4.1.2 in The Design of Rijndael
    t = a[0] ^ a[1] ^ a[2] ^ a[3]
    u = a[0]
    a[0] ^= t ^ xtime(a[0] ^ a[1])
    a[1] ^= t ^ xtime(a[1] ^ a[2])
    a[2] ^= t ^ xtime(a[2] ^ a[3])
    a[3] ^= t ^ xtime(a[3] ^ u)

def mix_columns(s):
    for i in range(4):
        mix_single_column(s[i])

def inv_mix_columns(s):
    # see Sec 4.1.3 in The Design of Rijndael
    for i in range(4):
        u = xtime(xtime(s[i][0] ^ s[i][2]))
        v = xtime(xtime(s[i][1] ^ s[i][3]))
        s[i][0] ^= u
        s[i][1] ^= v
        s[i][2] ^= u
        s[i][3] ^= v
    mix_columns(s)
r_con = (
0x00, 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40,
0x80, 0x1B, 0x36, 0x6C, 0xD8, 0xAB, 0x4D, 0x9A,
0x2F, 0x5E, 0xBC, 0x63, 0xC6, 0x97, 0x35, 0x6A,
0xD4, 0xB3, 0x7D, 0xFA, 0xEF, 0xC5, 0x91, 0x39,
)
def bytes2matrix(text):
    """ Converts a 16-byte array into a 4x4 matrix. """
    return [list(text[i:i+4]) for i in range(0, len(text), 4)]

def matrix2bytes(matrix):
    """ Converts a 4x4 matrix into a 16-byte array. """
    return bytes(sum(matrix, []))

def xor_bytes(a, b):
    """ Returns a new byte array with the elements xor'ed. """
    return bytes(i ^ j for i, j in zip(a, b))

def inc_bytes(a):
    """ Returns a new byte array with the value incremented by 1. """
    out = list(a)
    for i in reversed(range(len(out))):
        if out[i] == 0xFF:
            out[i] = 0
        else:
            out[i] += 1
            break
    return bytes(out)

def split_blocks(message, block_size=16, require_padding=True):
    assert len(message) % block_size == 0 or not require_padding
    return [message[i:i+block_size] for i in range(0, len(message), block_size)]
class AES:
    """
    Class for AES-128
    """
    rounds_by_key_size = {16: 10, 24: 12, 32: 14}

    def __init__(self, master_key, rounds=None):
        """
        Initializes the object with a given key.
        """
        assert len(master_key) in AES.rounds_by_key_size
        if rounds is None:
            self.n_rounds = AES.rounds_by_key_size[len(master_key)]
        else:
            self.n_rounds = rounds
        self._key_matrices = self._expand_key(master_key)

    def _expand_key(self, master_key):
        """
        Expands and returns a list of key matrices for the given master_key.
        """
        # Initialize round keys with raw key material.
        key_columns = bytes2matrix(master_key)
        iteration_size = len(master_key) // 4
        # Each iteration has exactly as many columns as the key material.
        columns_per_iteration = len(key_columns)
        i = 1
        while len(key_columns) < (self.n_rounds + 1) * 4:
            # Copy previous word.
            word = list(key_columns[-1])
            # Perform schedule_core once every "row".
            if len(key_columns) % iteration_size == 0:
                # Circular shift.
                word.append(word.pop(0))
                # Map to S-BOX.
                word = [s_box[b] for b in word]
                # XOR with first byte of R-CON, since the other bytes of R-CON are 0.
                word[0] ^= r_con[i]
                i += 1
            elif len(master_key) == 32 and len(key_columns) % iteration_size == 4:
                # Run word through S-box in the fourth iteration when using a
                # 256-bit key.
                word = [s_box[b] for b in word]
            # XOR with equivalent word from previous iteration.
            word = xor_bytes(word, key_columns[-iteration_size])
            key_columns.append(word)
        # Group key words in 4x4 byte matrices.
        return [key_columns[4*i : 4*(i+1)] for i in range(len(key_columns) // 4)]
    def encrypt_block(self, plaintext):
        """
        Encrypts a single block of 16 byte long plaintext.
        """
        assert len(plaintext) == 16
        plain_state = bytes2matrix(plaintext)
        add_round_key(plain_state, self._key_matrices[0])
        for i in range(1, self.n_rounds):
            sub_bytes(plain_state)
            shift_rows(plain_state)
            mix_columns(plain_state)
            add_round_key(plain_state, self._key_matrices[i])
        sub_bytes(plain_state)
        shift_rows(plain_state)
        mix_columns(plain_state)  # added mix_columns
        add_round_key(plain_state, self._key_matrices[-1])
        return matrix2bytes(plain_state)

    def decrypt_block(self, ciphertext):
        """
        Decrypts a single block of 16 byte long ciphertext.
        """
        assert len(ciphertext) == 16
        cipher_state = bytes2matrix(ciphertext)
        add_round_key(cipher_state, self._key_matrices[-1])
        inv_shift_rows(cipher_state)
        inv_sub_bytes(cipher_state)
        for i in range(self.n_rounds - 1, 0, -1):
            add_round_key(cipher_state, self._key_matrices[i])
            inv_mix_columns(cipher_state)
            inv_shift_rows(cipher_state)
            inv_sub_bytes(cipher_state)
        add_round_key(cipher_state, self._key_matrices[0])
        return matrix2bytes(cipher_state)


@@ -0,0 +1,21 @@
from aes import AES
from secret import flag
import os

cipher = AES(flag, rounds=1)
p = []
c = []
for _ in range(2):
    plaintext = os.urandom(16)
    p.append(plaintext.hex())
    ciphertext = cipher.encrypt_block(plaintext)
    c.append(ciphertext.hex())
print('plaintexts = ', p)
print('ciphertexts = ', c)
# plaintexts = ['ae976f8d95e5bd2eaf40a0efcec97e0d', 'e99c923581939d80e1cf25ee19c891e4']
# ciphertexts = ['ce3776b4dc0551217e856bdfb25251b3', '70439bebea8c21e2b8eb7bad1723656b']


@@ -1,18 +1,118 @@
#!/usr/bin/env python3
"""
MCP client that uses Ollama for inference and LangChain create_agent with
runtime-registered MCP tools (see https://docs.langchain.com/oss/python/langchain/agents#runtime-tool-registration).
"""
from fastmcp.client.transports import NodeStdioTransport, PythonStdioTransport, SSETransport, StreamableHttpTransport
import json
import sys
import os
import asyncio
from pathlib import Path
from typing import Optional, Dict, Any, List, Callable, Awaitable
import requests
from fastmcp import Client as FastMcpClient
from ollama import ResponseError as OllamaResponseError
from pydantic import BaseModel, ConfigDict, Field, create_model
# LangChain agent and middleware
try:
    from langchain.agents import create_agent
    from langchain.agents.middleware import AgentMiddleware, ModelRequest, ModelResponse, ToolCallRequest
    from langchain_core.tools import StructuredTool, tool
    from langchain_ollama import ChatOllama
    from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage
except ImportError as e:
    print(f"Missing dependency: {e}. Install with: pip install langchain langgraph langchain-community langchain-ollama", file=sys.stderr)
    sys.exit(1)
@tool
def getTime() -> str:
    """Get the current time in ISO format."""
    from datetime import datetime
    return datetime.now().isoformat()

@tool
def countWords(text: str) -> int:
    """Count the number of words in a text."""
    return len(text.split())
def loadMcpConfig(configPath: Optional[str] = None) -> Dict[str, str]:
    """Load MCP server URLs from mcp.json. Returns dict serverName -> url."""
    if configPath is None:
        # Default: mcpServer/mcp.json relative to project root or cwd
        base = Path(__file__).resolve().parent.parent
        configPath = str(base / "mcpServer" / "mcp.json")
    path = Path(configPath)
    if not path.exists():
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError) as e:
        print(f"Warning: Could not load MCP config from {path}: {e}", file=sys.stderr)
        return {}
    servers = data.get("mcpServers") or data.get("mcp_servers") or {}
    return {name: info.get("url", "") for name, info in servers.items() if isinstance(info, dict) and info.get("url")}
class GenericToolArgs(BaseModel):
    """Accept any keyword arguments for MCP tool calls (fallback when schema is missing)."""
    model_config = ConfigDict(extra="allow")

def _jsonSchemaTypeToPython(jsonType: str) -> type:
    """Map a JSON schema type to a Python type."""
    return {"string": str, "integer": int, "number": float, "boolean": bool, "array": list, "object": dict}.get(jsonType, str)

def _defaultForJsonType(jsonType: str) -> Any:
    """Sensible default for optional MCP params so the server does not receive null."""
    return {"string": "", "integer": 0, "number": 0.0, "boolean": False, "array": [], "object": {}}.get(jsonType, "")
def _defaultsFromInputSchema(inputSchema: Dict[str, Any]) -> Dict[str, Any]:
    """Build default values for all params so we never send null to the MCP server (LLM may omit required params)."""
    if not inputSchema:
        return {}
    properties = inputSchema.get("properties") or {}
    out: Dict[str, Any] = {}
    for name, spec in properties.items():
        if not isinstance(spec, dict):
            continue
        if "default" in spec:
            out[name] = spec["default"]
        else:
            out[name] = _defaultForJsonType(spec.get("type", "string"))
    return out
def buildArgsSchemaFromMcpInputSchema(toolName: str, inputSchema: Dict[str, Any]) -> type[BaseModel]:
    """Build a Pydantic model from MCP tool inputSchema so the LLM gets exact parameter names (path, content, etc.)."""
    if not inputSchema:
        return GenericToolArgs
    properties = inputSchema.get("properties") or {}
    required = set(inputSchema.get("required") or [])
    if not properties:
        return GenericToolArgs
    fields: Dict[str, Any] = {}
    for name, spec in properties.items():
        if not isinstance(spec, dict):
            continue
        desc = spec.get("description", "")
        jsonType = spec.get("type", "string")
        pyType = _jsonSchemaTypeToPython(jsonType)
        if name in required:
            fields[name] = (pyType, Field(..., description=desc))
        else:
            fields[name] = (Optional[pyType], Field(None, description=desc))
    if not fields:
        return GenericToolArgs
    return create_model(f"McpArgs_{toolName}", **fields)
class OllamaClient:
@@ -48,7 +148,7 @@ class OllamaClient:
        response = requests.post(
            f"{self.baseUrl}/api/chat",
            json=payload,
            timeout=60*60*60
        )
        response.raise_for_status()
        data = response.json()
@@ -176,249 +276,317 @@ class McpServerWrapper:
return []
def _serializeToolResult(result: Any) -> Any:
    """Serialize tool result to JSON-serializable format."""
    if hasattr(result, "text"):
        return result.text
    if hasattr(result, "content"):
        content = result.content
        if hasattr(content, "text"):
            return content.text
        return content
    if isinstance(result, list):
        return [_serializeToolResult(item) for item in result]
    if isinstance(result, dict):
        return {k: _serializeToolResult(v) for k, v in result.items()}
    return result
def _makeMcpToolCoroutine(
    toolName: str,
    server: McpServerWrapper,
    defaultArgs: Dict[str, Any],
    toolTimeout: Optional[float] = None,
) -> Callable[..., Awaitable[Any]]:
    async def _invoke(**kwargs: Any) -> Any:
        merged = {**defaultArgs, **kwargs}
        # Strip None values - MCP server Zod schemas often reject null for optional params (expect number | undefined, not number | null)
        merged = {k: v for k, v in merged.items() if v is not None}
        try:
            if toolTimeout is not None and toolTimeout > 0:
                result = await asyncio.wait_for(
                    server.callServerTool(toolName, merged),
                    timeout=toolTimeout,
                )
            else:
                result = await server.callServerTool(toolName, merged)
        except asyncio.TimeoutError:
            return (
                f"[Tool timeout] '{toolName}' exceeded {toolTimeout}s. "
                "The operation may have hung (e.g. command not found, subprocess blocking). "
                "Try an alternative (e.g. 'python' instead of 'python3') or increase --tool-timeout."
            )
        return _serializeToolResult(result)
    return _invoke
self.tools.append({
"name": name,
"description": description,
"inputSchema": inputSchema
})
async def buildMcpLangChainTools(
    mcpServers: List[McpServerWrapper],
    toolTimeout: Optional[float] = None,
) -> List[StructuredTool]:
    """Build LangChain StructuredTools from connected MCP servers (runtime tool registration)."""
    tools: List[StructuredTool] = []
    for server in mcpServers:
        rawTools = await server.listServerTools()
        for raw in rawTools:
            name = getattr(raw, "name", None) or (raw.get("name") if isinstance(raw, dict) else None)
            description = getattr(raw, "description", None) or (raw.get("description", "") if isinstance(raw, dict) else "")
            inputSchema = getattr(raw, "inputSchema", None) or getattr(raw, "input_schema", None) or (raw.get("inputSchema") or raw.get("input_schema") if isinstance(raw, dict) else None)
            if not name:
                continue
            description = description or f"MCP tool: {name}"
            schemaDict = inputSchema or {}
            argsSchema = buildArgsSchemaFromMcpInputSchema(name, schemaDict)
            defaultArgs = _defaultsFromInputSchema(schemaDict)
            tool = StructuredTool.from_function(
                name=name,
                description=description,
                args_schema=argsSchema,
                coroutine=_makeMcpToolCoroutine(name, server, defaultArgs, toolTimeout),
            )
            tools.append(tool)
    return tools
class LogToolCallsMiddleware(AgentMiddleware):
"""Middleware that logs every tool call (name and args)."""
def wrap_tool_call(self, request: ToolCallRequest, handler: Callable):
_logToolCallRequest(request)
return handler(request)
async def awrap_tool_call(self, request: ToolCallRequest, handler: Callable):
_logToolCallRequest(request)
return await handler(request)
def _extractTextFromAIMessageContent(content: Any) -> str:
"""Extract plain text from AIMessage.content (str or list of content blocks)."""
if content is None:
return ""
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
parts: List[str] = []
for block in content:
if isinstance(block, dict) and "text" in block:
parts.append(str(block["text"]))
elif isinstance(block, str):
parts.append(block)
return "\n".join(parts).strip() if parts else ""
return str(content).strip()
def _extractFinalResponse(result: Dict[str, Any]) -> str:
"""Extract the final assistant text from agent result; handle recursion limit / no final message."""
messages = result.get("messages") or []
for msg in reversed(messages):
if isinstance(msg, AIMessage) and hasattr(msg, "content"):
text = _extractTextFromAIMessageContent(msg.content)
if text:
return text
return (
"Agent stopped without a final text response (e.g. hit step limit after tool calls). "
"Try again or increase --recursion-limit."
)
def _logToolCallRequest(request: ToolCallRequest) -> None:
tc = request.tool_call
name = tc.get("name") if isinstance(tc, dict) else getattr(tc, "name", None)
args = tc.get("args", tc.get("arguments", {})) if isinstance(tc, dict) else getattr(tc, "args", getattr(tc, "arguments", {}))
argsStr = json.dumps(args, ensure_ascii=False)
if len(argsStr) > 500:
argsStr = argsStr[:497] + "..."
print(f"[Tool Call] {name} args={argsStr}", file=sys.stderr)
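The truncation in `_logToolCallRequest` keeps log lines bounded. The same slice-and-ellipsis idiom in isolation (helper name is illustrative):

```python
import json

def format_args_for_log(args: dict, limit: int = 500) -> str:
    # Serialize tool args (keep non-ASCII intact) and cap the length for logs.
    s = json.dumps(args, ensure_ascii=False)
    return s if len(s) <= limit else s[: limit - 3] + "..."

print(format_args_for_log({"path": "/tmp/x" * 200}, limit=40))
```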
class McpToolsMiddleware(AgentMiddleware):
"""Middleware that adds MCP tools at runtime and handles their execution (runtime tool registration)."""
def __init__(self, mcpTools: List[StructuredTool], staticToolNames: Optional[List[str]] = None):
self.mcpTools = mcpTools
self.mcpToolsByName = {t.name: t for t in mcpTools}
staticNames = set(staticToolNames or [])
self.validToolNames = staticNames | set(self.mcpToolsByName.keys())
def wrap_model_call(self, request: ModelRequest, handler: Callable) -> ModelResponse:
updated = request.override(tools=[*request.tools, *self.mcpTools])
return handler(updated)
async def awrap_model_call(self, request: ModelRequest, handler: Callable):
updated = request.override(tools=[*request.tools, *self.mcpTools])
return await handler(updated)
def _toolExists(self, name: Optional[str]) -> bool:
return bool(name and name in self.validToolNames)
def _unknownToolErrorToolMessage(self, request: ToolCallRequest, name: str) -> ToolMessage:
available = ", ".join(sorted(self.validToolNames))
content = (
f"[Error] Tool '{name}' does not exist. "
f"Only the following tools are available: {available}. "
"Do not call tools that are not in this list."
)
tc = request.tool_call
toolCallId = tc.get("id") if isinstance(tc, dict) else getattr(tc, "id", None)
return ToolMessage(
content=content,
tool_call_id=toolCallId or "unknown",
name=name or "unknown",
status="error",
)
def wrap_tool_call(self, request: ToolCallRequest, handler: Callable):
name = request.tool_call.get("name") if isinstance(request.tool_call, dict) else getattr(request.tool_call, "name", None)
if not self._toolExists(name):
return self._unknownToolErrorToolMessage(request, name or "<unknown>")
if name and name in self.mcpToolsByName:
return handler(request.override(tool=self.mcpToolsByName[name]))
return handler(request)
async def awrap_tool_call(self, request: ToolCallRequest, handler: Callable):
name = request.tool_call.get("name") if isinstance(request.tool_call, dict) else getattr(request.tool_call, "name", None)
if not self._toolExists(name):
return self._unknownToolErrorToolMessage(request, name or "<unknown>")
if name and name in self.mcpToolsByName:
return await handler(request.override(tool=self.mcpToolsByName[name]))
return await handler(request)
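The unknown-tool guard above reduces to a name-set membership check before dispatch. A minimal dict-dispatch sketch (tool names are examples only):

```python
from typing import Any, Callable, Dict

TOOLS: Dict[str, Callable[..., Any]] = {
    "get_time": lambda: "2026-02-12T22:00:00",
    "count_words": lambda text="": len(text.split()),
}

def dispatch(name: str, **args: Any) -> Any:
    # Reject hallucinated tool names instead of raising an opaque KeyError.
    if name not in TOOLS:
        available = ", ".join(sorted(TOOLS))
        return f"[Error] Tool '{name}' does not exist. Available: {available}"
    return TOOLS[name](**args)

print(dispatch("count_words", text="one two three"))  # 3
print(dispatch("read_mind"))
```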
''' TODO Use this if you want sequential thinking
SYSTEM_PROMPT = """
ROLE:
Sei un esperto Analista di Cybersecurity specializzato in CTF (Capture The Flag) e analisi di vulnerabilità. Operi in un ambiente Linux sandbox dove la tua unica area di lavoro è la directory /tmp.
WORKSPACE CONSTRAINT: IL "SINGLE SOURCE OF TRUTH"
- Obbligo Assoluto: Tutte le operazioni di lettura, scrittura, download e analisi devono avvenire esclusivamente all'interno di /tmp.
- Percorsi: Ogni file deve essere referenziato con il percorso assoluto (es. /tmp/binary.bin). Non usare mai directory come ~/, /home o altre al di fuori di /tmp.
- Condivisione: Ricorda che /tmp è montata su tutti i container MCP (fetch, filesystem, ecc.). Se scarichi un file con fetch in /tmp, il tool filesystem lo troverà immediatamente lì.
TOOLSET & WORKFLOW:
Utilizza i tuoi tool secondo questa logica:
1. sequentialthinking (Pianificazione): Usa questo tool PRIMA di ogni azione complessa. Suddividi la sfida in step logici (es. 1. Download, 2. Analisi Header, 3. Estrazione Flag). Ti aiuta a non perdere il filo durante task lunghi.
2. fetch (Ingestion): Usalo per recuperare binari, exploit o dati remoti. Salva l'output sempre in /tmp.
3. filesystem (Manipolazione): Usalo per ispezionare i file scaricati, creare script di exploit o leggere file di log e flag direttamente in /tmp.
4. memory (Stato): Utilizza questo tool per memorizzare scoperte chiave, indirizzi di memoria, offset o password trovate durante la sfida. Ti serve per mantenere il contesto tra diverse fasi del ragionamento.
METODOLOGIA DI ANALISI:
- Ipotesi e Test: Prima di agire, formula un'ipotesi basata sui dati presenti in /tmp.
- Verifica Intermedia: Dopo ogni comando o modifica ai file, verifica il risultato usando il tool filesystem. Non dare mai per scontato che un'operazione sia riuscita senza controllare.
- Pulizia Mentale: Se una strategia fallisce, usa sequentialthinking per rivedere il piano e aggiorna il tool memory con il motivo del fallimento per non ripetere lo stesso errore.
REGOLE DI COMUNICAZIONE:
- Sii estremamente tecnico, sintetico e preciso.
- Se un file non è presente in /tmp, non provare a indovinarne il contenuto; usa fetch per ottenerlo o filesystem per cercarlo.
- Rispondi con l'output delle tue analisi e l'eventuale flag trovata nel formato richiesto dalla sfida.
"""
'''
SYSTEM_PROMPT = "ROLE:\nSei un esperto Analista di Cybersecurity specializzato in CTF (Capture The Flag) e analisi di vulnerabilità. Operi in un ambiente Linux sandbox dove la tua unica area di lavoro è la directory /tmp.\n\nWORKSPACE CONSTRAINT: IL \"SINGLE SOURCE OF TRUTH\"\n- Obbligo Assoluto: Tutte le operazioni di lettura, scrittura e analisi devono avvenire esclusivamente all'interno di /tmp.\n- Percorsi: Ogni file deve essere referenziato con il percorso assoluto (es. /tmp/binary.bin). Non usare mai directory esterne a /tmp.\n- Condivisione: /tmp è montata su tutti i container MCP. I file creati o modificati da un tool sono immediatamente visibili agli altri.\n\nSTRETTO DIVIETO DI ALLUCINAZIONE TOOL:\n- USA ESCLUSIVAMENTE I TOOL MCP FORNITI: 'memory', 'filesystem'.\n\nREGOLE DI COMUNICAZIONE:\n- Sii estremamente tecnico, sintetico e preciso.\n- Non fare mai riferimento a tool che non siano 'memory' o 'filesystem'."
class OllamaMcpClient:
"""MCP client that uses Ollama and LangChain create_agent with optional runtime MCP tools."""
def __init__(
self,
ollamaClient: OllamaClient,
mcpTools: Optional[List[StructuredTool]] = None,
systemPrompt: Optional[str] = None,
):
self.ollamaClient = ollamaClient
self.mcpTools = mcpTools or []
self.systemPrompt = systemPrompt or SYSTEM_PROMPT
staticTools: List[Any] = [getTime, countWords]
staticToolNames = [getTime.name, countWords.name]
middleware: List[AgentMiddleware] = [LogToolCallsMiddleware()]
if self.mcpTools:
middleware.append(McpToolsMiddleware(self.mcpTools, staticToolNames=staticToolNames))
model = ChatOllama(
base_url=ollamaClient.baseUrl,
model=ollamaClient.model,
temperature=0.1,
)
self.agent = create_agent(
model,
tools=staticTools,
middleware=middleware,
system_prompt=self.systemPrompt,
)
async def processRequest(self, prompt: str, context: Optional[List[str]] = None, recursionLimit: int = 50) -> str:
"""Process a request using the LangChain agent (ReAct loop with tools)."""
messages: List[Any] = [HumanMessage(content=prompt)]
        if context:
            messages.insert(0, SystemMessage(content=f"Context:\n{chr(10).join(context)}"))
        config: Dict[str, Any] = {"recursion_limit": recursionLimit}
        toolParseRetryPrompt = (
            "ATTENZIONE: Una chiamata write_file ha prodotto JSON non valido. "
            "Quando scrivi file con codice Python: usa \\n per le newline nel JSON, escapa le virgolette con \\. "
            "Non aggiungere parametri extra (es. overwrite). Usa edit_file per modifiche incrementali se il contenuto è lungo."
        )
        try:
            result = await self.agent.ainvoke({"messages": messages}, config=config)
        except OllamaResponseError as e:
            errStr = str(e)
            if "error parsing tool call" in errStr:
                print(f"[Agent Error]: Tool call parse error, retrying with guidance: {errStr[:200]}...", file=sys.stderr)
                retryMessages: List[Any] = [SystemMessage(content=toolParseRetryPrompt)]
                retryMessages.extend(messages)
                result = await self.agent.ainvoke({"messages": retryMessages}, config=config)
            else:
                print(f"[Agent Error]: {e}", file=sys.stderr)
                raise
        except Exception as e:
            print(f"[Agent Error]: {e}", file=sys.stderr)
            raise
        return _extractFinalResponse(result)

    def listTools(self) -> List[str]:
        """List tool names (static + MCP)."""
        names = [getTime.name, countWords.name]
        names.extend(t.name for t in self.mcpTools)
        return names
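The parse-error handling in `processRequest` prepends corrective guidance and re-invokes the agent once. Schematically, with a stub in place of the real agent (all names here are illustrative):

```python
import asyncio

class ToolParseError(Exception):
    pass

async def invoke_with_retry(agent, messages, guidance: str):
    # On a tool-call parse error, retry once with guidance prepended as context.
    try:
        return await agent(messages)
    except ToolParseError:
        return await agent([guidance, *messages])

calls = []
async def flaky_agent(messages):
    # Fails on the first call, succeeds on the retry.
    calls.append(list(messages))
    if len(calls) == 1:
        raise ToolParseError("error parsing tool call")
    return "ok"

result = asyncio.run(invoke_with_retry(flaky_agent, ["solve /tmp/chall"], "escape newlines as \\n"))
print(result)  # ok
```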
async def async_main(args, ollamaClient: OllamaClient):
    """Async main: MCP tools come only from mcp.json (Docker containers exposing SSE). Ollama is used only as the LLM."""
    mcpTools: List[StructuredTool] = []
    mcpServers: List[McpServerWrapper] = []
    # MCP servers from the config file (mcp.json): Docker containers with SSE endpoints
    serverUrls: Dict[str, str] = loadMcpConfig(args.mcp_config)
    if args.mcp_server:
        serverUrls["default"] = args.mcp_server.rstrip("/")
    # Which servers to use: default = all from mcp.json; or --mcp-tools fetch,filesystem to pick a subset
    wantServers = [s.strip() for s in (args.mcp_tools or "").split(",") if s.strip()]
    if not wantServers and serverUrls:
        wantServers = list(serverUrls.keys())
        print(f"MCP tools from config (all SSE servers): {wantServers}", file=sys.stderr)
    for name in wantServers:
        url = serverUrls.get(name)
        if not url:
            print(f"Warning: MCP server '{name}' not in config (known: {list(serverUrls.keys())})", file=sys.stderr)
            continue
        wrapper = McpServerWrapper(httpUrl=url)
        if await wrapper.connect():
            mcpServers.append(wrapper)
            print(f"Connected to MCP server '{name}' at {url}", file=sys.stderr)
        else:
            print(f"Error: Failed to connect to MCP server '{name}' at {url}", file=sys.stderr)
    if mcpServers:
        mcpTools = await buildMcpLangChainTools(mcpServers, toolTimeout=getattr(args, "tool_timeout", None))
        # print(f"Loaded {len(mcpTools)} MCP tools: {[t.name for t in mcpTools]}", file=sys.stderr)
    mcpClient = OllamaMcpClient(ollamaClient, mcpTools=mcpTools)
    print(f"Agent tools: {mcpClient.listTools()}", file=sys.stderr)
    if args.prompt:
        response = await mcpClient.processRequest(args.prompt, recursionLimit=args.recursion_limit)
        print(response)
    elif args.interactive:
        print("MCP Client with Ollama (LangChain agent) - Interactive Mode")
        print("Type 'quit' or 'exit' to exit\n")
        while True:
            try:
@@ -427,7 +595,7 @@ async def async_main(args, ollamaClient: OllamaClient):
                break
            if not prompt:
                continue
            response = await mcpClient.processRequest(prompt, recursionLimit=args.recursion_limit)
            print(f"Assistant: {response}\n")
        except KeyboardInterrupt:
            print("\nGoodbye!")
@@ -435,9 +603,8 @@ async def async_main(args, ollamaClient: OllamaClient):
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)

    for wrapper in mcpServers:
        await wrapper.disconnect()
def main():
@@ -452,7 +619,7 @@ def main():
    )
    parser.add_argument(
        "--model",
        default="gpt-oss:20b",
        help="Ollama model to use (default: gpt-oss:20b)"
    )
    parser.add_argument(
@@ -470,14 +637,35 @@ def main():
        action="store_true",
        help="Run in interactive mode"
    )
parser.add_argument(
"--mcp-config",
default=None,
help="Path to mcp.json (default: mcpServer/mcp.json relative to project)"
)
parser.add_argument(
"--mcp-tools",
default="",
help="Comma-separated MCP server names from mcp.json (default: all servers in config). E.g. fetch,filesystem"
)
    parser.add_argument(
        "--mcp-server",
        help="Override: single MCP SSE URL (e.g. http://localhost:3000/sse). Added as server 'default' in addition to mcp.json."
    )
    parser.add_argument(
        "--mcp-headers",
        help="Additional headers for MCP server as JSON string (e.g. '{\"Authorization\": \"Bearer token\"}')"
    )
parser.add_argument(
"--recursion-limit",
type=int,
default=5000,
        help="Max agent steps (model + tool calls) before stopping (default: 5000)"
)
parser.add_argument(
"--tool-timeout",
type=float,
default=60,
help="Timeout in seconds for each MCP tool call. Prevents agent from freezing when a tool hangs (e.g. run with missing executable). Default: 60"
) )
    args = parser.parse_args()
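A reduced argparse sketch of the new flags (defaults as in the code above; the subset shown here is illustrative):

```python
import argparse

parser = argparse.ArgumentParser(description="Illustrative subset of the client CLI")
parser.add_argument("--mcp-tools", default="",
                    help="Comma-separated MCP server names from mcp.json")
parser.add_argument("--recursion-limit", type=int, default=5000,
                    help="Max agent steps before stopping")
parser.add_argument("--tool-timeout", type=float, default=60,
                    help="Timeout in seconds for each MCP tool call")

args = parser.parse_args(["--mcp-tools", "fetch,filesystem"])
print(args.mcp_tools.split(","))  # ['fetch', 'filesystem']
```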


@@ -1,2 +1,8 @@
requests>=2.31.0
fastmcp>=0.9.0
langchain>=0.3.0
langchain-core>=0.3.0
langgraph>=0.2.0
langchain-community>=0.3.0
langchain-ollama>=0.2.0
pydantic>=2.0.0


@@ -3,23 +3,24 @@ FROM node:22.12-alpine AS builder
WORKDIR /app
COPY . /app
COPY tsconfig.json /tsconfig.json
RUN npm install
RUN npm run build
RUN npm ci --ignore-scripts --omit-dev
FROM node:22-alpine AS release
RUN apk add --no-cache bash
WORKDIR /app
COPY --from=builder /app/dist /app/dist
COPY --from=builder /app/package.json /app/package.json
COPY --from=builder /app/package-lock.json /app/package-lock.json
ENV NODE_ENV=production
ENV MCP_DEFAULT_CWD=/tmp
RUN npm install --ignore-scripts --omit-dev
WORKDIR /tmp
ENTRYPOINT ["node", "/app/dist/index.js"]


@@ -7,6 +7,7 @@ import fs from "fs/promises";
import { createReadStream } from "fs";
import http from "http";
import path from "path";
import { spawn } from "child_process";
import { URL } from "url";
import { z } from "zod";
import { minimatch } from "minimatch";
@@ -95,6 +96,15 @@ const GetFileInfoArgsSchema = z.object({
  path: z.string(),
});
const RunCommandArgsSchema = z.object({
command: z.string().min(1),
cwd: z.string().optional().describe("Working directory to run the command in. Must be within allowed directories."),
timeoutMs: z.number().int().positive().optional().default(600_000).describe("Kill the command if it runs longer than this (ms). Default: 10 minutes."),
env: z.record(z.string(), z.string()).optional().default({}).describe("Extra environment variables (string values)."),
shell: z.enum(["bash", "sh"]).optional().default("bash").describe("Shell to use for the command. Default: bash."),
maxOutputChars: z.number().int().positive().optional().default(200_000).describe("Maximum characters captured for each of stdout/stderr."),
});
// Server setup
const server = new McpServer(
  {
@@ -121,6 +131,89 @@ async function readFileAsBase64Stream(filePath: string): Promise<string> {
  });
}
function appendWithLimit(current: string, chunk: string, maxChars: number): { next: string; truncated: boolean } {
if (maxChars <= 0) return { next: "", truncated: true };
if (current.length >= maxChars) return { next: current, truncated: true };
const remaining = maxChars - current.length;
if (chunk.length <= remaining) return { next: current + chunk, truncated: false };
return { next: current + chunk.slice(0, remaining), truncated: true };
}
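`appendWithLimit` caps the accumulated output so a chatty command cannot blow up the tool result. The equivalent logic in Python, for illustration:

```python
def append_with_limit(current: str, chunk: str, max_chars: int):
    # Returns (next, truncated): append chunk but never exceed max_chars total.
    if max_chars <= 0:
        return "", True
    if len(current) >= max_chars:
        return current, True
    remaining = max_chars - len(current)
    if len(chunk) <= remaining:
        return current + chunk, False
    return current + chunk[:remaining], True

print(append_with_limit("abc", "defgh", 6))  # ('abcdef', True)
```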
async function runShellCommand(args: z.infer<typeof RunCommandArgsSchema>): Promise<{
exitCode: number | null;
signal: string | null;
stdout: string;
stderr: string;
stdoutTruncated: boolean;
stderrTruncated: boolean;
timedOut: boolean;
effectiveCwd: string;
}> {
const defaultCwd = process.env.MCP_DEFAULT_CWD ?? "/workspace";
const requestedCwd = args.cwd ?? defaultCwd;
const effectiveCwd = await validatePath(requestedCwd);
const cwdStats = await fs.stat(effectiveCwd);
if (!cwdStats.isDirectory()) {
throw new Error(`cwd is not a directory: ${requestedCwd}`);
}
const shellPath = args.shell === "sh" ? "/bin/sh" : "/bin/bash";
const env: NodeJS.ProcessEnv = { ...process.env, ...args.env };
return await new Promise((resolve) => {
const child = spawn(shellPath, ["-lc", args.command], {
cwd: effectiveCwd,
env,
stdio: ["ignore", "pipe", "pipe"],
});
let stdout = "";
let stderr = "";
let stdoutTruncated = false;
let stderrTruncated = false;
let timedOut = false;
const killTimer = setTimeout(() => {
timedOut = true;
try {
child.kill("SIGKILL");
} catch {
// ignore
}
}, args.timeoutMs);
child.stdout?.setEncoding("utf-8");
child.stderr?.setEncoding("utf-8");
child.stdout?.on("data", (data: string) => {
const result = appendWithLimit(stdout, data, args.maxOutputChars);
stdout = result.next;
stdoutTruncated = stdoutTruncated || result.truncated;
});
child.stderr?.on("data", (data: string) => {
const result = appendWithLimit(stderr, data, args.maxOutputChars);
stderr = result.next;
stderrTruncated = stderrTruncated || result.truncated;
});
child.on("close", (exitCode, signal) => {
clearTimeout(killTimer);
resolve({
exitCode,
signal: signal ? String(signal) : null,
stdout,
stderr,
stdoutTruncated,
stderrTruncated,
timedOut,
effectiveCwd,
});
});
});
}
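For comparison, the same run-with-timeout-and-capped-capture idea in Python, using stdlib `subprocess` (a sketch, not the server's implementation):

```python
import subprocess
import sys

def run_command(command: list, timeout_s: float = 10.0, max_chars: int = 200_000) -> dict:
    # Run a command, capture capped stdout/stderr, and report timeouts explicitly.
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=timeout_s)
        return {
            "exitCode": proc.returncode,
            "timedOut": False,
            "stdout": proc.stdout[:max_chars],
            "stderr": proc.stderr[:max_chars],
        }
    except subprocess.TimeoutExpired as e:
        return {"exitCode": None, "timedOut": True,
                "stdout": (e.stdout or "")[:max_chars],
                "stderr": (e.stderr or "")[:max_chars]}

result = run_command([sys.executable, "-c", "print('hello')"])
print(result["exitCode"], result["stdout"].strip())  # 0 hello
```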
// Tool registrations
// read_file (deprecated) and read_text_file
@@ -616,6 +709,63 @@ server.registerTool(
  }
);
async function runCommandToolHandler(rawArgs: unknown) {
const args = RunCommandArgsSchema.parse(rawArgs);
const result = await runShellCommand(args);
const textLines = [
`cwd: ${result.effectiveCwd}`,
`exitCode: ${result.exitCode === null ? "null" : String(result.exitCode)}`,
`signal: ${result.signal ?? "null"}`,
`timedOut: ${result.timedOut ? "true" : "false"}`,
`stdoutTruncated: ${result.stdoutTruncated ? "true" : "false"}`,
`stderrTruncated: ${result.stderrTruncated ? "true" : "false"}`,
"",
"stdout:",
result.stdout || "(empty)",
"",
"stderr:",
result.stderr || "(empty)",
];
const text = textLines.join("\n");
return {
content: [{ type: "text" as const, text }],
structuredContent: {
content: text,
cwd: result.effectiveCwd,
exitCode: result.exitCode,
signal: result.signal,
timedOut: result.timedOut,
stdout: result.stdout,
stderr: result.stderr,
stdoutTruncated: result.stdoutTruncated,
stderrTruncated: result.stderrTruncated,
},
};
}
const runCommandToolDefinition = {
title: "Run Command",
description:
"Execute a shell command inside this same container (same filesystem as the filesystem tools). " +
"Uses a non-interactive shell (`bash -lc` by default). Returns stdout/stderr, exit code, and timeout info.",
inputSchema: {
command: z.string(),
cwd: z.string().optional().describe("Working directory to run the command in. Must be within allowed directories."),
timeoutMs: z.number().optional().describe("Kill the command if it runs longer than this (ms). Default: 10 minutes."),
env: z.record(z.string(), z.string()).optional().describe("Extra environment variables (string values)."),
shell: z.enum(["bash", "sh"]).optional().describe("Shell to use for the command. Default: bash."),
maxOutputChars: z.number().optional().describe("Maximum characters captured for each of stdout/stderr."),
},
outputSchema: { content: z.string() },
annotations: { readOnlyHint: false, idempotentHint: false, destructiveHint: true },
};
// Keep snake_case for consistency with existing filesystem tools, and also provide a camelCase alias.
server.registerTool("run_command", runCommandToolDefinition, runCommandToolHandler);
server.registerTool("runCommand", runCommandToolDefinition, runCommandToolHandler);
// SSE transport session routing (sessionId -> transport)
const sseTransportsBySessionId = new Map<string, SSEServerTransport>();
@@ -633,6 +783,20 @@ function runServer() {
      transport.onclose = () => {
        sseTransportsBySessionId.delete(transport.sessionId);
      };
// SSE heartbeat to prevent client ReadTimeout during idle (e.g. while waiting for Ollama)
const heartbeatIntervalMs = 15_000;
const heartbeatInterval = setInterval(() => {
try {
if (!res.writableEnded) {
res.write(': heartbeat\n\n');
} else {
clearInterval(heartbeatInterval);
}
} catch {
clearInterval(heartbeatInterval);
}
}, heartbeatIntervalMs);
res.on('close', () => clearInterval(heartbeatInterval));
      await server.connect(transport);
      console.error("Secure MCP Filesystem Server: new SSE client connected");
    } catch (error) {


@@ -483,6 +483,18 @@ function runServer() {
      transport.onclose = () => {
        sseTransportsBySessionId.delete(transport.sessionId);
      };
const heartbeatInterval = setInterval(() => {
try {
if (!res.writableEnded) {
res.write(': heartbeat\n\n');
} else {
clearInterval(heartbeatInterval);
}
} catch {
clearInterval(heartbeatInterval);
}
}, 15_000);
res.on('close', () => clearInterval(heartbeatInterval));
      await server.connect(transport);
      console.error("Knowledge Graph MCP Server: new SSE client connected");
    } catch (error) {


@@ -123,6 +123,18 @@ function runServer() {
      transport.onclose = () => {
        sseTransportsBySessionId.delete(transport.sessionId);
      };
const heartbeatInterval = setInterval(() => {
try {
if (!res.writableEnded) {
res.write(': heartbeat\n\n');
} else {
clearInterval(heartbeatInterval);
}
} catch {
clearInterval(heartbeatInterval);
}
}, 15_000);
res.on('close', () => clearInterval(heartbeatInterval));
      await server.connect(transport);
      console.error("Sequential Thinking MCP Server: new SSE client connected");
    } catch (error) {


@@ -1 +0,0 @@
mcp[cli]>=1.25.0