A Kafka Connect source connector that establishes a WebSocket connection to a remote server, optionally sends a subscription message, and streams incoming messages into a Kafka topic.
- Connects to a specified WebSocket URL.
- Sends an initial subscription message if configured.
- Periodically sends application-level ping messages
- Matches incoming pong messages using a specified regex pattern to log and exclude from the message stream
- Converts incoming WebSocket messages into Kafka
SourceRecordentries. - Fully compatible with Kafka Connect standalone and distributed modes.
- Supports integration testing using Testcontainers.
- Stream real-time market data, events, or telemetry into Kafka.
- Bridge WebSocket-based APIs with Kafka consumers.
- Integrate with public or private WebSocket feeds.
mvn clean packageThe resulting JAR will be located in target/.
Place the connector JAR into your Kafka Connect plugin.path, e.g.:
mkdir -p /path/to/connect-plugins/websocket-connector
cp kafka-websocket-source-connector-*.jar /path/to/connect-plugins/websocket-connector/Ensure your Kafka Connect worker config includes:
plugin.path=/path/to/connect-pluginsExample contents of the JSON config file for the connector:
{
"name": "websocket-source-connector",
"config": {
"connector.class": "com.kcharkseliani.kafka.connect.websocket.WebSocketSourceConnector",
"tasks.max": "1",
"websocket.url": "wss://example.com/feed",
"topic": "websocket-topic",
"websocket.subscription.message": "{\"type\": \"subscribe\"}",
"websocket.ping.message": "{\"method\":\"ping\"}",
"websocket.ping.interval.ms": 20000,
"websocket.pong.pattern": "\\\"method\\\"\\s*:\\s*\\\"pong\\\""
}
}Once Kafka Connect is running, you can deploy the WebSocket source connector using a configuration file with the contents from the section above:
curl -X POST http://<CONNECT_HOST>:8083/connectors \
-H "Content-Type: application/json" \
-d @connector-config.jsonAlternatively, you can provide the connector configuration inline as a JSON string:
curl -X POST http://<CONNECT_HOST>:8083/connectors \
-H "Content-Type: application/json" \
-d '<JSON_STRING_HERE>'| Property | Required | Description |
|---|---|---|
websocket.url |
Yes | WebSocket server URL to connect to. |
topic |
Yes | Kafka topic to publish received messages. |
websocket.subscription.message |
No | Message to send after connection (e.g., subscription). |
websocket.ping.message |
No | Optional ping message to send periodically to keep the WebSocket connection alive. |
websocket.ping.interval.ms |
No | Interval in milliseconds between each ping message. Defaults to 20000 (20 seconds) |
websocket.pong.pattern |
No | Regex pattern to detect pong responses in incoming WebSocket messages. If not provided, pong responses will be sent alongside other messages. |
This project uses:
- Java 17
- Maven
- Kafka Connect API
- Java-WebSocket
- Testcontainers for integration testing
mvn clean verifyThis runs a full build and executes all tests, including integration tests that use Testcontainers to spin up Kafka, Kafka Connect, and a local WebSocket server to validate full end-to-end behavior.
To run integration tests separately:
mvn failsafe:integration-test failsafe:verifyNote: Docker must be available and running for integration tests to work.
WebSocketSourceConnector: Main entry point implementingSourceConnector.WebSocketSourceTask: Handles data streaming logic from WebSocket to Kafka.WebSocketSourceConnectorConfig: HandlesConfigDefinitialisation and definition and validation of connector configurationWebSocketClientFactory: Interface for factories instantiating websocket clients.DefaultWebSocketClientFactory: Creates WebSocket client instances.MessageHandler: Functional interface for handling messages received from a WebSocket connection.
You can release the connector as:
- A GitHub Release (upload the
.jar)
MIT License. See LICENSE for details.