
Kafka Connect is a framework that ships with Kafka for integrating Kafka with external systems. It standardizes data movement in both directions through connectors: source connectors pull data from systems such as databases or file systems into Kafka, while sink connectors push data from Kafka topics out to targets such as another database or a search engine.
The following example Docker Compose file runs a single-node Kafka cluster in KRaft mode (no ZooKeeper) together with a Kafka Connect worker:
version: '3.7'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      # KRaft mode needs a pre-generated cluster ID; any base64-encoded UUID
      # works (this value comes from Confluent's own examples)
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # Advertise an internal listener for other containers (kafka:29092) and
      # a separate one for clients on the host (localhost:9092); advertising
      # only localhost:9092 would break clients inside the Docker network
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:9093,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_MIN_INSYNC_REPLICAS: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    volumes:
      - ./data/kafka:/var/lib/kafka/data
  connect:
    image: confluentinc/cp-kafka-connect:7.5.0
    hostname: connect
    container_name: connect
    ports:
      - "8083:8083"
    depends_on:
      - kafka
    environment:
      # Use the internal listener advertised by the broker
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_PORT: 8083
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      # Recent cp-kafka-connect images ship the FileStream connector jar in
      # /usr/share/filestream-connectors; /etc/kafka-connect/jars is for extra plugins
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/filestream-connectors,/etc/kafka-connect/jars"
    volumes:
      - ./data:/data
      # Mount custom plugins into a dedicated directory rather than shadowing
      # /usr/share/java, which holds the image's own jars
      - ./plugins:/etc/kafka-connect/jars
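Start the stack and wait for the Connect worker to come up, then confirm that the FileStream connector plugin is visible to the worker. This is a quick sanity check, assuming the compose file above is saved as docker-compose.yml in the current directory:
docker compose up -d
# The worker can take some time to start; once the REST API responds,
# FileStreamSource and FileStreamSink should appear in the plugin list
curl -s http://localhost:8083/connector-plugins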
Create a file named file-source-connector.json to configure the FileStream Source Connector:
{
  "name": "file-source-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/data/input.txt",
    "topic": "input-topic"
  }
}
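Before creating the connector, the config can optionally be checked with Connect's validation endpoint. Note that, unlike the create request, the body here is the bare config map without the name/config envelope; this sketch assumes the worker is reachable on localhost:8083:
curl -X PUT -H "Content-Type: application/json" \
  --data '{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","file":"/data/input.txt","topic":"input-topic"}' \
  http://localhost:8083/connector-plugins/FileStreamSourceConnector/config/validate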
Create a file named file-sink-connector.json to configure the FileStream Sink Connector:
{
  "name": "file-sink-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "file": "/data/output.txt",
    "topics": "input-topic"
  }
}
Create both connectors through the Connect REST API:
curl -X POST -H "Content-Type: application/json" --data @file-source-connector.json http://localhost:8083/connectors
curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors
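Once created, each connector and its task should report a RUNNING state, which can be checked through the same REST API:
curl -s http://localhost:8083/connectors/file-source-connector/status
curl -s http://localhost:8083/connectors/file-sink-connector/status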
Create an input.txt file in the local ./data directory (mounted as /data inside the connect container) and append a line of data, for example:
echo "Hello Kafka" >> data/input.txt
Then check data/output.txt: the sink connector should have written the data from input.txt into output.txt.
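To confirm that the records really pass through Kafka rather than just from file to file, the topic can also be read directly with the console consumer shipped in the broker container. This uses the internal listener kafka:29092 from the compose file above; with the default JsonConverter settings, each record shows up wrapped in a schema/payload envelope:
docker exec kafka kafka-console-consumer --bootstrap-server kafka:29092 \
  --topic input-topic --from-beginning --max-messages 1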