Coreflux MCP Server
Connects Claude and other MCP-compatible AI assistants to a Coreflux MQTT broker, enabling them to discover and execute Coreflux commands for managing models, actions, rules, and routes through natural language.
Coreflux MQTT MCP Server
An enterprise-grade Model Context Protocol (MCP) server that provides secure, scalable access to Coreflux MQTT brokers and comprehensive automation capabilities for Claude and other MCP-compatible AI assistants.
🚀 Features
Core Functionality
- 🔌 MQTT Integration: Seamless connection to Coreflux MQTT brokers with full TLS support
- 🛠️ Complete Coreflux API: Full access to models, actions, rules, and routes
- 🤖 AI Code Generation: LOT (Language-of-Things) code generation via Coreflux Copilot API
- 🔍 Dynamic Discovery: Automatic discovery and listing of available actions
- 🏥 Health Monitoring: Comprehensive system health checks and monitoring
Enterprise Features
- 🔒 Production Security: Comprehensive log sanitization, input validation, and security features
- ⚡ Async Processing: Non-blocking message processing with rate limiting and queue management
- 📝 Enhanced Logging: Structured logging with rotation, filtering, and security sanitization
- ✅ Configuration Validation: Comprehensive environment and file validation system
- 🧪 Testing Framework: Complete unit testing suite with mocking and coverage reporting
DevOps & Deployment
- 🐳 Container Ready: Full Docker and Kubernetes deployment support with health checks
- 🔄 CI/CD Pipeline: GitHub Actions with automated testing, security scanning, and quality checks
- 📦 Development Tools: Pre-commit hooks, code formatting, linting, and documentation generation
- ⚙️ Easy Setup: Interactive setup assistant with validation and testing
- 📚 Rich Documentation: API documentation, security guides, and deployment instructions
🚀 Quick Start
Prerequisites
- Python 3.11 or higher
- Docker (optional, for containerized deployment)
- Access to a Coreflux MQTT broker
- Coreflux Copilot API key (optional, for AI assistance)
Option 1: Docker Deployment (Recommended)
- Clone and configure:
git clone https://github.com/CorefluxCommunity/Coreflux-MQTT-MCP-Server.git
cd Coreflux-MQTT-MCP-Server
cp .env.example .env   # Edit .env with your configuration
- Deploy with Docker:
docker-compose up -d
- Verify deployment:
docker-compose logs -f coreflux-mcp-server
Option 2: Development Installation
- Clone and set up:
git clone https://github.com/CorefluxCommunity/Coreflux-MQTT-MCP-Server.git
cd Coreflux-MQTT-MCP-Server
- Install dependencies:
pip install -r requirements.txt
# For development
pip install -r requirements-dev.txt
- Configure the environment:
python setup_assistant.py   # Interactive configuration
# OR
cp .env.example .env && nano .env   # Manual configuration
- Validate and test:
make validate   # Validate configuration
make test       # Run tests
- Start the server:
python server.py
# OR
make run
For detailed deployment instructions, see DEPLOYMENT.md.
⚙️ Configuration
Interactive Setup Assistant
The server includes a comprehensive setup assistant that guides you through configuration:
python setup_assistant.py
The assistant helps with:
- 🔧 MQTT broker connection settings
- 🔐 TLS certificate configuration
- 🤖 Coreflux Copilot API integration
- 📝 Logging and monitoring setup
- ✅ Configuration validation and testing
Use the setup assistant when:
- Creating initial configuration
- Updating existing settings
- Troubleshooting connection issues
- Setting up TLS certificates
- Migrating between environments
Environment Configuration
Copy .env.example to .env and configure:
# MQTT Broker Configuration
MQTT_BROKER=your-broker-host.com
MQTT_PORT=8883
MQTT_USER=your-username
MQTT_PASSWORD=your-password
MQTT_USE_TLS=true
# TLS Configuration (when MQTT_USE_TLS=true)
MQTT_CA_CERT=/path/to/ca.crt
MQTT_CERT_FILE=/path/to/client.crt
MQTT_KEY_FILE=/path/to/client.key
# Coreflux Copilot API
DO_AGENT_API_KEY=your-api-key-here
# Logging Configuration
LOG_LEVEL=INFO
LOG_FILE=/var/log/coreflux-mcp.log
For detailed configuration options, see the Configuration Guide.
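As an illustration of how these variables could be consumed, the following is a minimal sketch and not the server's actual loader. The `MqttSettings` dataclass and `load_mqtt_settings` function are hypothetical names, and the port defaults (8883 with TLS, 1883 without) are assumptions based on the conventional MQTT ports rather than anything this README states.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class MqttSettings:
    """Hypothetical container for the MQTT_* variables shown above."""
    broker: str
    port: int
    user: Optional[str]
    password: Optional[str]
    use_tls: bool
    ca_cert: Optional[str]


def load_mqtt_settings(env: Mapping[str, str] = os.environ) -> MqttSettings:
    """Read MQTT_* variables from a mapping and apply basic checks."""
    broker = env.get("MQTT_BROKER", "").strip()
    if not broker:
        raise ValueError("MQTT_BROKER is required")

    use_tls = env.get("MQTT_USE_TLS", "false").strip().lower() == "true"
    # Assumed defaults: the conventional MQTT ports (8883 with TLS, 1883 without).
    port = int(env.get("MQTT_PORT", "8883" if use_tls else "1883"))
    if not 1 <= port <= 65535:
        raise ValueError(f"MQTT_PORT out of range: {port}")

    return MqttSettings(
        broker=broker,
        port=port,
        user=env.get("MQTT_USER") or None,
        password=env.get("MQTT_PASSWORD") or None,
        use_tls=use_tls,
        ca_cert=env.get("MQTT_CA_CERT") or None,
    )
```

Passing a plain dictionary instead of `os.environ` makes the loader easy to exercise in tests without touching the real environment.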
🔌 Connecting Claude to the MCP Server
Using Claude Desktop
- Locate the Claude Desktop config file:
  - macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
  - Windows: %USERPROFILE%\AppData\Roaming\Claude\claude_desktop_config.json
- Add the server configuration:
{
  "mcpServers": {
    "coreflux": {
      "command": "python",
      "args": ["/path/to/your/server.py"],
      "env": {
        "MQTT_BROKER": "your-broker-host.com",
        "MQTT_PORT": "8883",
        "MQTT_USER": "your-username",
        "MQTT_PASSWORD": "your-password",
        "MQTT_USE_TLS": "true",
        "DO_AGENT_API_KEY": "your-copilot-api-key"
      }
    }
  }
}
- Restart Claude Desktop
Security Note: For production deployments, store secrets in secure environment variables or secret management systems rather than the Claude config file.
Using Environment Variables
For better security, use environment variables instead of hardcoding credentials:
{
  "mcpServers": {
    "coreflux": {
      "command": "python",
      "args": ["/path/to/your/server.py"],
      "env": {
        "MQTT_BROKER": "${COREFLUX_MQTT_BROKER}",
        "MQTT_PORT": "${COREFLUX_MQTT_PORT}",
        "MQTT_USER": "${COREFLUX_MQTT_USER}",
        "MQTT_PASSWORD": "${COREFLUX_MQTT_PASSWORD}",
        "DO_AGENT_API_KEY": "${COREFLUX_API_KEY}"
      }
    }
  }
}
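This README does not confirm whether a given MCP client expands `${...}` placeholders in its config file. If yours does not, one option is to keep the file above as a template and render it with a small script. The sketch below is only an illustration: `expand_placeholders` and `render_config` are hypothetical helpers, and they rely on the standard library's `string.Template`, which raises `KeyError` for an unset variable.

```python
import json
import os
from string import Template
from typing import Any, Mapping


def expand_placeholders(value: Any, env: Mapping[str, str]) -> Any:
    """Recursively replace ${NAME} in strings; raise KeyError if NAME is unset."""
    if isinstance(value, str):
        return Template(value).substitute(env)
    if isinstance(value, dict):
        return {key: expand_placeholders(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_placeholders(item, env) for item in value]
    return value


def render_config(template_text: str, env: Mapping[str, str] = os.environ) -> str:
    """Parse a JSON template, expand placeholders, and return formatted JSON."""
    return json.dumps(expand_placeholders(json.loads(template_text), env), indent=2)
```

Note that the rendered output contains the real secret values, so it needs the same file permissions and handling as any other credential file.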
Testing the Connection
Once configured, test the connection by asking Claude:
Can you check the health of the Coreflux MCP server and show me the broker information?
Claude should respond with system status and broker details if the connection is successful.
🛠️ Available Tools
The server provides the following tools to Claude:
Core MQTT Tools
- publish_to_coreflux - Publish messages to MQTT topics with QoS and retention options
- get_broker_info - Get detailed information about the MQTT broker connection
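The README does not list the parameters of publish_to_coreflux, so the following is only a sketch of the kind of checks a publish request with QoS and retention options typically needs. `PublishRequest` and `validate_publish` are hypothetical names; the rules themselves (QoS is 0, 1, or 2, and publish topics may not contain wildcards or a null character) come from the MQTT specification.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PublishRequest:
    """Hypothetical shape of a publish call; field names are assumptions."""
    topic: str
    payload: str
    qos: int = 0
    retain: bool = False


def validate_publish(request: PublishRequest) -> None:
    """Raise ValueError if the request breaks basic MQTT publish rules."""
    if not request.topic:
        raise ValueError("topic must not be empty")
    # Wildcards are only valid in subscriptions, never in a publish topic.
    if "+" in request.topic or "#" in request.topic:
        raise ValueError("publish topic must not contain '+' or '#'")
    if "\x00" in request.topic:
        raise ValueError("topic must not contain a null character")
    if request.qos not in (0, 1, 2):
        raise ValueError("qos must be 0, 1, or 2")
```

Rejecting malformed requests before they reach the broker gives the assistant a clear error message instead of a silent drop or a disconnect.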
AI Assistance Tools
- copilot_assist - Query the Coreflux Copilot AI for automation assistance and code generation
System Management Tools
- comprehensive_health_check - Perform detailed health checks of all system components
For detailed API documentation, see API_DOCUMENTATION.md.
🧪 Development & Testing
Development Setup
- Install development dependencies:
pip install -r requirements-dev.txt
- Install pre-commit hooks:
pre-commit install
- Run the full development setup:
make dev-setup   # Complete development environment setup
Testing
Run the comprehensive test suite:
# Run all tests
make test
# Run tests with coverage
make test-coverage
# Run specific test categories
make test-unit # Unit tests only
make test-integration # Integration tests only
Code Quality
Maintain code quality with automated tools:
# Format code
make format
# Run linters
make lint
# Security scanning
make security-check
# Type checking
make type-check
# Run all quality checks
make quality-check
Available development commands:
# Development workflow
make dev-setup # Set up complete development environment
make validate # Validate configuration and environment
make run # Start the server with validation
make run-debug # Start server in debug mode
# Testing and validation
make test # Run all tests
make test-coverage # Run tests with coverage report
make test-unit # Run unit tests only
make validate-config # Validate configuration files
# Code quality
make format # Format code with black and isort
make lint # Run all linters (flake8, bandit, mypy)
make security-check # Run security scanning
make type-check # Run type checking with mypy
# Docker operations
make docker-build # Build Docker image
make docker-run # Run in Docker container
make docker-test # Run tests in Docker
# Documentation
make docs # Generate documentation
make docs-serve # Serve documentation locally
🔧 System Architecture
Core Components
- server.py - Main MCP server with tool implementations
- config_validator.py - Configuration validation and environment checking
- message_processor.py - Asynchronous MQTT message processing with rate limiting
- enhanced_logging.py - Structured logging with rotation and security filtering
- config_schema.py - Pydantic schemas for type-safe configuration
- parser.py - Sanitization and parsing utilities
Security Features
- Input Sanitization - All inputs are sanitized to prevent injection attacks
- Log Security - Automatic sanitization of sensitive data in logs
- TLS Support - Full TLS encryption for MQTT connections
- Configuration Validation - Comprehensive validation of all configuration parameters
- Secret Management - Secure handling of credentials and API keys
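To make the log-security idea concrete, here is a minimal sketch of a redacting log filter. It is not the code in enhanced_logging.py: the `redact` function, the `RedactingFilter` class, and the list of secret-looking key names are all assumptions chosen for illustration, and the pattern only covers simple `key=value` and `key: value` pairs.

```python
import logging
import re

# Assumed pattern: key=value or key: value pairs whose key looks secret.
_SECRET_PATTERN = re.compile(
    r"([\w-]*(?:password|passwd|api[_-]?key|token|secret)[\w-]*)(\s*[=:]\s*)(\S+)",
    re.IGNORECASE,
)


def redact(text: str) -> str:
    """Replace the value part of secret-looking pairs with a placeholder."""
    return _SECRET_PATTERN.sub(r"\1\2***", text)


class RedactingFilter(logging.Filter):
    """Logging filter that rewrites each record's message before it is emitted."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.msg = redact(record.getMessage())
        record.args = None  # the message is already fully formatted
        return True
```

Attaching the filter to a handler (`handler.addFilter(RedactingFilter())`) applies it to every record that handler emits, whichever logger produced it.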
Performance Features
- Async Processing - Non-blocking message processing
- Connection Pooling - Efficient MQTT connection management
- Rate Limiting - Configurable rate limits to prevent abuse
- Health Monitoring - Real-time health checks and system monitoring
📚 Documentation
- API Documentation - Complete API reference
- Deployment Guide - Production deployment instructions
- Secret Management - Security and secret management guide
- Configuration Reference - Complete configuration options
🐳 Docker Deployment
Quick Start with Docker
# Clone and configure
git clone https://github.com/CorefluxCommunity/Coreflux-MQTT-MCP-Server.git
cd Coreflux-MQTT-MCP-Server
# Copy and edit environment file
cp .env.example .env
nano .env # Configure your settings
# Start with Docker Compose
docker-compose up -d
# Check logs
docker-compose logs -f coreflux-mcp-server
# Health check
docker-compose exec coreflux-mcp-server python server.py --health-check
Production Docker Deployment
See DEPLOYMENT.md for comprehensive production deployment instructions including:
- Multi-stage Docker builds
- Kubernetes deployments
- Health checks and monitoring
- Load balancing and scaling
- Security configurations
🔑 Coreflux Copilot Integration
The server includes powerful AI assistance through the Coreflux Copilot API:
Setup
- Obtain API Key from the Coreflux Copilot dashboard
- Configure the key:
# Option 1: Environment file
echo "DO_AGENT_API_KEY=your_api_key_here" >> .env
# Option 2: Environment variable
export DO_AGENT_API_KEY=your_api_key_here
Features
- LOT Code Generation - Generate Language-of-Things code from natural language
- Automation Assistance - Get help with Coreflux automation tasks
- Best Practices - Receive guidance on optimal implementations
- Troubleshooting - Get assistance with debugging and optimization
Usage Examples
Ask Claude to help with Coreflux automation:
Generate LOT code for a temperature monitoring system that triggers an alert when the temperature exceeds 75°F
Help me create a rule that processes sensor data and stores it in a database
🚀 Advanced Features
Asynchronous Message Processing
The server includes a robust async message processor that:
- Prevents Blocking - Handles messages without blocking the main thread
- Rate Limiting - Configurable limits to prevent system overload
- Queue Management - Intelligent queue handling with backpressure
- Statistics - Real-time processing metrics and monitoring
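The points above can be sketched with a bounded `asyncio.Queue` and a minimum interval between handled messages. This is an illustration under stated assumptions, not the contents of message_processor.py: the `MessageProcessor` class, its parameters, and its two counters are hypothetical.

```python
import asyncio
import time
from typing import Awaitable, Callable


class MessageProcessor:
    """Bounded queue plus a minimum interval between handled messages."""

    def __init__(self, handler: Callable[[str], Awaitable[None]],
                 max_queue: int = 100, max_per_second: float = 50.0) -> None:
        self._handler = handler
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue)
        self._min_interval = 1.0 / max_per_second
        self.processed = 0
        self.failed = 0

    async def submit(self, message: str) -> None:
        # put() waits while the queue is full, which slows producers down
        # instead of letting memory grow without bound (backpressure).
        await self._queue.put(message)

    async def run(self) -> None:
        """Consume messages until the task is cancelled."""
        while True:
            message = await self._queue.get()
            started = time.monotonic()
            try:
                await self._handler(message)
                self.processed += 1
            except Exception:
                self.failed += 1  # count the failure and keep the loop alive
            finally:
                self._queue.task_done()
            # Simple rate limit: sleep off whatever remains of the interval.
            remaining = self._min_interval - (time.monotonic() - started)
            if remaining > 0:
                await asyncio.sleep(remaining)

    async def drain(self) -> None:
        """Wait until every submitted message has been handled."""
        await self._queue.join()
```

A typical use is to start `run()` with `asyncio.create_task`, call `submit()` from the MQTT callback path, and read `processed` and `failed` for the statistics mentioned above.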
Enhanced Logging System
Comprehensive logging with enterprise features:
- Structured Logging - JSON formatted logs for easy parsing
- Log Rotation - Automatic log file rotation to manage disk space
- Security Filtering - Automatic sanitization of sensitive information
- Multiple Outputs - Console, file, and syslog support
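As a rough picture of JSON-formatted logs with rotation, the sketch below uses only the standard library. It is not the project's enhanced_logging.py; `JsonFormatter`, `build_logger`, the logger name, and the rotation limits are assumptions made for the example.

```python
import json
import logging
from logging.handlers import RotatingFileHandler


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(entry)


def build_logger(path: str, level: str = "INFO") -> logging.Logger:
    """Create a logger that writes JSON lines to a size-rotated file."""
    logger = logging.getLogger("coreflux_mcp_example")  # hypothetical name
    logger.setLevel(level)
    # Assumed limits: rotate at about 1 MB and keep five old files.
    handler = RotatingFileHandler(path, maxBytes=1_000_000, backupCount=5)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    return logger
```

One JSON object per line keeps the file easy to parse with line-oriented tools, and the `level` argument can be fed directly from the LOG_LEVEL variable.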
Configuration Validation
Robust validation system that checks:
- Environment Variables - Validates all required configuration
- File Permissions - Ensures certificate files are accessible
- Network Connectivity - Tests MQTT broker connectivity
- API Availability - Validates Copilot API access
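The first three checks in this list can be approximated with a few standard-library calls. The sketch below is an assumption-laden illustration rather than config_validator.py itself: all function names are hypothetical, the connectivity test only opens a TCP socket (it does not speak MQTT or TLS), and the Copilot API check is left out because this README does not describe that API.

```python
import os
import socket
from typing import List, Mapping


def check_required(env: Mapping[str, str], names: List[str]) -> List[str]:
    """Report every required variable that is missing or empty."""
    return [f"{name} is not set" for name in names if not env.get(name)]


def check_readable(path: str) -> List[str]:
    """Report a certificate or key file that is missing or unreadable."""
    if not os.path.isfile(path):
        return [f"{path} does not exist or is not a file"]
    if not os.access(path, os.R_OK):
        return [f"{path} is not readable by the current user"]
    return []


def check_tcp(host: str, port: int, timeout: float = 3.0) -> List[str]:
    """TCP-level reachability only; this does not speak MQTT or TLS."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return []
    except OSError as exc:
        return [f"cannot reach {host}:{port}: {exc}"]


def validate(env: Mapping[str, str]) -> List[str]:
    """Collect every problem instead of stopping at the first one."""
    problems = check_required(env, ["MQTT_BROKER", "MQTT_PORT"])
    port_text = env.get("MQTT_PORT", "")
    if port_text and not port_text.isdigit():
        problems.append("MQTT_PORT must be a number")
    if env.get("MQTT_USE_TLS", "").lower() == "true":
        for name in ("MQTT_CA_CERT", "MQTT_CERT_FILE", "MQTT_KEY_FILE"):
            if env.get(name):
                problems += check_readable(env[name])
    if not problems:  # only try the network once the settings look sane
        problems += check_tcp(env["MQTT_BROKER"], int(env["MQTT_PORT"]))
    return problems
```

Returning a list of messages, rather than raising on the first failure, lets a setup tool show all configuration problems in one pass.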
🛡️ Security & Compliance
Security Features
- Input Sanitization - All inputs validated and sanitized
- TLS Encryption - Full TLS support for MQTT connections
- Secret Management - Secure credential handling
- Audit Logging - Comprehensive security event logging
- Non-root Execution - Runs with minimal privileges
Compliance Support
The server supports various compliance requirements:
- SOC 2 - Security controls and monitoring
- GDPR - Data protection and privacy
- HIPAA - Healthcare data protection (when properly configured)
For detailed security information, see SECRET_MANAGEMENT.md.
📊 Monitoring & Health Checks
Health Check Tool
The comprehensive_health_check tool checks the health of all system components. Run it manually or through Claude:
# Manual health check
python server.py --health-check
# Or ask Claude:
# "Please run a comprehensive health check on the Coreflux MCP server"
Monitoring Metrics
The server provides detailed metrics:
- Connection Status - MQTT broker connectivity
- Message Processing - Queue size and processing rates
- System Resources - Memory and CPU usage
- Error Rates - Failed operations and error statistics
- API Status - Copilot API availability and response times
Alerting
Configure alerts for:
- Connection failures
- High error rates
- Resource exhaustion
- Security events
🤝 Contributing
We welcome contributions! Please see our contribution guidelines:
Development Process
- Fork the repository
- Create a feature branch: git checkout -b feature/amazing-feature
- Install development dependencies: pip install -r requirements-dev.txt
- Setup pre-commit hooks: pre-commit install
- Make your changes with tests
- Run quality checks: make quality-check
- Commit your changes: git commit -am 'Add amazing feature'
- Push to the branch: git push origin feature/amazing-feature
- Create a Pull Request
Code Standards
- Python 3.11+ compatibility
- Type hints for all functions
- Comprehensive tests with >90% coverage
- Security scanning with bandit
- Code formatting with black and isort
- Documentation for all public APIs
📄 License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
🆘 Support & Troubleshooting
Common Issues
Connection Refused
Error: MQTT connection failed
- Check broker hostname and port
- Verify network connectivity
- Confirm TLS configuration
Authentication Failed
Error: Authentication failed
- Verify username/password
- Check API key validity
- Confirm broker permissions
TLS Handshake Failed
Error: TLS handshake failed
- Verify certificate paths
- Check certificate validity
- Confirm TLS version compatibility
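When a broker rejects a connection at the MQTT level, it answers with a CONNACK return code, and mapping that code to a hint can shorten the checklist above. The codes below are the ones defined by MQTT 3.1.1 (MQTT 5 uses a different set of reason codes). Whether and how this server surfaces them is an assumption, and `explain_connack` is a hypothetical helper.

```python
# CONNACK return codes as defined by MQTT 3.1.1; the hints are this sketch's wording.
CONNACK_HINTS = {
    0: "Connection accepted.",
    1: "Unacceptable protocol version: the broker does not support this MQTT version.",
    2: "Client identifier rejected: try a different client ID.",
    3: "Server unavailable: the broker is running but not accepting MQTT connections.",
    4: "Bad user name or password: check MQTT_USER and MQTT_PASSWORD.",
    5: "Not authorized: confirm the account's broker permissions.",
}


def explain_connack(code: int) -> str:
    """Return a troubleshooting hint for an MQTT 3.1.1 CONNACK return code."""
    return CONNACK_HINTS.get(code, f"Unknown CONNACK return code {code}.")
```

Codes 4 and 5 correspond to the "Authentication Failed" case above. A refused TCP connection or a failed TLS handshake happens before any CONNACK is sent, so those cases produce no return code at all.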
Debug Mode
Enable detailed logging for troubleshooting:
export LOG_LEVEL=DEBUG
python server.py
Getting Help
- GitHub Issues: Report bugs and request features
- Discussions: Community support and questions
- Documentation: Complete documentation
- Security Issues: Report to security@coreflux.org
🗺️ Roadmap
Current Status: v1.0.0 ✅
- ✅ Core MQTT functionality
- ✅ Copilot API integration
- ✅ Enterprise security features
- ✅ Comprehensive testing
- ✅ Production deployment support
Upcoming Features
- v1.1.0 - Enhanced monitoring and metrics
- v1.2.0 - Additional Coreflux API endpoints
- v1.3.0 - WebSocket support for real-time data
- v2.0.0 - Multi-broker support and federation
📋 Quick Reference
Essential Commands
# Setup and configuration
python setup_assistant.py # Interactive setup
make validate # Validate configuration
# Development
make dev-setup # Complete dev environment
make test # Run all tests
make quality-check # Run all quality checks
# Deployment
docker-compose up -d # Docker deployment
make docker-build # Build Docker image
# Monitoring
make health-check # System health check
docker-compose logs -f # View logs
Key Files
- server.py - Main MCP server
- .env - Configuration file
- requirements.txt - Python dependencies
- docker-compose.yml - Docker deployment
- Makefile - Development commands
Built with ❤️ by the Coreflux Community
- remove_action: Remove an action event/function
- run_action: Run an action event/function
- remove_all_models: Remove all models
- remove_all_actions: Remove all actions
- remove_all_routes: Remove all routes
- list_discovered_actions: List all discovered Coreflux actions
- request_lot_code: Generate LOT code using Coreflux Copilot API based on natural language prompts
Debugging and Troubleshooting
The MCP server now starts even if the MQTT broker is not available, allowing you to troubleshoot and configure connections through the MCP tools.
Connection Status and Recovery
- The server will start successfully even if the MQTT broker is unreachable
- Use the get_connection_status tool to check connection health and get troubleshooting guidance
- Use the setup_mqtt_connection tool to configure a new broker connection without restarting
- Use the check_broker_health or reconnect_mqtt tools to test and retry connections
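Starting without a broker and retrying later is commonly implemented with exponential backoff. The sketch below shows that general technique only; it is not taken from the server's code, and `backoff_delays`, `connect_with_retry`, and their default values are hypothetical.

```python
import time
from typing import Callable, Iterator


def backoff_delays(base: float = 1.0, cap: float = 60.0) -> Iterator[float]:
    """Yield base, 2*base, 4*base, ... and never more than cap."""
    delay = base
    while True:
        yield delay
        delay = min(cap, delay * 2)


def connect_with_retry(connect: Callable[[], bool], max_attempts: int = 5,
                       sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call connect() until it returns True or the attempts run out."""
    delays = backoff_delays()
    for attempt in range(1, max_attempts + 1):
        if connect():
            return True
        if attempt < max_attempts:
            sleep(next(delays))  # wait longer after each failed attempt
    return False
```

Injecting `sleep` keeps the function testable without real waiting. A production version would usually add random jitter so that many clients do not retry in lockstep.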
Available Tools for Connection Management
- get_connection_status: Get detailed connection status with troubleshooting guidance
- setup_mqtt_connection: Configure a new MQTT broker connection dynamically
- mqtt_connect: Connect to a specific MQTT broker with custom parameters
- check_broker_health: Test broker connectivity and attempt reconnection
- reconnect_mqtt: Force reconnection to the configured broker
Traditional Troubleshooting Steps
If you encounter issues:
- Verify your MQTT broker credentials in your Claude configuration
- Ensure the broker is accessible
- Run the setup assistant to verify or update your configuration:
python setup_assistant.py
- Check the Claude Desktop logs:
# Check Claude's logs for errors (macOS)
tail -n 20 -f ~/Library/Logs/Claude/mcp*.log
# Windows PowerShell
Get-Content -Path "$env:USERPROFILE\AppData\Roaming\Claude\Logs\mcp*.log" -Tail 20 -Wait
- Run the server with debug logging:
# Direct execution with debug logging
python server.py --mqtt-host localhost --mqtt-port 1883 --log-level DEBUG
References and Documentation
- DEPLOYMENT.md - Production deployment guide
- SECURITY.md - Security guidelines and best practices
- MCP Documentation - Official MCP documentation
- Coreflux Platform - Coreflux automation platform
Contributing
Contributions are welcome! Please read our contributing guidelines and submit pull requests to the development branch.
Support
- 📖 Documentation: Check the README, DEPLOYMENT.md, and SECURITY.md files
- 🐛 Issues: Report bugs and feature requests on GitHub
- 💬 Community: Join the Coreflux community for discussions