Kafka Connect 是 Kafka 提供的一个强大框架,用于将 Kafka 与其他系统集成。它简化了数据源和目标系统之间的数据流动,通过使用连接器(Connectors),可以将数据源(如数据库、文件系统)和 Kafka 进行连接,也可以将 Kafka 中的数据流向目标系统(如另一个数据库、搜索引擎)。

 

Kafka Connect 的核心概念

  1. Connector:连接器,用于定义如何从数据源读取数据(Source Connector)或将数据写入目标系统(Sink Connector)。
  2. Task:任务,一个连接器可以拆分成多个任务,以并行处理数据。
  3. Worker:工作节点,运行连接器和任务的 Kafka Connect 实例。
  4. Config:配置,定义连接器的参数。

 

下面是一个示例 Docker Compose 文件,展示如何使用最新版本的 Kafka 在 KRaft 模式下运行 Kafka 集群。

version: '3.7'
services:
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: localhost:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: ""
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_MIN_INSYNC_REPLICAS: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "true"
    volumes:
      - ./data/kafka:/var/lib/kafka/data
  connect:
    image: confluentinc/cp-kafka-connect:7.3.0
    hostname: connect
    container_name: connect
    ports:
      - "8083:8083"
    depends_on:
      - kafka
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_PLUGIN_PATH: "/usr/share/java"
    volumes:
      - ./data:/data
      - ./plugins:/usr/share/java

 

配置和运行连接器

创建连接器配置文件

创建一个名为 file-source-connector.json 的文件,用于配置 FileStream Source Connector:

{
  "name": "file-source-connector",
  "config": {
    "connector.class": "FileStreamSource",
    "tasks.max": "1",
    "file": "/data/input.txt",
    "topic": "input-topic"
  }
}

 

创建一个名为 file-sink-connector.json 的文件,用于配置 FileStream Sink Connector:

{
  "name": "file-sink-connector",
  "config": {
    "connector.class": "FileStreamSink",
    "tasks.max": "1",
    "file": "/data/output.txt",
    "topics": "input-topic"
  }
}

 

使用 REST API 创建连接器

通过 REST API 创建连接器: 

curl -X POST -H "Content-Type: application/json" --data @file-source-connector.json http://localhost:8083/connectors

 

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

 

5. 验证连接器工作

在 Docker 容器中 /data 目录下创建一个 input.txt 文件,并写入一些数据。例如:

echo "Hello Kafka" >> data/input.txt

 

然后检查 data/output.txt 文件,应该会看到 input.txt 文件中的数据已经被写入 output.txt。