docs/RPC_HTTP_STREAMING.md
HTTP Streaming RPC is a bidirectional communication protocol for FastLED that uses:
This enables real-time communication between FastLED devices and web-based or desktop applications over standard HTTP.
| Feature | Serial | HTTP Streaming |
|---|---|---|
| Network-based | No | Yes (WiFi/Ethernet) |
| Multiple clients | No | Yes |
| Bidirectional streaming | Yes | Yes |
| Standard protocol | Custom | HTTP/1.1 + JSON-RPC 2.0 |
| Web browser support | No | Yes (Fetch API) |
| Automatic reconnection | No | Yes |
| Heartbeat/keepalive | No | Yes |
| Firewall-friendly | N/A | Yes (uses port 80/8080) |
Use Serial Transport when:
Use HTTP Streaming when:
#include <FastLED.h>
#include "fl/remote/remote.h"
#include "fl/remote/rpc/response_send.h"
#include "fl/stl/asio/http/stream_server.h"
// Include implementations
#include "fl/stl/asio/http/stream_server.cpp.hpp"
#include "fl/stl/asio/http/stream_transport.cpp.hpp"
#include "fl/stl/asio/http/connection.cpp.hpp"
#include "fl/net/http/chunked_encoding.cpp.hpp"
#include "fl/stl/asio/http/http_parser.cpp.hpp"
#include "fl/stl/asio/http/native_server.cpp.hpp"
#define SERVER_PORT 8080
void setup() {
// Create HTTP server transport
auto transport = fl::make_shared<fl::HttpStreamServer>(SERVER_PORT);
// Create Remote instance
fl::Remote remote(
[&transport]() { return transport->readRequest(); },
[&transport](const fl::json& r) { transport->writeResponse(r); }
);
// Register SYNC method
remote.bind("add", [](int a, int b) { return a + b; });
// Register ASYNC method
remote.bindAsync("longTask", [](fl::ResponseSend& response, int duration) {
// Task runs in background, sends result later
setTimeout([&response, duration]() {
response.send(duration * 2);
}, duration);
}, fl::RpcMode::ASYNC);
Serial.println("HTTP RPC Server started on port 8080");
}
void loop() {
transport->update(millis()); // Handle network I/O
remote.update(millis()); // Process RPC requests
}
Python Example:
import requests
import json
url = "http://localhost:8080/rpc"
headers = {
"Content-Type": "application/json",
"Transfer-Encoding": "chunked"
}
# SYNC mode: immediate response
response = requests.post(url, headers=headers, json={
"jsonrpc": "2.0",
"method": "add",
"params": [2, 3],
"id": 1
})
print(response.json()) # {"jsonrpc":"2.0","result":5,"id":1}
curl Example:
curl -X POST http://localhost:8080/rpc \
-H "Content-Type: application/json" \
-H "Transfer-Encoding: chunked" \
-d '{"jsonrpc":"2.0","method":"add","params":[2,3],"id":1}'
Browser Fetch API:
fetch('http://localhost:8080/rpc', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
jsonrpc: '2.0',
method: 'add',
params: [2, 3],
id: 1
})
})
.then(response => response.json())
.then(data => console.log(data.result)); // 5
#include "fl/remote/remote.h"
#include "fl/remote/transport/serial.h"
// Create Serial-based Remote
fl::Remote remote = fl::Remote::createSerial(Serial);
// Register method (SYNC only)
remote.bind("add", [](int a, int b) { return a + b; });
// Update loop
void loop() {
remote.update(millis());
}
#include "fl/remote/remote.h"
#include "fl/stl/asio/http/stream_server.h"
// ... include .cpp.hpp files
#define SERVER_PORT 8080
// Create HTTP server transport
auto transport = fl::make_shared<fl::HttpStreamServer>(SERVER_PORT);
// Create Remote with callbacks
fl::Remote remote(
[&transport]() { return transport->readRequest(); },
[&transport](const fl::json& r) { transport->writeResponse(r); }
);
// Register SYNC method (same API)
remote.bind("add", [](int a, int b) { return a + b; });
// Register ASYNC method (new capability)
remote.bindAsync("longTask", [](fl::ResponseSend& response, int duration) {
setTimeout([&response, duration]() {
response.send(duration * 2);
}, duration);
}, fl::RpcMode::ASYNC);
// Update loop (add transport update)
void loop() {
transport->update(millis()); // NEW: transport layer update
remote.update(millis());
}
| Component | Serial | HTTP Streaming |
|---|---|---|
| Include files | transport/serial.h | transport/http/stream_server.h + .cpp.hpp files |
| Constructor | Remote::createSerial(Serial) | Callback-based: Remote([]{...}, []{...}) |
| Transport object | None (Serial built-in) | HttpStreamServer or HttpStreamClient |
| Update loop | remote.update() only | transport->update() + remote.update() |
| Configuration | Serial baud rate | Port number + heartbeat interval |
// No transport object, Remote reads directly from Serial
fl::Remote remote = fl::Remote::createSerial(Serial);
// Transport object manages I/O, Remote uses callbacks
auto transport = fl::make_shared<fl::HttpStreamServer>(8080);
fl::Remote remote(
[&transport]() { return transport->readRequest(); },
[&transport](const fl::json& r) { transport->writeResponse(r); }
);
.cpp.hpp filesRemote::createSerial() with HttpStreamServer/HttpStreamClientRemote() constructortransport->update(millis()) in loopbindAsync() with ResponseSend&setOnConnect()/setOnDisconnect() callbacksServer-side HTTP streaming transport for native platforms (POSIX sockets).
HttpStreamServer(uint16_t port, uint32_t heartbeatIntervalMs = 30000);
port: TCP port to listen on (e.g., 8080)heartbeatIntervalMs: Heartbeat interval in milliseconds (default: 30s)// Connection management
bool connect(); // Start listening (called automatically)
void disconnect(); // Stop server and close all clients
bool isConnected() const; // Returns true if any client connected
// RequestSource/ResponseSink implementation
fl::optional<fl::json> readRequest();
void writeResponse(const fl::json& response);
// Update loop (MUST call in loop())
void update(uint32_t currentTimeMs);
// Configuration
void setHeartbeatInterval(uint32_t intervalMs);
void setTimeout(uint32_t timeoutMs);
uint32_t getHeartbeatInterval() const;
uint32_t getTimeout() const;
// Callbacks
void setOnConnect(StateCallback callback);
void setOnDisconnect(StateCallback callback);
auto transport = fl::make_shared<fl::HttpStreamServer>(8080);
transport->setHeartbeatInterval(60000); // 60s heartbeat
transport->setTimeout(120000); // 120s timeout
transport->setOnConnect([]() { Serial.println("Client connected"); });
void loop() {
transport->update(millis());
remote.update(millis());
}
Client-side HTTP streaming transport for native platforms.
HttpStreamClient(const char* host, uint16_t port, uint32_t heartbeatIntervalMs = 30000);
host: Server hostname/IP (e.g., "localhost" or "192.168.1.100")port: Server port (e.g., 8080)heartbeatIntervalMs: Heartbeat interval in milliseconds (default: 30s)// Connection management
bool connect(); // Connect to server
void disconnect(); // Close connection
bool isConnected() const; // Returns true if connected
// RequestSource/ResponseSink implementation
fl::optional<fl::json> readRequest();
void writeResponse(const fl::json& response);
// Update loop (MUST call in loop())
void update(uint32_t currentTimeMs);
// Configuration
void setHeartbeatInterval(uint32_t intervalMs);
void setTimeout(uint32_t timeoutMs);
uint32_t getHeartbeatInterval() const;
uint32_t getTimeout() const;
// Callbacks
void setOnConnect(StateCallback callback);
void setOnDisconnect(StateCallback callback);
auto transport = fl::make_shared<fl::HttpStreamClient>("localhost", 8080);
transport->setOnConnect([]() { Serial.println("Connected to server"); });
transport->setOnDisconnect([]() { Serial.println("Disconnected from server"); });
void loop() {
transport->update(millis());
remote.update(millis());
}
Abstract base class for HTTP streaming transports. Both HttpStreamServer and HttpStreamClient inherit from this class.
// Subclasses implement platform-specific I/O
virtual int sendData(const uint8_t* data, size_t length) = 0;
virtual int recvData(uint8_t* buffer, size_t maxLength) = 0;
virtual bool connect() = 0;
virtual void disconnect() = 0;
virtual bool isConnected() const = 0;
Controls how often heartbeat messages are sent to keep connection alive.
transport->setHeartbeatInterval(30000); // 30 seconds (default)
transport->setHeartbeatInterval(60000); // 60 seconds
Recommendation: 30-60 seconds for most applications.
Controls how long to wait before declaring connection dead.
transport->setTimeout(60000); // 60 seconds (default)
transport->setTimeout(120000); // 120 seconds
Recommendation: 2x heartbeat interval (e.g., 60s timeout with 30s heartbeat).
Handled automatically by HttpConnection with exponential backoff:
// Reconnection is automatic, but you can track state:
transport->setOnDisconnect([]() {
Serial.println("Disconnected, auto-reconnect will trigger");
});
transport->setOnConnect([]() {
Serial.println("Reconnected successfully");
});
HTTP Streaming RPC supports three modes: SYNC, ASYNC, and ASYNC_STREAM. Each mode has different response patterns.
Use case: Simple request/response with immediate result.
Flow:
Code Example (Server):
// Register SYNC method
remote.bind("add", [](int a, int b) {
return a + b; // Result sent immediately
});
Code Example (Client):
curl -X POST http://localhost:8080/rpc \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"add","params":[2,3],"id":1}'
# Response (immediate):
# {"jsonrpc":"2.0","result":5,"id":1}
Protocol:
Client → Server: {"jsonrpc":"2.0","method":"add","params":[2,3],"id":1}
Server → Client: {"jsonrpc":"2.0","result":5,"id":1}
Use case: Long-running tasks where immediate response is not possible.
Flow:
Code Example (Server):
// Register ASYNC method
remote.bindAsync("longTask", [](fl::ResponseSend& response, int duration) {
// Send ACK immediately (automatic)
// Simulate long task
setTimeout([&response, duration]() {
int result = duration * 2;
response.send(result); // Send result when ready
}, duration);
}, fl::RpcMode::ASYNC);
Code Example (Client - Python with streaming):
import requests
response = requests.post(
"http://localhost:8080/rpc",
headers={"Content-Type": "application/json"},
json={"jsonrpc":"2.0","method":"longTask","params":[1000],"id":2},
stream=True # Enable streaming
)
# Read chunks as they arrive
for line in response.iter_lines():
if line:
data = json.loads(line)
if "ack" in data:
print("ACK received:", data)
elif "result" in data:
print("Result received:", data)
Protocol:
Client → Server: {"jsonrpc":"2.0","method":"longTask","params":[1000],"id":2}
Server → Client: {"ack":true}
[... 1000ms delay ...]
Server → Client: {"jsonrpc":"2.0","result":2000,"id":2}
Use case: Progressive tasks with incremental updates (progress bars, data streaming).
Flow:
stop: trueCode Example (Server):
// Register ASYNC_STREAM method
remote.bindAsync("streamData", [](fl::ResponseSend& response, int count) {
// Send ACK immediately (automatic)
// Send progressive updates
for (int i = 0; i < count; i++) {
setTimeout([&response, i]() {
response.sendUpdate(i); // Send update
}, i * 100);
}
// Send final result
setTimeout([&response, count]() {
response.sendFinal(count); // Send final with "stop: true"
}, count * 100);
}, fl::RpcMode::ASYNC_STREAM);
Code Example (Client - Python with streaming):
import requests
response = requests.post(
"http://localhost:8080/rpc",
headers={"Content-Type": "application/json"},
json={"jsonrpc":"2.0","method":"streamData","params":[5],"id":3},
stream=True
)
for line in response.iter_lines():
if line:
data = json.loads(line)
if "ack" in data:
print("ACK:", data)
elif "update" in data:
print("Update:", data["update"])
elif "stop" in data and data["stop"]:
print("Final result:", data["value"])
break
Protocol:
Client → Server: {"jsonrpc":"2.0","method":"streamData","params":[5],"id":3}
Server → Client: {"ack":true}
Server → Client: {"update":0}
Server → Client: {"update":1}
Server → Client: {"update":2}
Server → Client: {"update":3}
Server → Client: {"update":4}
Server → Client: {"value":5,"stop":true}
auto transport = fl::make_shared<fl::HttpStreamServer>(8080);
transport->setOnConnect([]() {
Serial.println("Client connected");
// Optional: send welcome message, reset state, etc.
});
transport->setOnDisconnect([]() {
Serial.println("Client disconnected");
// Optional: save state, clean up resources, etc.
});
void loop() {
transport->update(millis());
if (transport->isConnected()) {
// Process normally
remote.update(millis());
} else {
// Show "waiting for connection" on LEDs
FastLED.showColor(CRGB::Red);
}
}
// Server: disconnect all clients
transport->disconnect();
// Client: disconnect and reconnect
transport->disconnect();
delay(1000);
transport->connect();
auto transport = fl::make_shared<fl::HttpStreamClient>("localhost", 8080);
if (!transport->connect()) {
Serial.println("Failed to connect to server");
// Retry logic or fallback behavior
}
transport->setOnDisconnect([]() {
Serial.println("Connection lost, auto-reconnect will trigger");
});
HTTP transport preserves JSON-RPC 2.0 error responses:
// Server: method not found (automatic)
// Client sends: {"jsonrpc":"2.0","method":"unknown","params":[],"id":1}
// Server responds: {"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":1}
// Server: custom error
remote.bind("divide", [](int a, int b) -> fl::json {
if (b == 0) {
return fl::json::object({
{"jsonrpc", "2.0"},
{"error", fl::json::object({
{"code", -32000},
{"message", "Division by zero"}
})},
{"id", nullptr} // Client provides ID
});
}
return a / b;
});
// Configure timeout
transport->setTimeout(60000); // 60 seconds
// Timeout is detected in transport->update()
// Triggers onDisconnect callback automatically
transport->setOnDisconnect([]() {
Serial.println("Connection timeout detected");
});
int reconnectAttempts = 0;
const int MAX_RECONNECT_ATTEMPTS = 5;
transport->setOnDisconnect([&]() {
reconnectAttempts++;
if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) {
Serial.println("Max reconnection attempts reached, giving up");
// Optional: fallback to different transport or offline mode
} else {
Serial.printf("Reconnecting... (attempt %d/%d)\n",
reconnectAttempts, MAX_RECONNECT_ATTEMPTS);
}
});
transport->setOnConnect([&]() {
reconnectAttempts = 0; // Reset on successful connection
Serial.println("Connected successfully");
});
// Reduce heartbeat interval (more frequent keepalives)
transport->setHeartbeatInterval(10000); // 10 seconds
// Reduce timeout (faster failure detection)
transport->setTimeout(20000); // 20 seconds
// Increase heartbeat interval (fewer keepalive messages)
transport->setHeartbeatInterval(120000); // 2 minutes
// Increase timeout (tolerate longer idle periods)
transport->setTimeout(240000); // 4 minutes
// HttpStreamServer automatically handles multiple clients
auto transport = fl::make_shared<fl::HttpStreamServer>(8080);
// Responses are broadcast to ALL connected clients
// Each client receives all RPC responses
// To send to specific client, use custom routing:
remote.bind("broadcast", [](const fl::json& message) {
// All clients receive this response
return message;
});
Symptom: Client cannot connect to server.
Checklist:
transport->connect() called in setup())localhost for same machine)Debug:
// Server
auto transport = fl::make_shared<fl::HttpStreamServer>(8080);
if (!transport->connect()) {
Serial.println("Failed to start server on port 8080");
Serial.println("Check: port already in use, firewall, permissions");
}
// Client
auto transport = fl::make_shared<fl::HttpStreamClient>("localhost", 8080);
transport->setOnConnect([]() { Serial.println("Connected!"); });
transport->setOnDisconnect([]() { Serial.println("Disconnected!"); });
Symptom: Connection drops after idle period.
Cause: Timeout is shorter than heartbeat interval.
Fix: Ensure timeout is at least 2x heartbeat interval.
// WRONG: timeout < heartbeat
transport->setHeartbeatInterval(60000); // 60s
transport->setTimeout(30000); // 30s (too short!)
// CORRECT: timeout ≥ 2x heartbeat
transport->setHeartbeatInterval(60000); // 60s
transport->setTimeout(120000); // 120s (safe)
Symptom: Connection drops even though both sides are active.
Checklist:
transport->update(millis()) called in loop()Debug:
// Add logging to track heartbeat
transport->setOnConnect([]() {
Serial.printf("Connected, heartbeat every %d ms\n",
transport->getHeartbeatInterval());
});
// Monitor heartbeat in update loop
uint32_t lastHeartbeat = 0;
void loop() {
uint32_t now = millis();
transport->update(now);
if (now - lastHeartbeat > transport->getHeartbeatInterval()) {
Serial.println("Heartbeat sent");
lastHeartbeat = now;
}
remote.update(now);
}
Symptom: Server fails to start with "address already in use" error.
Cause: Port is already bound by another process.
Fix:
Kill existing process using the port:
# Linux/macOS
lsof -ti:8080 | xargs kill -9
# Windows
netstat -ano | findstr :8080
taskkill /PID <PID> /F
Use a different port:
auto transport = fl::make_shared<fl::HttpStreamServer>(8081); // Use 8081 instead
Add error handling:
auto transport = fl::make_shared<fl::HttpStreamServer>(8080);
if (!transport->connect()) {
Serial.println("Port 8080 in use, trying 8081...");
transport = fl::make_shared<fl::HttpStreamServer>(8081);
transport->connect();
}
Symptom: Malformed chunk size or invalid chunk data.
Cause: Client or server not following HTTP/1.1 chunked encoding spec.
Fix: Ensure clients use proper chunked encoding format:
# Python: Use requests with stream=True
import requests
response = requests.post(
"http://localhost:8080/rpc",
headers={"Content-Type": "application/json"},
json={"jsonrpc":"2.0","method":"add","params":[2,3],"id":1},
stream=True # IMPORTANT: enables chunked encoding
)
# curl: Use --data instead of --data-binary
curl -X POST http://localhost:8080/rpc \
-H "Content-Type: application/json" \
-H "Transfer-Encoding: chunked" \
-d '{"jsonrpc":"2.0","method":"add","params":[2,3],"id":1}'
Symptom: Memory usage grows over time.
Cause: Shared pointers not cleaned up, or circular references.
Fix: Use weak pointers and ensure proper cleanup:
// WRONG: Circular reference
auto transport = fl::make_shared<fl::HttpStreamServer>(8080);
auto remote = fl::make_shared<fl::Remote>(...);
transport->setOnConnect([remote]() { // Captures shared_ptr, circular ref
remote->update(0);
});
// CORRECT: Use weak_ptr or raw pointer
auto transport = fl::make_shared<fl::HttpStreamServer>(8080);
fl::Remote remote(...);
transport->setOnConnect([&remote]() { // Captures by reference, no circular ref
remote.update(0);
});
Debug: Use AddressSanitizer (ASAN) to detect leaks:
bash test --docker --cpp loopback # Runs with ASAN enabled
Found an issue or have a question? Please file an issue at:
Contributions welcome! See CONTRIBUTING.md for guidelines.