From feca615f61d4cd1d0675b516bf3c9ec7ff40d9e1 Mon Sep 17 00:00:00 2001 From: hxy7yx <1595670487@qq.com> Date: Wed, 1 Apr 2026 20:10:47 -0700 Subject: [PATCH 1/2] feat:add kafka plugin --- .github/workflows/check.yml | 72 +++- CMakeLists.txt | 1 + Install-dependencies.md | 7 + cmake/aarch64-linux-gnu.cmake | 3 +- cmake/arm-linux-gnueabi.cmake | 1 + cmake/arm-linux-gnueabihf.cmake | 3 +- cmake/x86_64-linux-gnu.cmake | 1 + default_plugins.json | 3 +- package-sdk.sh | 4 + plugins/kafka/CMakeLists.txt | 62 ++++ plugins/kafka/kafka.json | 295 +++++++++++++++ plugins/kafka/kafka_config.c | 375 +++++++++++++++++++ plugins/kafka/kafka_config.h | 92 +++++ plugins/kafka/kafka_handle.c | 288 +++++++++++++++ plugins/kafka/kafka_handle.h | 47 +++ plugins/kafka/kafka_plugin.c | 37 ++ plugins/kafka/kafka_plugin.h | 187 ++++++++++ plugins/kafka/kafka_plugin_intf.c | 465 +++++++++++++++++++++++ plugins/kafka/kafka_plugin_intf.h | 27 ++ tests/ft/app/test_kafka.py | 589 ++++++++++++++++++++++++++++++ tests/ft/neuron/api.py | 4 + tests/ft/neuron/config.py | 1 + 22 files changed, 2560 insertions(+), 4 deletions(-) create mode 100644 plugins/kafka/CMakeLists.txt create mode 100644 plugins/kafka/kafka.json create mode 100644 plugins/kafka/kafka_config.c create mode 100644 plugins/kafka/kafka_config.h create mode 100644 plugins/kafka/kafka_handle.c create mode 100644 plugins/kafka/kafka_handle.h create mode 100644 plugins/kafka/kafka_plugin.c create mode 100644 plugins/kafka/kafka_plugin.h create mode 100644 plugins/kafka/kafka_plugin_intf.c create mode 100644 plugins/kafka/kafka_plugin_intf.h create mode 100644 tests/ft/app/test_kafka.py diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 3fd583ad7..746d6ba35 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -135,6 +135,7 @@ jobs: --ignore=tests/ft/app/test_ekuiper.py \ --ignore=tests/ft/app/test_mqtt.py \ --ignore=tests/ft/app/test_azure.py \ + --ignore=tests/ft/app/test_kafka.py \ --ignore=tests/ft/driver/test_modbus.py \ --ignore=tests/ft/metrics/test_metrics.py \ --ignore=tests/ft/login/test_launch.py \ @@ -164,4 +165,73 @@ jobs: fail_ci_if_error: true token: ${{ secrets.CODECOV_TOKEN }} directory: ./cov_report/ - files: ./cov_report/cov-${{ matrix.plugin }}.info \ No newline at end of file + files: ./cov_report/cov-${{ matrix.plugin }}.info + + pft_kafka: + runs-on: ubuntu-24.04 + container: ghcr.io/neugates/build:x86_64-main + needs: [build] + services: + kafka: + image: apache/kafka:3.7.0 + env: + KAFKA_NODE_ID: "1" + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1" + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0" + CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk + ports: + - 9092:9092 + options: >- + --health-cmd "bash -c 'echo > /dev/tcp/localhost/9092'" + --health-interval 10s + --health-timeout 5s + --health-retries 10 + --health-start-period 30s + + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - name: setup dependencies + run: | + python3 -m pip install --upgrade pip + pip install -U pytest requests confluent-kafka==2.3.0 + + - uses: actions/download-artifact@v4 + with: + name: neuron-build + path: ./build + + - name: function test + env: + KAFKA_BROKER: kafka:9092 + run: | + chmod -R +x ./build/ + cd build + cmake -DUSE_GCOV=1 .. -DCMAKE_TOOLCHAIN_FILE=../cmake/x86_64-linux-gnu.cmake + cd .. + ldconfig + pytest -s -v tests/ft/app/test_kafka.py + + - name: create cov report + run: | + sudo apt-get update + sudo apt-get -y install lcov + mkdir -p cov_report + ./create_cov_report.sh kafka + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v4 + with: + verbose: true + fail_ci_if_error: true + token: ${{ secrets.CODECOV_TOKEN }} + directory: ./cov_report/ + files: ./cov_report/cov-kafka.info \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index d4a69bd61..f4ae11f82 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -196,6 +196,7 @@ add_subdirectory(plugins/ekuiper) add_subdirectory(plugins/file) add_subdirectory(plugins/monitor) add_subdirectory(plugins/datalayers) +add_subdirectory(plugins/kafka) add_subdirectory(simulator) diff --git a/Install-dependencies.md b/Install-dependencies.md index e1500ae7a..875e54893 100644 --- a/Install-dependencies.md +++ b/Install-dependencies.md @@ -74,6 +74,13 @@ $ ./configure --disable-protoc --enable-shared=no CFLAGS=-fPIC CXXFLAGS=-fPIC $ make && sudo make install ``` +[librdkafka](https://github.com/confluentinc/librdkafka.git) +```shell +$ git clone -b v2.6.1 https://github.com/confluentinc/librdkafka.git +$ cd librdkafka && mkdir build && cd build +$ cmake -DRDKAFKA_BUILD_STATIC=OFF -DRDKAFKA_BUILD_EXAMPLES=OFF -DRDKAFKA_BUILD_TESTS=OFF -DWITH_SASL=OFF -DWITH_ZSTD=OFF -DWITH_CURL=OFF -DCMAKE_BUILD_TYPE=Release .. && make && sudo make install +``` + [libxml2](https://github.com/GNOME/libxml2.git) ```shell $ git clone -b v2.9.14 https://github.com/GNOME/libxml2.git diff --git a/cmake/aarch64-linux-gnu.cmake b/cmake/aarch64-linux-gnu.cmake index 5c81eef25..eee8e77b0 100644 --- a/cmake/aarch64-linux-gnu.cmake +++ b/cmake/aarch64-linux-gnu.cmake @@ -21,4 +21,5 @@ set(CMAKE_FIND_ROOT_PATH_MODE_LIBRARY ONLY) set(CMAKE_FIND_ROOT_PATH_MODE_INCLUDE ONLY) link_directories(${CMAKE_STAGING_PREFIX}) -file(COPY ${CMAKE_STAGING_PREFIX}/lib/libzlog.so.1.2 DESTINATION /usr/local/lib) \ No newline at end of file +file(COPY ${CMAKE_STAGING_PREFIX}/lib/libzlog.so.1.2 DESTINATION /usr/local/lib) +file(COPY ${CMAKE_STAGING_PREFIX}/lib/librdkafka.so.1 DESTINATION /usr/local/lib) \ No newline at end of file diff --git a/cmake/arm-linux-gnueabi.cmake b/cmake/arm-linux-gnueabi.cmake index 812143e6f..b64f61f17 100644 --- a/cmake/arm-linux-gnueabi.cmake +++ b/cmake/arm-linux-gnueabi.cmake @@ -22,3 +22,4 @@ set(CMAKE_FIND_ROOT_PATH_MODE_INCLUDE ONLY) link_directories(${CMAKE_STAGING_PREFIX}) file(COPY ${CMAKE_STAGING_PREFIX}/lib/libzlog.so.1.2 DESTINATION /usr/local/lib) +file(COPY ${CMAKE_STAGING_PREFIX}/lib/librdkafka.so.1 DESTINATION /usr/local/lib) diff --git a/cmake/arm-linux-gnueabihf.cmake b/cmake/arm-linux-gnueabihf.cmake index 203b119c0..122046a51 100644 --- a/cmake/arm-linux-gnueabihf.cmake +++ b/cmake/arm-linux-gnueabihf.cmake @@ -21,4 +21,5 @@ set(CMAKE_FIND_ROOT_PATH_MODE_LIBRARY ONLY) set(CMAKE_FIND_ROOT_PATH_MODE_INCLUDE ONLY) link_directories(${CMAKE_STAGING_PREFIX}) -file(COPY ${CMAKE_STAGING_PREFIX}/lib/libzlog.so.1.2 DESTINATION /usr/local/lib) \ No newline at end of file +file(COPY ${CMAKE_STAGING_PREFIX}/lib/libzlog.so.1.2 DESTINATION /usr/local/lib) +file(COPY ${CMAKE_STAGING_PREFIX}/lib/librdkafka.so.1 DESTINATION /usr/local/lib) \ No newline at end of file diff --git a/cmake/x86_64-linux-gnu.cmake b/cmake/x86_64-linux-gnu.cmake index 5c97384e6..39ce518fd 100644 --- a/cmake/x86_64-linux-gnu.cmake +++ b/cmake/x86_64-linux-gnu.cmake @@ -27,3 +27,4 @@ set(CMAKE_FIND_ROOT_PATH_MODE_INCLUDE ONLY) link_directories(${CMAKE_STAGING_PREFIX}) file(COPY ${CMAKE_STAGING_PREFIX}/lib/libzlog.so.1.2 DESTINATION /usr/local/lib) +file(COPY ${CMAKE_STAGING_PREFIX}/lib/librdkafka.so.1 DESTINATION /usr/local/lib) diff --git a/default_plugins.json b/default_plugins.json index a352f67d7..9d03c2590 100644 --- a/default_plugins.json +++ b/default_plugins.json @@ -6,6 +6,7 @@ "libplugin-ekuiper.so", "libplugin-modbus-tcp.so", "libplugin-modbus-rtu.so", - "libplugin-file.so" + "libplugin-file.so", + "libplugin-kafka.so" ] } \ No newline at end of file diff --git a/package-sdk.sh b/package-sdk.sh index 44332e7a6..d8e8c7363 100755 --- a/package-sdk.sh +++ b/package-sdk.sh @@ -59,6 +59,7 @@ cp build/plugins/schema/ekuiper.json \ build/plugins/schema/mqtt.json \ build/plugins/schema/modbus-tcp.json \ build/plugins/schema/file.json \ + build/plugins/schema/kafka.json \ ${package_name}/plugins/schema/ cp build/plugins/libplugin-ekuiper.so \ @@ -66,8 +67,11 @@ cp build/plugins/libplugin-ekuiper.so \ build/plugins/libplugin-mqtt.so \ build/plugins/libplugin-modbus-tcp.so \ build/plugins/libplugin-file.so \ + build/plugins/libplugin-kafka.so \ ${package_name}/plugins/ +cp /usr/local/lib/librdkafka.so.1 ${package_name}/lib + tar czf ${package_name}-${arch}.tar.gz ${package_name}/ ls ${package_name} rm -rf ${package_name} diff --git a/plugins/kafka/CMakeLists.txt b/plugins/kafka/CMakeLists.txt new file mode 100644 index 000000000..1d9cf304c --- /dev/null +++ b/plugins/kafka/CMakeLists.txt @@ -0,0 +1,62 @@ +cmake_minimum_required(VERSION 3.12) + +project(plugin-kafka) + +set(LIBRARY_OUTPUT_PATH "${CMAKE_BINARY_DIR}/plugins") + +file(COPY ${CMAKE_SOURCE_DIR}/plugins/kafka/kafka.json + DESTINATION ${CMAKE_BINARY_DIR}/plugins/schema/) + +# --- locate librdkafka ------------------------------------------------------- +# Prefer pkg-config (works for system install and staging sysroot). +# Falls back to find_library for minimal environments. +find_package(PkgConfig QUIET) +if(PKG_CONFIG_FOUND) + pkg_check_modules(RDKAFKA IMPORTED_TARGET rdkafka) +endif() + +if(NOT RDKAFKA_FOUND) + find_path(RDKAFKA_INCLUDE_DIR librdkafka/rdkafka.h + HINTS ${CMAKE_STAGING_PREFIX}/include + /usr/local/include + /usr/include) + find_library(RDKAFKA_LIBRARY rdkafka + HINTS ${CMAKE_STAGING_PREFIX}/lib + /usr/local/lib + /usr/lib) + if(RDKAFKA_INCLUDE_DIR AND RDKAFKA_LIBRARY) + set(RDKAFKA_FOUND TRUE) + set(RDKAFKA_INCLUDE_DIRS ${RDKAFKA_INCLUDE_DIR}) + set(RDKAFKA_LIBRARIES ${RDKAFKA_LIBRARY}) + else() + message(FATAL_ERROR + "librdkafka not found. Install librdkafka-dev or set " + "CMAKE_STAGING_PREFIX / PKG_CONFIG_PATH so that rdkafka.h " + "and librdkafka.so are discoverable.") + endif() +endif() + +# --- plugin target ------------------------------------------------------------ +add_library(${PROJECT_NAME} SHARED + kafka_config.c + kafka_handle.c + kafka_plugin.c + kafka_plugin_intf.c +) + +target_include_directories(${PROJECT_NAME} PRIVATE + ${CMAKE_SOURCE_DIR}/include/neuron + ${CMAKE_SOURCE_DIR}/plugins/kafka +) + +if(TARGET PkgConfig::RDKAFKA) + target_link_libraries(${PROJECT_NAME} neuron-base PkgConfig::RDKAFKA) +else() + target_include_directories(${PROJECT_NAME} PRIVATE ${RDKAFKA_INCLUDE_DIRS}) + target_link_libraries(${PROJECT_NAME} neuron-base ${RDKAFKA_LIBRARIES}) +endif() + +target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT}) + +# librdkafka header triggers -Wignored-qualifiers on some versions +target_compile_options(${PROJECT_NAME} PRIVATE -Wno-ignored-qualifiers) diff --git a/plugins/kafka/kafka.json b/plugins/kafka/kafka.json new file mode 100644 index 000000000..034449c23 --- /dev/null +++ b/plugins/kafka/kafka.json @@ -0,0 +1,295 @@ +{ + "broker": { + "name": "Broker Address", + "name_zh": "Broker 地址", + "description": "Kafka bootstrap server address, e.g. 127.0.0.1:9092. Multiple brokers separated by commas.", + "description_zh": "Kafka bootstrap server 地址,如 127.0.0.1:9092。多个 broker 用逗号分隔。", + "attribute": "required", + "type": "string", + "default": "127.0.0.1:9092", + "valid": { + "length": 1024 + } + }, + "topic": { + "name": "Default Topic", + "name_zh": "默认主题", + "description": "Default Kafka topic for publishing data.", + "description_zh": "发布数据的默认 Kafka 主题。", + "attribute": "required", + "type": "string", + "default": "neuron-data", + "valid": { + "length": 255 + } + }, + "format": { + "name": "Upload Format", + "name_zh": "上报数据格式", + "description": "JSON format of reported data. Values-format splits data into values and errors. Tags-format puts tags in a single array.", + "description_zh": "上报数据的 JSON 格式。Values-format 下数据分为 values 和 errors。Tags-format 下数据放在一个数组中。", + "attribute": "required", + "type": "map", + "default": 0, + "valid": { + "map": [ + { + "key": "values-format", + "value": 0 + }, + { + "key": "tags-format", + "value": 1 + } + ] + } + }, + "upload_err": { + "name": "Upload Tag Error Code", + "name_zh": "上报点位错误码", + "description": "When data tag collection reports an error, report the tag error code.", + "description_zh": "点位采集报错时,上报点位错误码。", + "attribute": "optional", + "type": "bool", + "default": true, + "valid": {} + }, + "compression": { + "name": "Compression", + "name_zh": "压缩算法", + "description": "Compression codec for message batches.", + "description_zh": "消息批次的压缩编码。", + "attribute": "optional", + "type": "map", + "default": 0, + "valid": { + "map": [ + { + "key": "none", + "value": 0 + }, + { + "key": "gzip", + "value": 1 + }, + { + "key": "snappy", + "value": 2 + }, + { + "key": "lz4", + "value": 3 + }, + { + "key": "zstd", + "value": 4 + } + ] + } +}, + "batch-max-messages": { + "name": "Batch Max Messages", + "name_zh": "批量最大消息数", + "description": "Maximum number of messages batched in one produce request.", + "description_zh": "一次发送请求中的最大消息数量。", + "attribute": "optional", + "type": "int", + "default": 10000, + "valid": { + "min": 1, + "max": 1000000 + } + }, + "linger-ms": { + "name": "Linger (ms)", + "name_zh": "发送等待时间(ms)", + "description": "Delay to wait for more messages before sending a batch. Larger values increase throughput at the cost of latency.", + "description_zh": "发送前等待更多消息的延迟。更大的值可提高吞吐量但增加延迟。", + "attribute": "optional", + "type": "int", + "default": 5, + "valid": { + "min": 0, + "max": 60000 + } + }, + "security-protocol": { + "name": "Security Protocol", + "name_zh": "安全协议", + "description": "Protocol used to communicate with Kafka brokers.", + "description_zh": "与 Kafka Broker 通信时使用的安全协议。", + "attribute": "optional", + "type": "map", + "default": 0, + "valid": { + "map": [ + { + "key": "PLAINTEXT", + "value": 0 + }, + { + "key": "SASL_PLAINTEXT", + "value": 1 + }, + { + "key": "SSL", + "value": 2 + }, + { + "key": "SASL_SSL", + "value": 3 + } + ] + } + }, + "sasl-mechanism": { + "name": "SASL Mechanism", + "name_zh": "SASL 机制", + "attribute": "optional", + "type": "map", + "default": 0, + "condition": { + "field": "security-protocol", + "values": [1, 3] + }, + "valid": { + "map": [ + { + "key": "PLAIN", + "value": 0 + }, + { + "key": "SCRAM-SHA-256", + "value": 1 + }, + { + "key": "SCRAM-SHA-512", + "value": 2 + } + ] + } + }, + "sasl-username": { + "name": "SASL Username", + "name_zh": "SASL 用户名", + "attribute": "optional", + "type": "string", + "condition": { + "field": "security-protocol", + "values": [1, 3] + }, + "default": "", + "valid": { + "length": 255 + } + }, + "sasl-password": { + "name": "SASL Password", + "name_zh": "SASL 密码", + "attribute": "optional", + "type": "string", + "condition": { + "field": "security-protocol", + "values": [1, 3] + }, + "default": "", + "valid": { + "length": 255 + } + }, + "ssl-ca": { + "name": "CA Certificate", + "name_zh": "CA 证书", + "description": "CA certificate for verifying the broker's certificate.", + "description_zh": "用于验证 Broker 证书的 CA 证书。", + "attribute": "optional", + "type": "file", + "condition": { + "field": "security-protocol", + "values": [2, 3] + }, + "valid": { + "length": 81960 + } + }, + "ssl-cert": { + "name": "Client Certificate", + "name_zh": "客户端证书", + "description": "Client certificate for mutual TLS authentication.", + "description_zh": "用于双向 TLS 认证的客户端证书。", + "attribute": "optional", + "type": "file", + "condition": { + "field": "security-protocol", + "values": [2, 3] + }, + "valid": { + "length": 81960 + } + }, + "ssl-key": { + "name": "Client Private Key", + "name_zh": "客户端私钥", + "description": "Client private key for mutual TLS authentication.", + "description_zh": "用于双向 TLS 认证的客户端私钥。", + "attribute": "optional", + "type": "file", + "condition": { + "field": "security-protocol", + "values": [2, 3] + }, + "valid": { + "length": 81960 + } + }, + "message-timeout-ms": { + "name": "Message Timeout (ms)", + "name_zh": "消息超时时间(ms)", + "description": "Local message timeout. This is the maximum time librdkafka may use to deliver a message (including retries).", + "description_zh": "本地消息超时时间。librdkafka 用于投递消息(含重试)的最大时间。", + "attribute": "optional", + "type": "int", + "default": 5000, + "valid": { + "min": 1000, + "max": 300000 + } + }, + "acks": { + "name": "Acknowledgements", + "name_zh": "确认级别", + "description": "Number of broker acknowledgements required. -1 = all in-sync replicas, 0 = no ack, 1 = leader only.", + "description_zh": "要求的 broker 确认数。-1 = 所有同步副本,0 = 无需确认,1 = 仅 leader。", + "attribute": "optional", + "type": "map", + "default": -1, + "valid": { + "map": [ + { + "key": "all", + "value": -1 + }, + { + "key": "none", + "value": 0 + }, + { + "key": "leader", + "value": 1 + } + ] + } + }, + "client-id": { + "name": "Client ID", + "name_zh": "客户端标识", + "description": "Client identifier string passed to Kafka broker for logging and monitoring.", + "description_zh": "传递给 Kafka broker 的客户端标识,用于日志和监控。", + "attribute": "optional", + "type": "string", + "default": "", + "valid": { + "length": 255 + } + } +} diff --git a/plugins/kafka/kafka_config.c b/plugins/kafka/kafka_config.c new file mode 100644 index 000000000..3e31a59d0 --- /dev/null +++ b/plugins/kafka/kafka_config.c @@ -0,0 +1,375 @@ +/** + * NEURON IIoT System for Industry 4.0 + * Copyright (C) 2020-2024 EMQ Technologies Co., Ltd All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + **/ + +#include + +#include "neuron.h" +#include "json/json.h" +#include "json/neu_json_param.h" + +#include "kafka_config.h" +#include "kafka_plugin.h" + +const char *kafka_security_protocol_str(kafka_security_protocol_e p) +{ + switch (p) { + case KAFKA_SECURITY_PLAINTEXT: + return "plaintext"; + case KAFKA_SECURITY_SASL_PLAINTEXT: + return "sasl_plaintext"; + case KAFKA_SECURITY_SSL: + return "ssl"; + case KAFKA_SECURITY_SASL_SSL: + return "sasl_ssl"; + default: + return "plaintext"; + } +} + +const char *kafka_sasl_mechanism_str(kafka_sasl_mechanism_e m) +{ + switch (m) { + case KAFKA_SASL_PLAIN: + return "PLAIN"; + case KAFKA_SASL_SCRAM_SHA_256: + return "SCRAM-SHA-256"; + case KAFKA_SASL_SCRAM_SHA_512: + return "SCRAM-SHA-512"; + default: + return "PLAIN"; + } +} + +const char *kafka_compression_str(kafka_compression_e c) +{ + switch (c) { + case KAFKA_COMPRESS_NONE: + return "none"; + case KAFKA_COMPRESS_GZIP: + return "gzip"; + case KAFKA_COMPRESS_SNAPPY: + return "snappy"; + case KAFKA_COMPRESS_LZ4: + return "lz4"; + case KAFKA_COMPRESS_ZSTD: + return "zstd"; + default: + return "none"; + } +} + +static int parse_ssl_params(neu_plugin_t *plugin, const char *setting, + kafka_config_t *config) +{ + if (config->security_protocol != KAFKA_SECURITY_SSL && + config->security_protocol != KAFKA_SECURITY_SASL_SSL) { + return 0; + } + + neu_json_elem_t ssl_ca = { .name = "ssl-ca", .t = NEU_JSON_STR }; + int ret = neu_parse_param(setting, NULL, 1, &ssl_ca); + if (0 == ret) { + int len = 0; + char *s = (char *) neu_decode64(&len, ssl_ca.v.val_str); + free(ssl_ca.v.val_str); + if (s && len > 0) { + config->ssl_ca = s; + } else { + free(s); + plog_notice(plugin, "setting ssl-ca decode fail or empty"); + } + } + + neu_json_elem_t ssl_cert = { .name = "ssl-cert", .t = NEU_JSON_STR }; + ret = neu_parse_param(setting, NULL, 1, &ssl_cert); + if (0 == ret) { + int len = 0; + char *s = (char *) neu_decode64(&len, ssl_cert.v.val_str); + free(ssl_cert.v.val_str); + if (s && len > 0) { + config->ssl_cert = s; + } else { + free(s); + plog_notice(plugin, "setting ssl-cert decode fail or empty"); + } + } + + neu_json_elem_t ssl_key = { .name = "ssl-key", .t = NEU_JSON_STR }; + ret = neu_parse_param(setting, NULL, 1, &ssl_key); + if (0 == ret) { + int len = 0; + char *s = (char *) neu_decode64(&len, ssl_key.v.val_str); + free(ssl_key.v.val_str); + if (s && len > 0) { + config->ssl_key = s; + } else { + free(s); + plog_notice(plugin, "setting ssl-key decode fail or empty"); + } + } + + return 0; +} + +static int parse_sasl_params(neu_plugin_t *plugin, const char *setting, + kafka_config_t *config) +{ + if (config->security_protocol != KAFKA_SECURITY_SASL_PLAINTEXT && + config->security_protocol != KAFKA_SECURITY_SASL_SSL) { + return 0; + } + + neu_json_elem_t mechanism = { + .name = "sasl-mechanism", + .t = NEU_JSON_INT, + .v.val_int = KAFKA_SASL_PLAIN, + .attribute = NEU_JSON_ATTRIBUTE_OPTIONAL, + }; + neu_parse_param(setting, NULL, 1, &mechanism); + config->sasl_mechanism = mechanism.v.val_int; + + neu_json_elem_t username = { .name = "sasl-username", .t = NEU_JSON_STR }; + int ret = neu_parse_param(setting, NULL, 1, &username); + if (0 == ret) { + config->sasl_username = username.v.val_str; + } else { + plog_error(plugin, "setting no sasl-username"); + return -1; + } + + neu_json_elem_t password = { .name = "sasl-password", .t = NEU_JSON_STR }; + ret = neu_parse_param(setting, NULL, 1, &password); + if (0 == ret) { + config->sasl_password = password.v.val_str; + } else { + plog_error(plugin, "setting no sasl-password"); + return -1; + } + + return 0; +} + +int kafka_config_parse(neu_plugin_t *plugin, const char *setting, + kafka_config_t *config) +{ + int ret = 0; + char * err_param = NULL; + const char *placeholder = "********"; + + neu_json_elem_t broker = { .name = "broker", .t = NEU_JSON_STR }; + neu_json_elem_t topic = { .name = "topic", .t = NEU_JSON_STR }; + neu_json_elem_t format = { .name = "format", .t = NEU_JSON_INT }; + + neu_json_elem_t upload_err = { + .name = "upload_err", + .t = NEU_JSON_BOOL, + .v.val_bool = true, + .attribute = NEU_JSON_ATTRIBUTE_OPTIONAL, + }; + neu_json_elem_t compression = { + .name = "compression", + .t = NEU_JSON_INT, + .v.val_int = KAFKA_COMPRESS_NONE, + .attribute = NEU_JSON_ATTRIBUTE_OPTIONAL, + }; + neu_json_elem_t batch_max = { + .name = "batch-max-messages", + .t = NEU_JSON_INT, + .v.val_int = 10000, + .attribute = NEU_JSON_ATTRIBUTE_OPTIONAL, + }; + neu_json_elem_t linger = { + .name = "linger-ms", + .t = NEU_JSON_INT, + .v.val_int = 5, + .attribute = NEU_JSON_ATTRIBUTE_OPTIONAL, + }; + neu_json_elem_t security = { + .name = "security-protocol", + .t = NEU_JSON_INT, + .v.val_int = KAFKA_SECURITY_PLAINTEXT, + .attribute = NEU_JSON_ATTRIBUTE_OPTIONAL, + }; + neu_json_elem_t msg_timeout = { + .name = "message-timeout-ms", + .t = NEU_JSON_INT, + .v.val_int = 5000, + .attribute = NEU_JSON_ATTRIBUTE_OPTIONAL, + }; + neu_json_elem_t acks_param = { + .name = "acks", + .t = NEU_JSON_INT, + .v.val_int = -1, + .attribute = NEU_JSON_ATTRIBUTE_OPTIONAL, + }; + neu_json_elem_t client_id = { + .name = "client-id", + .t = NEU_JSON_STR, + .attribute = NEU_JSON_ATTRIBUTE_OPTIONAL, + }; + + if (NULL == setting || NULL == config) { + plog_error(plugin, "invalid argument, null pointer"); + return -1; + } + + ret = neu_parse_param(setting, &err_param, 3, &broker, &topic, &format); + if (0 != ret) { + plog_error(plugin, "parsing setting fail, key: `%s`", err_param); + free(err_param); + free(broker.v.val_str); + free(topic.v.val_str); + return -1; + } + + config->broker = broker.v.val_str; + config->topic = topic.v.val_str; + config->format = format.v.val_int; + + if (0 == strlen(config->broker)) { + plog_error(plugin, "setting empty broker"); + goto error; + } + + if (0 == strlen(config->topic)) { + plog_error(plugin, "setting empty topic"); + goto error; + } + + if (config->format != KAFKA_UPLOAD_FORMAT_VALUES && + config->format != KAFKA_UPLOAD_FORMAT_TAGS) { + plog_error(plugin, "setting invalid format: %" PRIi64, + (int64_t) config->format); + goto error; + } + + neu_parse_param(setting, NULL, 1, &upload_err); + neu_parse_param(setting, NULL, 1, &compression); + neu_parse_param(setting, NULL, 1, &batch_max); + neu_parse_param(setting, NULL, 1, &linger); + neu_parse_param(setting, NULL, 1, &security); + neu_parse_param(setting, NULL, 1, &msg_timeout); + neu_parse_param(setting, NULL, 1, &acks_param); + neu_parse_param(setting, NULL, 1, &client_id); + + config->upload_err = upload_err.v.val_bool; + config->compression = compression.v.val_int; + config->batch_max_messages = batch_max.v.val_int; + config->linger_ms = linger.v.val_int; + config->security_protocol = security.v.val_int; + config->message_timeout_ms = msg_timeout.v.val_int; + config->acks = acks_param.v.val_int; + config->client_id = client_id.v.val_str; + + if (config->compression < KAFKA_COMPRESS_NONE || + config->compression > KAFKA_COMPRESS_ZSTD) { + plog_error(plugin, "setting invalid compression: %d", + config->compression); + goto error; + } + + if (config->batch_max_messages < 1 || + config->batch_max_messages > 1000000) { + plog_error(plugin, "setting invalid batch-max-messages: %d", + config->batch_max_messages); + goto error; + } + + if (config->linger_ms < 0 || config->linger_ms > 60000) { + plog_error(plugin, "setting invalid linger-ms: %d", config->linger_ms); + goto error; + } + + if (config->security_protocol < KAFKA_SECURITY_PLAINTEXT || + config->security_protocol > KAFKA_SECURITY_SASL_SSL) { + plog_error(plugin, "setting invalid security-protocol: %d", + config->security_protocol); + goto error; + } + + if (config->message_timeout_ms < 1000 || + config->message_timeout_ms > 300000) { + plog_error(plugin, "setting invalid message-timeout-ms: %d", + config->message_timeout_ms); + goto error; + } + + if (config->acks != -1 && config->acks != 0 && config->acks != 1) { + plog_error(plugin, "setting invalid acks: %d", config->acks); + goto error; + } + + ret = parse_sasl_params(plugin, setting, config); + if (0 != ret) { + goto error; + } + + ret = parse_ssl_params(plugin, setting, config); + if (0 != ret) { + goto error; + } + + plog_notice(plugin, "config broker : %s", config->broker); + plog_notice(plugin, "config topic : %s", config->topic); + plog_notice(plugin, "config format : %d", config->format); + plog_notice(plugin, "config upload_err : %d", config->upload_err); + plog_notice(plugin, "config compression : %s", + kafka_compression_str(config->compression)); + plog_notice(plugin, "config batch-max-messages : %d", + config->batch_max_messages); + plog_notice(plugin, "config linger-ms : %d", config->linger_ms); + plog_notice(plugin, "config security-protocol : %s", + kafka_security_protocol_str(config->security_protocol)); + if (config->sasl_username) { + plog_notice(plugin, "config sasl-username : %s", + config->sasl_username); + } + if (config->sasl_password) { + plog_notice(plugin, "config sasl-password : %s", placeholder); + } + if (config->ssl_ca) { + plog_notice(plugin, "config ssl-ca : %s", placeholder); + } + plog_notice(plugin, "config message-timeout-ms : %d", + config->message_timeout_ms); + plog_notice(plugin, "config acks : %d", config->acks); + if (config->client_id) { + plog_notice(plugin, "config client-id : %s", + config->client_id); + } + + return 0; + +error: + kafka_config_fini(config); + return -1; +} + +void kafka_config_fini(kafka_config_t *config) +{ + free(config->broker); + free(config->topic); + free(config->sasl_username); + free(config->sasl_password); + free(config->ssl_ca); + free(config->ssl_cert); + free(config->ssl_key); + free(config->client_id); + memset(config, 0, sizeof(*config)); +} diff --git a/plugins/kafka/kafka_config.h b/plugins/kafka/kafka_config.h new file mode 100644 index 000000000..fb1e94b2d --- /dev/null +++ b/plugins/kafka/kafka_config.h @@ -0,0 +1,92 @@ +/** + * NEURON IIoT System for Industry 4.0 + * Copyright (C) 2020-2024 EMQ Technologies Co., Ltd All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + **/ + +#ifndef NEURON_PLUGIN_KAFKA_CONFIG_H +#define NEURON_PLUGIN_KAFKA_CONFIG_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include + +#include "plugin.h" + +typedef enum { + KAFKA_UPLOAD_FORMAT_VALUES = 0, + KAFKA_UPLOAD_FORMAT_TAGS = 1, +} kafka_upload_format_e; + +typedef enum { + KAFKA_COMPRESS_NONE = 0, + KAFKA_COMPRESS_GZIP = 1, + KAFKA_COMPRESS_SNAPPY = 2, + KAFKA_COMPRESS_LZ4 = 3, + KAFKA_COMPRESS_ZSTD = 4, +} kafka_compression_e; + +typedef enum { + KAFKA_SECURITY_PLAINTEXT = 0, + KAFKA_SECURITY_SASL_PLAINTEXT = 1, + KAFKA_SECURITY_SSL = 2, + KAFKA_SECURITY_SASL_SSL = 3, +} kafka_security_protocol_e; + +typedef enum { + KAFKA_SASL_PLAIN = 0, + KAFKA_SASL_SCRAM_SHA_256 = 1, + KAFKA_SASL_SCRAM_SHA_512 = 2, +} kafka_sasl_mechanism_e; + +typedef struct { + char *broker; + char *topic; + + kafka_upload_format_e format; + bool upload_err; + kafka_compression_e compression; + int batch_max_messages; + int linger_ms; + kafka_security_protocol_e security_protocol; + kafka_sasl_mechanism_e sasl_mechanism; + char * sasl_username; + char * sasl_password; + char * ssl_ca; + char * ssl_cert; + char * ssl_key; + + int message_timeout_ms; + int acks; + char *client_id; +} kafka_config_t; + +int kafka_config_parse(neu_plugin_t *plugin, const char *setting, + kafka_config_t *config); +void kafka_config_fini(kafka_config_t *config); + +const char *kafka_security_protocol_str(kafka_security_protocol_e p); +const char *kafka_sasl_mechanism_str(kafka_sasl_mechanism_e m); +const char *kafka_compression_str(kafka_compression_e c); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/plugins/kafka/kafka_handle.c b/plugins/kafka/kafka_handle.c new file mode 100644 index 000000000..9e1bcf2f8 --- /dev/null +++ b/plugins/kafka/kafka_handle.c @@ -0,0 +1,288 @@ +/** + * NEURON IIoT System for Industry 4.0 + * Copyright (C) 2020-2024 EMQ Technologies Co., Ltd All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + **/ + +#include + +#include "errcodes.h" +#include "json/neu_json_fn.h" +#include "json/neu_json_rw.h" + +#include "kafka_handle.h" +#include "kafka_plugin.h" + +static int tag_values_to_json(UT_array *tags, neu_json_read_resp_t *json, + bool filter_error) +{ + int index = 0; + int n_valid = 0; + + if (filter_error) { + utarray_foreach(tags, neu_resp_tag_value_meta_t *, tv) + { + if (tv->value.type != NEU_TYPE_ERROR) { + n_valid += 1; + } + } + } else { + n_valid = utarray_len(tags); + } + + if (0 == n_valid) { + return 0; + } + + json->n_tag = n_valid; + json->tags = (neu_json_read_resp_tag_t *) calloc( + json->n_tag, sizeof(neu_json_read_resp_tag_t)); + if (NULL == json->tags) { + return -1; + } + + utarray_foreach(tags, neu_resp_tag_value_meta_t *, tv) + { + if (filter_error && tv->value.type == NEU_TYPE_ERROR) { + continue; + } + neu_tag_value_to_json(tv, &json->tags[index]); + index += 1; + } + + return 0; +} + +static char *generate_upload_json(neu_plugin_t * plugin, + neu_reqresp_trans_data_t *data, bool *skip) +{ + char *json_str = NULL; + + neu_json_read_periodic_t header = { + .group = (char *) data->group, + .node = (char *) data->driver, + .timestamp = global_timestamp, + }; + neu_json_read_resp_t json = { 0 }; + + if (0 != + tag_values_to_json(data->tags, &json, !plugin->config.upload_err)) { + plog_error(plugin, "tag_values_to_json fail"); + return NULL; + } + + if (0 == json.n_tag) { + *skip = true; + return NULL; + } + + if (plugin->config.format == KAFKA_UPLOAD_FORMAT_VALUES) { + neu_json_encode_with_mqtt(&json, neu_json_encode_read_resp1, &header, + neu_json_encode_read_periodic_resp, + &json_str); + } else { + neu_json_encode_with_mqtt(&json, neu_json_encode_read_resp2, &header, + neu_json_encode_read_periodic_resp, + &json_str); + } + + for (int i = 0; i < json.n_tag; i++) { + if (json.tags[i].n_meta > 0) { + free(json.tags[i].metas); + } + } + + if (json.tags) { + free(json.tags); + } + return json_str; +} + +static int kafka_produce(neu_plugin_t *plugin, const char *topic, char *payload, + size_t len) +{ + if (NULL == plugin->rk) { + return -1; + } + + rd_kafka_resp_err_t err = + rd_kafka_producev(plugin->rk, RD_KAFKA_V_TOPIC(topic), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_VALUE(payload, len), + RD_KAFKA_V_OPAQUE(plugin), RD_KAFKA_V_END); + + if (err) { + if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) { + plog_warn(plugin, "produce queue full, dropping message"); + } else { + plog_error(plugin, "produce failed: %s", rd_kafka_err2str(err)); + } + return -1; + } + + rd_kafka_poll(plugin->rk, 0); + return 0; +} + +int handle_trans_data(neu_plugin_t *plugin, neu_reqresp_trans_data_t *data) +{ + int rv = 0; + + if (NULL == plugin->rk) { + return NEU_ERR_PLUGIN_NOT_RUNNING; + } + + const kafka_route_entry_t *route = + kafka_route_tbl_get(&plugin->route_tbl, data->driver, data->group); + + const char *topic = route ? route->topic : plugin->config.topic; + + bool skip = false; + char *json_str = generate_upload_json(plugin, data, &skip); + + if (skip) { + return 0; + } + + if (NULL == json_str) { + plog_error(plugin, "generate upload json fail for driver:%s group:%s", + data->driver, data->group); + return NEU_ERR_EINTERNAL; + } + + size_t json_len = strlen(json_str); + rv = kafka_produce(plugin, topic, json_str, json_len); + + if (0 == rv) { + NEU_PLUGIN_UPDATE_METRIC(plugin, NEU_METRIC_SEND_BYTES_5S, + (int64_t) json_len, NULL); + NEU_PLUGIN_UPDATE_METRIC(plugin, NEU_METRIC_SEND_BYTES_30S, + (int64_t) json_len, NULL); + NEU_PLUGIN_UPDATE_METRIC(plugin, NEU_METRIC_SEND_BYTES_60S, + (int64_t) json_len, NULL); + } + + free(json_str); + return rv == 0 ? NEU_ERR_SUCCESS : NEU_ERR_PLUGIN_NOT_RUNNING; +} + +int handle_subscribe_group(neu_plugin_t *plugin, neu_req_subscribe_t *sub) +{ + int rv = 0; + char *topic = NULL; + + neu_json_elem_t el = { .name = "topic", .t = NEU_JSON_STR }; + if (NULL != sub->params && + 0 == neu_parse_param(sub->params, NULL, 1, &el)) { + topic = el.v.val_str; + } else { + topic = strdup(plugin->config.topic); + } + + if (NULL == topic) { + rv = NEU_ERR_EINTERNAL; + goto end; + } + + rv = + kafka_route_tbl_add(&plugin->route_tbl, sub->driver, sub->group, topic); + if (0 != rv) { + plog_error(plugin, "route driver:%s group:%s fail", sub->driver, + sub->group); + goto end; + } + + plog_notice(plugin, "route driver:%s group:%s to topic:%s", sub->driver, + sub->group, topic); + +end: + free(sub->params); + free(sub->static_tags); + return rv; +} + +int handle_update_subscribe(neu_plugin_t *plugin, neu_req_subscribe_t *sub) +{ + int rv = 0; + + if (NULL == sub->params) { + rv = NEU_ERR_GROUP_PARAMETER_INVALID; + goto end; + } + + neu_json_elem_t el = { .name = "topic", .t = NEU_JSON_STR }; + if (0 != neu_parse_param(sub->params, NULL, 1, &el)) { + plog_error(plugin, "parse topic fail from `%s`", sub->params); + rv = NEU_ERR_GROUP_PARAMETER_INVALID; + goto end; + } + + rv = kafka_route_tbl_update(&plugin->route_tbl, sub->driver, sub->group, + el.v.val_str); + if (0 != rv) { + plog_error(plugin, "update route driver:%s group:%s fail", sub->driver, + sub->group); + goto end; + } + + plog_notice(plugin, "update route driver:%s group:%s to topic:%s", + sub->driver, sub->group, el.v.val_str); + +end: + free(sub->params); + free(sub->static_tags); + return rv; +} + +int handle_unsubscribe_group(neu_plugin_t *plugin, neu_req_unsubscribe_t *unsub) +{ + kafka_route_tbl_del(&plugin->route_tbl, unsub->driver, unsub->group); + plog_notice(plugin, "del route driver:%s group:%s", unsub->driver, + unsub->group); + return 0; +} + +int handle_update_group(neu_plugin_t *plugin, neu_req_update_group_t *req) +{ + kafka_route_tbl_update_group(&plugin->route_tbl, req->driver, req->group, + req->new_name); + plog_notice(plugin, "update route driver:%s group:%s to %s", req->driver, + req->group, req->new_name); + return 0; +} + +int handle_del_group(neu_plugin_t *plugin, neu_req_del_group_t *req) +{ + kafka_route_tbl_del(&plugin->route_tbl, req->driver, req->group); + plog_notice(plugin, "del route driver:%s group:%s", req->driver, + req->group); + return 0; +} + +int handle_update_driver(neu_plugin_t *plugin, neu_req_update_node_t *req) +{ + kafka_route_tbl_update_driver(&plugin->route_tbl, req->node, req->new_name); + plog_notice(plugin, "update route driver:%s to %s", req->node, + req->new_name); + return 0; +} + +int handle_del_driver(neu_plugin_t *plugin, neu_reqresp_node_deleted_t *req) +{ + kafka_route_tbl_del_driver(&plugin->route_tbl, req->node); + plog_notice(plugin, "del route driver:%s", req->node); + return 0; +} diff --git a/plugins/kafka/kafka_handle.h b/plugins/kafka/kafka_handle.h new file mode 100644 index 000000000..a799077c7 --- /dev/null +++ b/plugins/kafka/kafka_handle.h @@ -0,0 +1,47 @@ +/** + * NEURON IIoT System for Industry 4.0 + * Copyright (C) 2020-2024 EMQ Technologies Co., Ltd All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + **/ + +#ifndef NEURON_PLUGIN_KAFKA_HANDLE_H +#define NEURON_PLUGIN_KAFKA_HANDLE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "kafka_config.h" +#include "neuron.h" + +int handle_trans_data(neu_plugin_t *plugin, neu_reqresp_trans_data_t *data); + +int handle_subscribe_group(neu_plugin_t *plugin, neu_req_subscribe_t *sub); +int handle_update_subscribe(neu_plugin_t *plugin, neu_req_subscribe_t *sub); +int handle_unsubscribe_group(neu_plugin_t * plugin, + neu_req_unsubscribe_t *unsub); + +int handle_update_group(neu_plugin_t *plugin, neu_req_update_group_t *req); +int handle_del_group(neu_plugin_t *plugin, neu_req_del_group_t *req); + +int handle_update_driver(neu_plugin_t *plugin, neu_req_update_node_t *req); +int handle_del_driver(neu_plugin_t *plugin, neu_reqresp_node_deleted_t *req); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/plugins/kafka/kafka_plugin.c b/plugins/kafka/kafka_plugin.c new file mode 100644 index 000000000..fb222f726 --- /dev/null +++ b/plugins/kafka/kafka_plugin.c @@ -0,0 +1,37 @@ +/** + * NEURON IIoT System for Industry 4.0 + * Copyright (C) 2020-2024 EMQ Technologies Co., Ltd All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + **/ + +#include "kafka_plugin.h" +#include "kafka_plugin_intf.h" + +#define DESCRIPTION "Northbound Kafka plugin based on librdkafka." +#define DESCRIPTION_ZH "基于 librdkafka 的北向应用 Kafka 插件" + +const neu_plugin_module_t neu_plugin_module = { + .version = NEURON_PLUGIN_VER_1_0, + .schema = "kafka", + .module_name = "Kafka", + .module_descr = DESCRIPTION, + .module_descr_zh = DESCRIPTION_ZH, + .intf_funs = &kafka_plugin_intf_funs, + .kind = NEU_PLUGIN_KIND_SYSTEM, + .type = NEU_NA_TYPE_APP, + .display = true, + .single = false, +}; diff --git a/plugins/kafka/kafka_plugin.h b/plugins/kafka/kafka_plugin.h new file mode 100644 index 000000000..25f7c53e6 --- /dev/null +++ b/plugins/kafka/kafka_plugin.h @@ -0,0 +1,187 @@ +/** + * NEURON IIoT System for Industry 4.0 + * Copyright (C) 2020-2024 EMQ Technologies Co., Ltd All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + **/ + +#ifndef NEURON_PLUGIN_KAFKA_H +#define NEURON_PLUGIN_KAFKA_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +#include "neuron.h" + +#include "kafka_config.h" + +typedef struct { + char driver[NEU_NODE_NAME_LEN]; + char group[NEU_GROUP_NAME_LEN]; +} kafka_route_key_t; + +typedef struct { + kafka_route_key_t key; + char * topic; + + UT_hash_handle hh; +} kafka_route_entry_t; + +struct neu_plugin { + neu_plugin_common_t common; + neu_events_t * events; + neu_event_timer_t * poll_timer; + kafka_config_t config; + + rd_kafka_t *rk; + bool connected; + int64_t last_reconn_check_ms; + + int64_t delivery_succ; + int64_t delivery_fail; + + kafka_route_entry_t *route_tbl; +}; + +/* route table helpers */ + +static inline void kafka_route_entry_free(kafka_route_entry_t *e) +{ + free(e->topic); + free(e); +} + +static inline void kafka_route_tbl_free(kafka_route_entry_t *tbl) +{ + kafka_route_entry_t *e = NULL, *tmp = NULL; + HASH_ITER(hh, tbl, e, tmp) + { + HASH_DEL(tbl, e); + kafka_route_entry_free(e); + } +} + +static inline kafka_route_entry_t * +kafka_route_tbl_get(kafka_route_entry_t **tbl, const char *driver, + const char *group) +{ + kafka_route_entry_t *find = NULL; + kafka_route_key_t key = { 0 }; + + strncpy(key.driver, driver, sizeof(key.driver)); + strncpy(key.group, group, sizeof(key.group)); + + HASH_FIND(hh, *tbl, &key, sizeof(key), find); + return find; +} + +static inline int kafka_route_tbl_add(kafka_route_entry_t **tbl, + const char *driver, const char *group, + char *topic) +{ + kafka_route_entry_t *find = kafka_route_tbl_get(tbl, driver, group); + if (find) { + free(topic); + return NEU_ERR_GROUP_ALREADY_SUBSCRIBED; + } + + find = calloc(1, sizeof(*find)); + if (NULL == find) { + free(topic); + return NEU_ERR_EINTERNAL; + } + + strncpy(find->key.driver, driver, sizeof(find->key.driver)); + strncpy(find->key.group, group, sizeof(find->key.group)); + find->topic = topic; + HASH_ADD(hh, *tbl, key, sizeof(find->key), find); + + return 0; +} + +static inline int kafka_route_tbl_update(kafka_route_entry_t **tbl, + const char *driver, const char *group, + char *topic) +{ + kafka_route_entry_t *find = kafka_route_tbl_get(tbl, driver, group); + if (NULL == find) { + free(topic); + return NEU_ERR_GROUP_NOT_SUBSCRIBE; + } + + free(find->topic); + find->topic = topic; + return 0; +} + +static inline void kafka_route_tbl_update_driver(kafka_route_entry_t **tbl, + const char * driver, + const char * new_name) +{ + kafka_route_entry_t *e = NULL, *tmp = NULL; + HASH_ITER(hh, *tbl, e, tmp) + { + if (0 == strcmp(e->key.driver, driver)) { + HASH_DEL(*tbl, e); + strncpy(e->key.driver, new_name, sizeof(e->key.driver)); + HASH_ADD(hh, *tbl, key, sizeof(e->key), e); + } + } +} + +static inline void kafka_route_tbl_update_group(kafka_route_entry_t **tbl, + const char * driver, + const char * group, + const char * new_name) +{ + kafka_route_entry_t *e = kafka_route_tbl_get(tbl, driver, group); + if (e) { + HASH_DEL(*tbl, e); + strncpy(e->key.group, new_name, sizeof(e->key.group)); + HASH_ADD(hh, *tbl, key, sizeof(e->key), e); + } +} + +static inline void kafka_route_tbl_del_driver(kafka_route_entry_t **tbl, + const char * driver) +{ + kafka_route_entry_t *e = NULL, *tmp = NULL; + HASH_ITER(hh, *tbl, e, tmp) + { + if (0 == strcmp(e->key.driver, driver)) { + HASH_DEL(*tbl, e); + kafka_route_entry_free(e); + } + } +} + +static inline void kafka_route_tbl_del(kafka_route_entry_t **tbl, + const char *driver, const char *group) +{ + kafka_route_entry_t *find = kafka_route_tbl_get(tbl, driver, group); + if (find) { + HASH_DEL(*tbl, find); + kafka_route_entry_free(find); + } +} + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/plugins/kafka/kafka_plugin_intf.c b/plugins/kafka/kafka_plugin_intf.c new file mode 100644 index 000000000..a4877c787 --- /dev/null +++ b/plugins/kafka/kafka_plugin_intf.c @@ -0,0 +1,465 @@ +/** + * NEURON IIoT System for Industry 4.0 + * Copyright (C) 2020-2024 EMQ Technologies Co., Ltd All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + **/ + +#include +#include +#include + +#include "errcodes.h" +#include "utils/asprintf.h" + +#include "kafka_config.h" +#include "kafka_handle.h" +#include "kafka_plugin.h" +#include "kafka_plugin_intf.h" + +extern const neu_plugin_module_t neu_plugin_module; + +/* ----------------------------- rdkafka callbacks -------------------------- */ + +static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, + void *opaque) +{ + (void) rk; + neu_plugin_t *plugin = (neu_plugin_t *) opaque; + + if (msg->err) { + plugin->delivery_fail++; + NEU_PLUGIN_UPDATE_METRIC(plugin, NEU_METRIC_SEND_MSG_ERRORS_TOTAL, 1, + NULL); + if (plugin->delivery_fail % 100 == 1) { + plog_warn(plugin, + "delivery failed (%s): %s (total_fail=%" PRId64 ")", + rd_kafka_topic_name(msg->rkt), rd_kafka_err2str(msg->err), + plugin->delivery_fail); + } + } else { + plugin->delivery_succ++; + NEU_PLUGIN_UPDATE_METRIC(plugin, NEU_METRIC_SEND_MSGS_TOTAL, 1, NULL); + if (!plugin->connected) { + plugin->connected = true; + plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED; + plog_notice(plugin, + "broker reconnected (delivery ok, succ=%" PRId64 ")", + plugin->delivery_succ); + } + } +} + +static void error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) +{ + (void) rk; + neu_plugin_t *plugin = (neu_plugin_t *) opaque; + + if (err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN || + err == RD_KAFKA_RESP_ERR__TRANSPORT) { + if (plugin->connected) { + plugin->connected = false; + plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED; + NEU_PLUGIN_UPDATE_METRIC(plugin, NEU_METRIC_DISCONNECTION_60S, 1, + NULL); + NEU_PLUGIN_UPDATE_METRIC(plugin, NEU_METRIC_DISCONNECTION_600S, 1, + NULL); + NEU_PLUGIN_UPDATE_METRIC(plugin, NEU_METRIC_DISCONNECTION_1800S, 1, + NULL); + plog_error(plugin, "broker connection lost: %s", reason); + } + } else { + plog_warn(plugin, "rdkafka error %d: %s", err, reason); + } +} + +static void stats_cb_check_connected(neu_plugin_t *plugin, rd_kafka_t *rk) +{ + (void) rk; + const rd_kafka_metadata_t *md = NULL; + rd_kafka_resp_err_t err = rd_kafka_metadata(plugin->rk, 0, NULL, &md, 2000); + + if (err == RD_KAFKA_RESP_ERR_NO_ERROR && md && md->broker_cnt > 0) { + if (!plugin->connected) { + plugin->connected = true; + plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED; + plog_notice(plugin, "broker connected (brokers=%d)", + md->broker_cnt); + } + } + + if (md) { + rd_kafka_metadata_destroy(md); + } +} + +/* ------------------------------ poll timer -------------------------------- */ + +static int64_t current_time_ms(void) +{ + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return (int64_t) ts.tv_sec * 1000 + ts.tv_nsec / 1000000; +} + +#define RECONN_CHECK_INTERVAL_MS 5000 + +static int poll_timer_cb(void *data) +{ + neu_plugin_t *plugin = (neu_plugin_t *) data; + + if (plugin->rk) { + rd_kafka_poll(plugin->rk, 0); + + if (!plugin->connected) { + int64_t now = current_time_ms(); + if (now - plugin->last_reconn_check_ms >= + RECONN_CHECK_INTERVAL_MS) { + plugin->last_reconn_check_ms = now; + stats_cb_check_connected(plugin, plugin->rk); + } + } + } + + return 0; +} + +static int start_poll_timer(neu_plugin_t *plugin) +{ + if (NULL == plugin->events) { + plugin->events = neu_event_new(plugin->common.name); + if (NULL == plugin->events) { + plog_error(plugin, "neu_event_new fail"); + return -1; + } + } + + neu_event_timer_param_t param = { + .second = 0, + .millisecond = 200, + .cb = poll_timer_cb, + .usr_data = plugin, + }; + + neu_event_timer_t *timer = neu_event_add_timer(plugin->events, param); + if (NULL == timer) { + plog_error(plugin, "neu_event_add_timer fail"); + return -1; + } + + if (plugin->poll_timer) { + neu_event_del_timer(plugin->events, plugin->poll_timer); + } + plugin->poll_timer = timer; + return 0; +} + +static void stop_poll_timer(neu_plugin_t *plugin) +{ + if (plugin->poll_timer) { + neu_event_del_timer(plugin->events, plugin->poll_timer); + plugin->poll_timer = NULL; + } +} + +/* ----------------------------- rdkafka setup ------------------------------ */ + +static int set_conf(neu_plugin_t *plugin, rd_kafka_conf_t *conf, + const char *key, const char *val) +{ + char errstr[512]; + + if (RD_KAFKA_CONF_OK != + rd_kafka_conf_set(conf, key, val, errstr, sizeof(errstr))) { + plog_warn(plugin, "rd_kafka_conf_set %s=%s fail: %s", key, val, errstr); + return -1; + } + return 0; +} + +static void set_rdkafka_conf(neu_plugin_t *plugin, rd_kafka_conf_t *conf, + const kafka_config_t *config) +{ + char buf[64]; + + set_conf(plugin, conf, "bootstrap.servers", config->broker); + + snprintf(buf, sizeof(buf), "%d", config->batch_max_messages); + set_conf(plugin, conf, "queue.buffering.max.messages", buf); + + snprintf(buf, sizeof(buf), "%d", config->linger_ms); + set_conf(plugin, conf, "linger.ms", buf); + + set_conf(plugin, conf, "compression.codec", + kafka_compression_str(config->compression)); + + snprintf(buf, sizeof(buf), "%d", config->message_timeout_ms); + set_conf(plugin, conf, "message.timeout.ms", buf); + + snprintf(buf, sizeof(buf), "%d", config->acks); + set_conf(plugin, conf, "acks", buf); + + if (config->client_id) { + set_conf(plugin, conf, "client.id", config->client_id); + } + + set_conf(plugin, conf, "security.protocol", + kafka_security_protocol_str(config->security_protocol)); + + if (config->security_protocol == KAFKA_SECURITY_SASL_PLAINTEXT || + config->security_protocol == KAFKA_SECURITY_SASL_SSL) { + set_conf(plugin, conf, "sasl.mechanism", + kafka_sasl_mechanism_str(config->sasl_mechanism)); + if (config->sasl_username) { + set_conf(plugin, conf, "sasl.username", config->sasl_username); + } + if (config->sasl_password) { + set_conf(plugin, conf, "sasl.password", config->sasl_password); + } + } + + if (config->security_protocol == KAFKA_SECURITY_SSL || + config->security_protocol == KAFKA_SECURITY_SASL_SSL) { + if (config->ssl_ca) { + set_conf(plugin, conf, "ssl.ca.pem", config->ssl_ca); + } + if (config->ssl_cert) { + set_conf(plugin, conf, "ssl.certificate.pem", config->ssl_cert); + } + if (config->ssl_key) { + set_conf(plugin, conf, "ssl.key.pem", config->ssl_key); + } + } + + rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); + rd_kafka_conf_set_error_cb(conf, error_cb); + rd_kafka_conf_set_opaque(conf, plugin); +} + +static rd_kafka_t *create_producer(neu_plugin_t * plugin, + const kafka_config_t *config) +{ + char errstr[512]; + rd_kafka_conf_t *conf = rd_kafka_conf_new(); + + set_rdkafka_conf(plugin, conf, config); + + rd_kafka_t *rk = + rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); + if (NULL == rk) { + plog_error(plugin, "rd_kafka_new failed: %s", errstr); + return NULL; + } + + return rk; +} + +/* ---------------------------- plugin interface ---------------------------- */ + +static neu_plugin_t *kafka_plugin_open(void) +{ + neu_plugin_t *plugin = (neu_plugin_t *) calloc(1, sizeof(neu_plugin_t)); + neu_plugin_common_init(&plugin->common); + return plugin; +} + +static int kafka_plugin_close(neu_plugin_t *plugin) +{ + plog_notice(plugin, "plugin closed"); + free(plugin); + return NEU_ERR_SUCCESS; +} + +static int kafka_plugin_init(neu_plugin_t *plugin, bool load) +{ + (void) load; + + NEU_PLUGIN_REGISTER_METRIC(plugin, NEU_METRIC_TRANS_DATA_5S, 5000); + NEU_PLUGIN_REGISTER_METRIC(plugin, NEU_METRIC_TRANS_DATA_30S, 30000); + NEU_PLUGIN_REGISTER_METRIC(plugin, NEU_METRIC_TRANS_DATA_60S, 60000); + NEU_PLUGIN_REGISTER_METRIC(plugin, NEU_METRIC_SEND_MSGS_TOTAL, 0); + NEU_PLUGIN_REGISTER_METRIC(plugin, NEU_METRIC_SEND_MSG_ERRORS_TOTAL, 0); + NEU_PLUGIN_REGISTER_METRIC(plugin, NEU_METRIC_SEND_BYTES_5S, 5000); + NEU_PLUGIN_REGISTER_METRIC(plugin, NEU_METRIC_SEND_BYTES_30S, 30000); + NEU_PLUGIN_REGISTER_METRIC(plugin, NEU_METRIC_SEND_BYTES_60S, 60000); + NEU_PLUGIN_REGISTER_METRIC(plugin, NEU_METRIC_DISCONNECTION_60S, 60000); + NEU_PLUGIN_REGISTER_METRIC(plugin, NEU_METRIC_DISCONNECTION_600S, 600000); + NEU_PLUGIN_REGISTER_METRIC(plugin, NEU_METRIC_DISCONNECTION_1800S, 1800000); + + plog_notice(plugin, "plugin `%s` initialized", + neu_plugin_module.module_name); + return NEU_ERR_SUCCESS; +} + +static int kafka_plugin_uninit(neu_plugin_t *plugin) +{ + stop_poll_timer(plugin); + + if (plugin->events) { + neu_event_close(plugin->events); + plugin->events = NULL; + } + + if (plugin->rk) { + rd_kafka_flush(plugin->rk, 5000); + rd_kafka_destroy(plugin->rk); + plugin->rk = NULL; + } + + kafka_config_fini(&plugin->config); + kafka_route_tbl_free(plugin->route_tbl); + plugin->route_tbl = NULL; + + plog_notice(plugin, "plugin `%s` uninitialized", + neu_plugin_module.module_name); + return NEU_ERR_SUCCESS; +} + +static int kafka_plugin_config(neu_plugin_t *plugin, const char *setting) +{ + int rv = 0; + kafka_config_t config = { 0 }; + + rv = kafka_config_parse(plugin, setting, &config); + if (0 != rv) { + plog_error(plugin, "kafka_config_parse fail"); + return NEU_ERR_NODE_SETTING_INVALID; + } + + stop_poll_timer(plugin); + + if (plugin->rk) { + rd_kafka_flush(plugin->rk, 3000); + rd_kafka_destroy(plugin->rk); + plugin->rk = NULL; + } + + plugin->rk = create_producer(plugin, &config); + if (NULL == plugin->rk) { + plog_error(plugin, "create kafka producer fail"); + kafka_config_fini(&config); + return NEU_ERR_PLUGIN_NOT_RUNNING; + } + + if (0 != start_poll_timer(plugin)) { + plog_error(plugin, "start poll timer fail"); + rd_kafka_destroy(plugin->rk); + plugin->rk = NULL; + kafka_config_fini(&config); + return NEU_ERR_EINTERNAL; + } + + stats_cb_check_connected(plugin, plugin->rk); + + kafka_config_fini(&plugin->config); + plugin->config = config; + + plog_notice(plugin, "plugin `%s` configured", + neu_plugin_module.module_name); + return NEU_ERR_SUCCESS; +} + +static int kafka_plugin_start(neu_plugin_t *plugin) +{ + if (NULL == plugin->rk) { + plog_error(plugin, "kafka producer is NULL"); + return NEU_ERR_PLUGIN_NOT_RUNNING; + } + + if (0 != start_poll_timer(plugin)) { + return NEU_ERR_EINTERNAL; + } + + stats_cb_check_connected(plugin, plugin->rk); + + plog_notice(plugin, "plugin `%s` started", neu_plugin_module.module_name); + return NEU_ERR_SUCCESS; +} + +static int kafka_plugin_stop(neu_plugin_t *plugin) +{ + stop_poll_timer(plugin); + + if (plugin->rk) { + rd_kafka_flush(plugin->rk, 3000); + } + + plugin->connected = false; + plugin->common.link_state = NEU_NODE_LINK_STATE_DISCONNECTED; + + plog_notice(plugin, "plugin `%s` stopped", neu_plugin_module.module_name); + return NEU_ERR_SUCCESS; +} + +static int kafka_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head, + void *data) +{ + neu_err_code_e error = NEU_ERR_SUCCESS; + + switch (head->type) { + case NEU_REQRESP_TRANS_DATA: + if (plugin->rk) { + NEU_PLUGIN_UPDATE_METRIC(plugin, NEU_METRIC_TRANS_DATA_5S, 1, NULL); + NEU_PLUGIN_UPDATE_METRIC(plugin, NEU_METRIC_TRANS_DATA_30S, 1, + NULL); + NEU_PLUGIN_UPDATE_METRIC(plugin, NEU_METRIC_TRANS_DATA_60S, 1, + NULL); + } + error = handle_trans_data(plugin, data); + break; + case NEU_REQ_SUBSCRIBE_GROUP: + error = handle_subscribe_group(plugin, data); + break; + case NEU_REQ_UPDATE_SUBSCRIBE_GROUP: + error = handle_update_subscribe(plugin, data); + break; + case NEU_REQ_UNSUBSCRIBE_GROUP: + error = handle_unsubscribe_group(plugin, data); + break; + case NEU_REQ_UPDATE_GROUP: + error = handle_update_group(plugin, data); + break; + case NEU_REQ_DEL_GROUP: + error = handle_del_group(plugin, data); + break; + case NEU_REQ_UPDATE_NODE: + error = handle_update_driver(plugin, data); + break; + case NEU_REQRESP_NODE_DELETED: { + neu_reqresp_node_deleted_t *req = data; + if (0 != strcmp(plugin->common.name, req->node)) { + error = handle_del_driver(plugin, data); + } + break; + } + default: + error = NEU_ERR_PLUGIN_NOT_RUNNING; + break; + } + + return error; +} + +const neu_plugin_intf_funs_t kafka_plugin_intf_funs = { + .open = kafka_plugin_open, + .close = kafka_plugin_close, + .init = kafka_plugin_init, + .uninit = kafka_plugin_uninit, + .start = kafka_plugin_start, + .stop = kafka_plugin_stop, + .setting = kafka_plugin_config, + .request = kafka_plugin_request, + .try_connect = NULL, +}; diff --git a/plugins/kafka/kafka_plugin_intf.h b/plugins/kafka/kafka_plugin_intf.h new file mode 100644 index 000000000..20a517e89 --- /dev/null +++ b/plugins/kafka/kafka_plugin_intf.h @@ -0,0 +1,27 @@ +/** + * NEURON IIoT System for Industry 4.0 + * Copyright (C) 2020-2024 EMQ Technologies Co., Ltd All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + **/ + +#ifndef NEURON_PLUGIN_KAFKA_INTF_H +#define NEURON_PLUGIN_KAFKA_INTF_H + +#include "plugin.h" + +extern const neu_plugin_intf_funs_t kafka_plugin_intf_funs; + +#endif diff --git a/tests/ft/app/test_kafka.py b/tests/ft/app/test_kafka.py new file mode 100644 index 000000000..8969cdbd6 --- /dev/null +++ b/tests/ft/app/test_kafka.py @@ -0,0 +1,589 @@ +import json +import os +import time + +import pytest + +import neuron.api as api +import neuron.error as error +import neuron.config as config +from neuron.common import ( + case_time, + class_setup_and_teardown, + compare_float, + description, + process, + random_port, +) + +KAFKA_BROKER = os.environ.get("KAFKA_BROKER", "127.0.0.1:9092") +NODE = "kafka" +DRIVER = "modbus" +GROUP = "group" +INTERVAL = 100 +TOPIC = "neuron-ft-test" +FORMAT_VALUES = 0 +FORMAT_TAGS = 1 + +TAG0 = { + "name": "tag0", + "address": "1!400001", + "attribute": config.NEU_TAG_ATTRIBUTE_RW, + "type": config.NEU_TYPE_INT16, +} +TAG_ERR = { + "name": "tag_err", + "address": "100!400001", + "attribute": config.NEU_TAG_ATTRIBUTE_RW, + "type": config.NEU_TYPE_INT16, +} +TAGS = [ + { + "name": "tag1", + "address": "1!000001", + "attribute": config.NEU_TAG_ATTRIBUTE_RW, + "type": config.NEU_TYPE_BIT, + }, + { + "name": "tag2", + "address": "1!400017", + "attribute": config.NEU_TAG_ATTRIBUTE_RW, + "type": config.NEU_TYPE_FLOAT, + }, +] + + +def kafka_consume_one(broker, topic, group_id, timeout=10): + """Consume one message from kafka, return parsed JSON or None.""" + from confluent_kafka import Consumer, KafkaError + + consumer = Consumer( + { + "bootstrap.servers": broker, + "group.id": group_id, + "auto.offset.reset": "latest", + } + ) + consumer.subscribe([topic]) + consumer.poll(timeout=2.0) + + try: + start = time.time() + while time.time() - start < timeout: + raw = consumer.poll(timeout=1.0) + if raw is None: + continue + if raw.error(): + if raw.error().code() == KafkaError._PARTITION_EOF: + continue + raise Exception(raw.error()) + return json.loads(raw.value()) + return None + finally: + consumer.close() + + +def kafka_consume_until(broker, topic, group_id, predicate, timeout=10): + """Consume messages until predicate(msg) returns True.""" + from confluent_kafka import Consumer, KafkaError + + consumer = Consumer( + { + "bootstrap.servers": broker, + "group.id": group_id, + "auto.offset.reset": "latest", + } + ) + consumer.subscribe([topic]) + consumer.poll(timeout=2.0) + + try: + start = time.time() + while time.time() - start < timeout: + raw = consumer.poll(timeout=1.0) + if raw is None: + continue + if raw.error(): + if raw.error().code() == KafkaError._PARTITION_EOF: + continue + raise Exception(raw.error()) + msg = json.loads(raw.value()) + if predicate(msg): + return msg + return None + finally: + consumer.close() + + +def kafka_drain(broker, topic, group_id, seconds=3): + """Consume for a duration, return True if any message received.""" + from confluent_kafka import Consumer, KafkaError + + consumer = Consumer( + { + "bootstrap.servers": broker, + "group.id": group_id, + "auto.offset.reset": "latest", + } + ) + consumer.subscribe([topic]) + consumer.poll(timeout=2.0) + + got_msg = False + try: + start = time.time() + while time.time() - start < seconds: + raw = consumer.poll(timeout=0.5) + if raw and not raw.error(): + got_msg = True + return got_msg + finally: + consumer.close() + + +@pytest.fixture(autouse=True, scope="class") +def kafka_node(): + api.add_node_check(NODE, config.PLUGIN_KAFKA) + yield + api.del_node(NODE) + + +@pytest.fixture(autouse=True, scope="class") +def modbus_node(kafka_node): + try: + api.add_node_check(DRIVER, config.PLUGIN_MODBUS_TCP) + api.add_group_check(DRIVER, GROUP, INTERVAL) + api.add_tags_check(DRIVER, GROUP, tags=[TAG0]) + params = {"topic": TOPIC} + api.subscribe_group_check(NODE, DRIVER, GROUP, params) + port = random_port() + p = process.start_simulator( + ["./modbus_simulator", "tcp", f"{port}", "ip_v4"] + ) + response = api.modbus_tcp_node_setting(DRIVER, port) + assert 200 == response.status_code + assert error.NEU_ERR_SUCCESS == response.json()["error"] + yield + finally: + api.del_node(DRIVER) + process.stop_simulator(p) + + +class TestKafka: + + # ─── config validation ──────────────────────────────────────────── + + @description( + given="Kafka node and {conf}", + when="config node", + then="should fail", + ) + @pytest.mark.parametrize( + "conf", + [ + {"broker": "", "topic": TOPIC, "format": FORMAT_VALUES}, + {"broker": KAFKA_BROKER, "topic": "", "format": FORMAT_VALUES}, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": -1}, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": 99}, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": FORMAT_VALUES, + "acks": 2}, + ], + ids=[ + "empty-broker", + "empty-topic", + "bad-format-neg", + "bad-format-big", + "bad-acks", + ], + ) + def test_kafka_conf_invalid(self, conf): + response = api.node_setting(NODE, conf) + assert 400 == response.status_code + assert error.NEU_ERR_NODE_SETTING_INVALID == response.json()["error"] + + @description( + given="Kafka node and valid minimal config", + when="config node", + then="should succeed", + ) + def test_kafka_conf_minimal(self): + api.node_setting_check( + NODE, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": FORMAT_VALUES}, + ) + + @description( + given="Kafka node and config with all optional params", + when="config node", + then="should succeed", + ) + def test_kafka_conf_full(self): + api.node_setting_check( + NODE, + { + "broker": KAFKA_BROKER, + "topic": TOPIC, + "format": FORMAT_TAGS, + "upload_err": False, + "compression": 4, + "batch-max-messages": 5000, + "linger-ms": 10, + "message-timeout-ms": 10000, + "acks": 1, + "client-id": "neuron-ft", + }, + ) + + @description( + given="Kafka node configured", + when="read back setting", + then="values should match", + ) + def test_kafka_get_setting(self): + api.node_setting_check( + NODE, + { + "broker": KAFKA_BROKER, + "topic": TOPIC, + "format": FORMAT_VALUES, + "compression": 2, + "batch-max-messages": 8000, + "linger-ms": 20, + "message-timeout-ms": 15000, + "acks": 0, + "client-id": "neuron-test", + }, + ) + response = api.get_node_setting(NODE) + assert 200 == response.status_code + params = response.json()["params"] + assert KAFKA_BROKER == params["broker"] + assert TOPIC == params["topic"] + assert FORMAT_VALUES == params["format"] + assert 2 == params["compression"] + assert 8000 == params["batch-max-messages"] + assert 20 == params["linger-ms"] + assert 15000 == params["message-timeout-ms"] + assert 0 == params["acks"] + assert "neuron-test" == params["client-id"] + + # ─── node lifecycle ─────────────────────────────────────────────── + + @description( + given="Kafka node with valid config", + when="start and stop node", + then="state should change accordingly", + ) + def test_kafka_start_stop(self): + api.node_setting_check( + NODE, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": FORMAT_VALUES}, + ) + api.node_ctl(NODE, config.NEU_CTL_START) + response = api.get_nodes_state(NODE) + assert 200 == response.status_code + assert config.NEU_NODE_STATE_RUNNING == response.json()["running"] + + api.node_ctl(NODE, config.NEU_CTL_STOP) + response = api.get_nodes_state(NODE) + assert 200 == response.status_code + assert config.NEU_NODE_STATE_STOPPED == response.json()["running"] + + @description( + given="Kafka node running", + when="re-configure with different topic", + then="should succeed without restart", + ) + def test_kafka_reconfig(self): + api.node_setting_check( + NODE, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": FORMAT_VALUES}, + ) + api.node_ctl(NODE, config.NEU_CTL_START) + + new_topic = TOPIC + "-reconf" + api.node_setting_check( + NODE, + {"broker": KAFKA_BROKER, "topic": new_topic, "format": FORMAT_TAGS}, + ) + response = api.get_node_setting(NODE) + assert 200 == response.status_code + assert new_topic == response.json()["params"]["topic"] + assert FORMAT_TAGS == response.json()["params"]["format"] + + @description( + given="Kafka node with valid config", + when="start node and wait", + then="should connect to broker", + ) + def test_kafka_connect(self): + api.node_setting_check( + NODE, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": FORMAT_VALUES}, + ) + api.node_ctl(NODE, config.NEU_CTL_START) + + connected = False + for _ in range(10): + time.sleep(1) + response = api.get_nodes_state(NODE) + assert 200 == response.status_code + if ( + config.NEU_NODE_LINK_STATE_CONNECTED + == response.json()["link"] + ): + connected = True + break + assert connected, "kafka node did not connect to broker" + + # ─── data upload ────────────────────────────────────────────────── + + @description( + given="Kafka node with values-format", + when="driver produces data", + then="message should have values/errors structure", + ) + def test_kafka_upload_values(self): + api.node_setting_check( + NODE, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": FORMAT_VALUES}, + ) + api.node_ctl(NODE, config.NEU_CTL_START) + time.sleep(2) + + msg = kafka_consume_until( + KAFKA_BROKER, + TOPIC, + "ft-values", + lambda m: "values" in m or "errors" in m, + ) + assert msg is not None, "no values-format message received from kafka" + assert DRIVER == msg["node"] + assert GROUP == msg["group"] + assert "timestamp" in msg + assert ( + TAG0["name"] in msg.get("values", {}) + or TAG0["name"] in msg.get("errors", {}) + ) + + @description( + given="Kafka node with tags-format", + when="driver produces data", + then="message should have tags array structure", + ) + def test_kafka_upload_tags(self): + api.node_setting_check( + NODE, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": FORMAT_TAGS}, + ) + api.node_ctl(NODE, config.NEU_CTL_START) + time.sleep(2) + + msg = kafka_consume_until( + KAFKA_BROKER, + TOPIC, + "ft-tags", + lambda m: "tags" in m, + ) + assert msg is not None, "no tags-format message received from kafka" + assert DRIVER == msg["node"] + assert GROUP == msg["group"] + assert "tags" in msg + assert TAG0["name"] in [tag["name"] for tag in msg["tags"]] + + @description( + given="Kafka node with values-format and a written tag", + when="driver reports written value", + then="kafka message should contain the exact value", + ) + def test_kafka_upload_written_value(self): + api.node_setting_check( + NODE, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": FORMAT_VALUES}, + ) + api.node_ctl(NODE, config.NEU_CTL_START) + api.write_tag_check(DRIVER, GROUP, TAG0["name"], 12345) + time.sleep(INTERVAL * 5 / 1000) + + msg = kafka_consume_until( + KAFKA_BROKER, + TOPIC, + "ft-write-val", + lambda m: m.get("values", {}).get(TAG0["name"]) == 12345, + ) + assert msg is not None, "did not receive message with written value" + assert 12345 == msg["values"][TAG0["name"]] + + @description( + given="Kafka node with multiple tags", + when="driver produces data", + then="all tags should appear in message", + ) + def test_kafka_upload_multi_tags(self): + api.node_setting_check( + NODE, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": FORMAT_TAGS}, + ) + api.node_ctl(NODE, config.NEU_CTL_START) + time.sleep(2) + + try: + api.add_tags_check(DRIVER, GROUP, TAGS) + + msg = kafka_consume_until( + KAFKA_BROKER, + TOPIC, + "ft-multi", + lambda m: len(m.get("tags", [])) >= len(TAGS) + 1, + ) + assert msg is not None, "no message with all tags" + tag_names = [t["name"] for t in msg["tags"]] + assert TAG0["name"] in tag_names + for tag in TAGS: + assert tag["name"] in tag_names + finally: + api.del_tags(DRIVER, GROUP, [t["name"] for t in TAGS]) + + @description( + given="Kafka node with upload_err=false and an error tag", + when="driver reports error tag", + then="error tag should be filtered from kafka message", + ) + def test_kafka_upload_err_filter(self): + api.node_setting_check( + NODE, + { + "broker": KAFKA_BROKER, + "topic": TOPIC, + "format": FORMAT_TAGS, + "upload_err": False, + }, + ) + api.node_ctl(NODE, config.NEU_CTL_START) + time.sleep(2) + + try: + api.add_tags_check(DRIVER, GROUP, tags=[TAG_ERR]) + + msg = kafka_consume_one(KAFKA_BROKER, TOPIC, "ft-err-filter") + assert msg is not None, "no message received" + tag_names = [t["name"] for t in msg.get("tags", [])] + assert TAG_ERR["name"] not in tag_names + finally: + api.del_tags(DRIVER, GROUP, [TAG_ERR["name"]]) + + # ─── topic routing ──────────────────────────────────────────────── + + @description( + given="Kafka node with custom topic per subscription", + when="driver produces data", + then="data should arrive on the custom topic", + ) + def test_kafka_custom_topic(self): + custom_topic = TOPIC + "-custom" + + api.node_setting_check( + NODE, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": FORMAT_VALUES}, + ) + api.node_ctl(NODE, config.NEU_CTL_START) + time.sleep(2) + + params = {"topic": custom_topic} + api.subscribe_group_update(NODE, DRIVER, GROUP, params) + + try: + msg = kafka_consume_one(KAFKA_BROKER, custom_topic, "ft-custom") + assert msg is not None, "no message on custom topic" + assert DRIVER == msg["node"] + assert GROUP == msg["group"] + finally: + params = {"topic": TOPIC} + api.subscribe_group_update(NODE, DRIVER, GROUP, params) + + @description( + given="Kafka node subscribed to a driver", + when="driver is renamed", + then="data should still arrive with the new driver name", + ) + def test_kafka_driver_rename(self): + api.node_setting_check( + NODE, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": FORMAT_TAGS}, + ) + api.node_ctl(NODE, config.NEU_CTL_START) + time.sleep(2) + + new_name = DRIVER + "-new" + api.update_node_check(DRIVER, new_name) + + try: + msg = kafka_consume_until( + KAFKA_BROKER, + TOPIC, + "ft-drv-rename", + lambda m: m.get("node") == new_name, + ) + assert msg is not None, "no message after driver rename" + assert new_name == msg["node"] + assert GROUP == msg["group"] + finally: + api.update_node_check(new_name, DRIVER) + + @description( + given="Kafka node subscribed to a group", + when="group is renamed", + then="data should still arrive with the new group name", + ) + def test_kafka_group_rename(self): + api.node_setting_check( + NODE, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": FORMAT_TAGS}, + ) + api.node_ctl(NODE, config.NEU_CTL_START) + time.sleep(2) + + new_name = GROUP + "-new" + api.update_group_check(DRIVER, GROUP, new_name) + + try: + msg = kafka_consume_until( + KAFKA_BROKER, + TOPIC, + "ft-grp-rename", + lambda m: m.get("group") == new_name, + ) + assert msg is not None, "no message after group rename" + assert DRIVER == msg["node"] + assert new_name == msg["group"] + finally: + api.update_group_check(DRIVER, new_name, GROUP) + + @description( + given="Kafka node subscribed to a group", + when="unsubscribe then re-subscribe with new topic", + then="data should arrive on the new topic", + ) + def test_kafka_unsubscribe_resubscribe(self): + api.node_setting_check( + NODE, + {"broker": KAFKA_BROKER, "topic": TOPIC, "format": FORMAT_VALUES}, + ) + api.node_ctl(NODE, config.NEU_CTL_START) + time.sleep(2) + + api.unsubscribe_group(NODE, DRIVER, GROUP) + time.sleep(1) + + new_topic = TOPIC + "-resub" + params = {"topic": new_topic} + api.subscribe_group_check(NODE, DRIVER, GROUP, params) + + try: + msg = kafka_consume_one(KAFKA_BROKER, new_topic, "ft-resub") + assert msg is not None, "no message on re-subscribed topic" + assert DRIVER == msg["node"] + assert GROUP == msg["group"] + finally: + params = {"topic": TOPIC} + api.subscribe_group_update(NODE, DRIVER, GROUP, params) diff --git a/tests/ft/neuron/api.py b/tests/ft/neuron/api.py index 43ea79667..7af1d2037 100644 --- a/tests/ft/neuron/api.py +++ b/tests/ft/neuron/api.py @@ -429,6 +429,10 @@ def mqtt_node_setting(node, client_id="neuron_aBcDeF", host="broker.emqx.io", po "offline-cache": False, "cache-sync-interval": 100, "host": host, "port": port, "username": "", "password": "", "ssl": False}) +def kafka_node_setting(node, broker="127.0.0.1:9092", topic="neuron-data", fmt=0): + return node_setting(node, json={"broker": broker, "topic": topic, "format": fmt}) + + def get_nodes_disable_auth(type): return requests.get(url=config.BASE_URL + '/api/v2/node', params={"type": type}) diff --git a/tests/ft/neuron/config.py b/tests/ft/neuron/config.py index ca95adb49..9758e0003 100644 --- a/tests/ft/neuron/config.py +++ b/tests/ft/neuron/config.py @@ -91,3 +91,4 @@ PLUGIN_MODBUS_TCP = "Modbus TCP" PLUGIN_MODBUS_RTU = "Modbus RTU" PLUGIN_MQTT = "MQTT" +PLUGIN_KAFKA = "Kafka" From 7c03aba923466264e96efbaf87c0ceeb56f39dbd Mon Sep 17 00:00:00 2001 From: hxy7yx <1595670487@qq.com> Date: Sun, 12 Apr 2026 18:55:21 -0700 Subject: [PATCH 2/2] mqtt: report driver node state immediately --- plugins/mqtt/mqtt_plugin_intf.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/mqtt/mqtt_plugin_intf.c b/plugins/mqtt/mqtt_plugin_intf.c index 42c5e8599..468128775 100644 --- a/plugins/mqtt/mqtt_plugin_intf.c +++ b/plugins/mqtt/mqtt_plugin_intf.c @@ -98,6 +98,10 @@ static void connect_cb(void *data) neu_plugin_t *plugin = data; plugin->common.link_state = NEU_NODE_LINK_STATE_CONNECTED; plog_notice(plugin, "plugin `%s` connected", neu_plugin_module.module_name); + + if (plugin->heartbeat_timer) { + heartbeat_timer_cb(plugin); + } } static void disconnect_cb(void *data)