Kafka

-
-
2024-08-16

 

kafka是什么?

消息中间件,用来储存消息队列、传递消息。

基于发布-订阅模式,可以分布式的、大吞吐量的、低延时的传递消息。

由于底层使用分布式的ZooKeeper,数据可以分布式的有冗余的存储。

所以kafka主要用于:

  1. 保存大量的消息队列数据
  2. 组件、系统、应用之间的数据通道
  3. 不同组件间的解耦

kafka特点

  • 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
  • 高伸缩性:每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
  • 持久性、可靠性:Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
  • 容错性:允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
  • 高并发:支持数千个客户端同时读写

基本概念:

  • Broker(代理人): 消息中间件所在的服务器就叫Broker
  • Message(消息):消息可以包含任何类型的数据,比如文本、JSON、二进制数据等。消息是 Kafka 中最基本的数据单元,每条记录都包含一个键,一个值和一个时间戳
  • Topic(主题): 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。
  • Partition(分区) : Partition是物理上的概念,体现在磁盘上面,每个Topic包含一个或多个Partition.每个分区都是一个有序的,不可变的记录序列,不断附加到结构化的提交日志中。分区中的记录每个都分配了一个称为偏移的顺序ID号,它唯一地标识分区中的每个记录。
  • Producer(生产者) : 负责发布消息到Kafka broker
  • Consumer(消费者) : 消息消费者,向Kafka broker读取消息的客户端。
  • Streams(流):允许应用程序充当流处理器,从一个或多个topics(主题)消耗的输入流,并产生一个输出流至一个或多个输出的topics(主题),有效地变换所述输入流,以输出流。
  • Connector(连接器): 许构建和运行kafka topics(主题)连接到现有的应用程序或数据系统中重用生产者或消费者。
  • offset (偏移量): 是kafka用来确定消息是否被消费过的标识,在kafka内部体现就是一个递增的数字。
  • Rebalance(重平衡)消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

主题Topic

在Apache Kafka中,主题(Topic)是消息的类别,用于对消息进行分类和组织。主题是Kafka消息系统中的核心概念,它是消息的逻辑容器,用于存储发布到Kafka集群的消息记录。

主题允许生产者将消息发布到特定的类别中,同时允许消费者订阅并从中读取消息。每个主题都可以分成多个分区,每个分区可以在Kafka集群中的不同服务器上进行复制,以提供容错和伸缩性。

主题的设计使得Kafka能够处理大量的消息,并提供高吞吐量和低延迟的特性。主题还允许消息的持久化存储,并支持消息的批量处理和流式处理。

分区Partition

在 Apache Kafka 中,分区(Partition)是主题(Topic)的一个重要概念。每个主题都可以被分成一个或多个分区,这些分区是消息存储和处理的基本单元。

每个分区都是一个有序、不可变的消息序列,其中的每条消息都会被分配一个唯一的偏移量(Offset)。这意味着消息在分区内是有序的,但在不同分区之间则不保证有序性。

分区的存在使得 Kafka 具备了高伸缩性和高吞吐量的特性。它们允许主题的数据分布在集群的多个节点上,从而实现了水平扩展。此外,分区还支持消息的并行处理,允许消费者组内的多个消费者并行地从不同分区读取消息。

分区的数量和分配策略是在创建主题时确定的,通常会根据数据量、负载均衡和并行处理需求来进行设置。在实际使用中,了解如何合理地设计和使用分区,可以帮助优化 Kafka 集群的性能和可伸缩性。

分配Assignment

在 Apache Kafka 中,分配(Assignment)通常指的是分区(Partitions)如何被分配给消费者组(Consumer Group)中的消费者(Consumers)。这个过程通常称为分区分配(Partition Assignment)。

当一个消费者组中的消费者启动或者有新的消费者加入时,Kafka 集群需要决定如何将主题的分区分配给这些消费者,以便实现负载均衡和并行处理。

Kafka 使用一种叫做"分区分配策略"的算法来决定分配的方式。这个算法通常由消费者组协调器(Consumer Group Coordinator)来执行。在分配过程中,考虑的因素包括消费者的订阅关系、已经分配的分区情况、分区的负载情况等。

一旦分配完成,每个消费者就知道自己负责消费哪些分区,从而可以开始从这些分区中读取消息。

合理的分区分配可以确保消费者组内的消费者能够有效地并行处理消息,同时避免分区的不均衡导致的性能问题。 Kafka 提供了多种默认的分配策略,同时也支持用户自定义分配策略,以满足不同的业务需求。

生产者Productor

在 Apache Kafka 中,生产者(Producer)是指向 Kafka 集群发送消息的应用程序或组件。生产者负责将消息发布到指定的主题(Topic),并且负责确保消息被安全地传递到 Kafka 集群中的分区。

生产者通常将消息发送到主题的一个或多个分区中,而 Kafka 集群会负责将这些消息持久化并进行分发。

生产者主要功能:

  •  消息发布: 生产者负责将消息发布到 Kafka 集群中的指定主题。
  •  负载均衡: 生产者可以向多个分区发布消息,并且 Kafka 集群会负责将这些消息均匀地分发到各个分区中,实现负载均衡。
  •  消息确认: 生产者可以选择是否需要消息的确认。确认机制可以确保消息被成功写入到指定的分区中,以及是否被成功复制到备份节点中。
  •  分区选择: 生产者可以选择将消息发送到指定的分区,或者让 Kafka 集群根据分区策略自动选择分区。
  •  性能调优: 生产者可以通过配置参数来调优性能,包括批量发送、压缩、异步发送等。

生产者是 Kafka 中非常重要的组件,它们使得应用程序能够将大量的消息高效地发送到 Kafka 集群中,从而实现了可靠的消息传递和处理。

消费者Consumer

在 Apache Kafka 中,消费者(Consumer)是指从 Kafka 集群订阅消息并处理这些消息的应用程序或组件。消费者从一个或多个主题(Topic)中读取消息,并对这些消息进行处理。

以下是一些消费者的关键特性和功能:

  •  订阅主题: 消费者可以订阅一个或多个主题,从中接收消息。一旦订阅了主题,消费者就可以开始接收这些主题中的消息。
  •  分区分配: 消费者可以被分配到一个或多个分区,以便并行地处理消息。Kafka 使用分区分配策略来决定如何将分区分配给消费者。
  •  消息处理: 消费者负责从分配给自己的分区中读取消息,并对这些消息进行处理。处理消息的方式可以是存储到数据库、进行实时计算、触发事件等。
  •  偏移量管理: 消费者需要管理自己消费的位置,即偏移量(Offset)。偏移量指示了消费者在分区中的位置,消费者需要定期地提交偏移量,以确保在重启或重新分配分区时能够继续从上次的位置消费。
  •  消费者组: 多个消费者可以组成一个消费者组,每个分区只能被消费者组内的一个消费者消费。这样可以实现消息的负载均衡和水平扩展。
  •  消息确认: 消费者可以选择是否需要确认消息的处理结果。确认机制可以确保消息被成功处理,避免消息丢失或重复处理。

消费者是 Kafka 中非常重要的组件,它们使得应用程序能够高效地从 Kafka 集群中接收消息,并进行相应的处理。消费者的健壮性和高吞吐量是 Kafka 消息系统的关键特点之一。

数据流Streams

在 Apache Kafka 中,Kafka Streams 是一个用于构建实时流处理应用程序的客户端库。它允许开发人员利用 Kafka 的消息传递能力来处理输入流,并生成输出流,从而实现实时数据处理和分析。

Kafka 中 Streams 的一些功能:

  1.  实时流处理: Kafka Streams 提供了一种简单而强大的方式来处理实时数据流。它可以从一个或多个主题中读取输入数据,并将处理后的数据写入一个或多个输出主题中。
  2.  状态管理: Kafka Streams 允许开发人员在处理数据时维护本地状态。这样可以方便地进行聚合、连接、过滤等操作,而无需外部存储或数据库。
  3.  容错性: Kafka Streams 提供了内置的容错机制,能够在发生故障时自动恢复。它通过使用 Kafka 主题中的消息来保证处理的幂等性和容错性。
  4.  事件时间处理: Kafka Streams 支持事件时间处理,可以处理事件发生的时间而不仅仅是处理消息到达的时间。这对于实时数据处理和窗口操作非常重要。
  5.  灵活性: Kafka Streams 可以与其他 Kafka 生态系统的组件无缝集成,比如连接到外部数据库、调用外部服务、与其他流处理引擎协同工作等。
  6.  水平扩展: Kafka Streams 应用程序可以很容易地水平扩展,以处理大规模的数据流,同时保持高吞吐量和低延迟。

Kafka Streams 提供了一种简单而强大的方式来构建实时流处理应用程序,它使得开发人员可以利用 Kafka 的消息传递能力来处理实时数据,并且无需引入额外的复杂的基础设施。

连接器Connector

在 Apache Kafka 生态系统中,Connectors 是用于将 Kafka 主题与外部数据存储、应用程序或数据处理系统进行连接的组件。它们允许可靠地将数据从 Kafka 主题导出到外部系统,或者将外部系统的数据导入到 Kafka 主题中。这种连接是双向的,使得数据能够在不同系统之间自由流动。

Connector 的功能特性:

  •  可伸缩性: Connectors 可以水平扩展,以处理大规模的数据流。它们可以并行地处理多个分区的数据,从而实现高吞吐量和低延迟。
  •  可插拔性: Kafka Connect 提供了一种插件化的架构,允许开发人员轻松地编写自定义的 Connectors,并将它们部署到 Kafka Connect 集群中。这种可插拔性使得 Connectors 能够与各种不同的数据系统集成,如数据库、文件系统、消息队列等。
  •  数据转换: Connectors 可以进行数据格式的转换,使得从 Kafka 主题到外部系统,或者从外部系统到 Kafka 主题的数据传输过程中,能够进行格式的调整和映射。
  •  偏移量管理: Connectors 负责管理数据传输的偏移量,确保数据能够准确地传输,避免重复处理和数据丢失。
  •  内置 Connectors: Kafka Connect 提供了一些内置的 Connectors,用于连接常见的数据源和数据目的地,如文件系统、数据库、HDFS 等。这些内置 Connectors 可以快速配置和部署,使得与常见的数据系统进行集成变得更加容易。

重平衡

在 Apache Kafka 中,重平衡(rebalancing)是指在消费者组中的消费者实例发生变化时,Kafka 重新分配分区以确保负载均衡的过程。这种变化可能是由于新的消费者加入消费者组,或者现有的消费者离开了消费者组。重平衡的目的是确保每个消费者实例负责处理的分区数量是大致相等的,从而实现消费者组内部的负载均衡。

重平衡的过程大致如下:

  1.  消费者加入或离开: 当有新的消费者加入消费者组,或者现有的消费者离开消费者组时,触发重平衡过程。
  2.  重新分配分区: Kafka 会根据消费者组内部的消费者数量和分区的分布情况,重新计算并分配分区给每个消费者实例。
  3.  消费者重新分配: 每个消费者实例接收到新的分配结果后,会停止处理之前负责的分区,并开始处理新分配的分区。

重平衡的过程是自动进行的,Kafka 负责管理和协调整个过程,确保消费者组内部的负载均衡。重平衡的频率取决于消费者组内部的变化情况,一般情况下,当有新的消费者加入或离开消费者组时才会触发重平衡。

kafka 安装、配置&启动 

Archlinux下可以直接安装使用extra/kafka包,默认已经配置好,且包含了zookeeper服务,可直接启动运行。kafka运行在9092端口,zookeeper运行在2181端口。

$sudo pacman -S kafka
$sudo systemctl start kafka
$sudo systemctl status kafka
● kafka.service - Kafka server
     Loaded: loaded (/usr/lib/systemd/system/kafka.service; disabled; preset: disabled)      
     Active: active (running) since Fri 2024-08-16 10:05:47 CST; 43min ago
 Invocation: 4df1882481e74cf59988c8867e1d5d5c
   Main PID: 2271 (java)
      Tasks: 78 (limit: 4697)
     Memory: 429.9M
     CGroup: /system.slice/kafka.service

kafka安装

下载安装

到官网http://kafka.apache.org/download

:由于Kafka控制台脚本对于基于Unix和Windows的平台是不同的,

因此在Windows平台上使用binwindows 而不是bin/ 将脚本扩展名更改为.bat。

# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
# tar -C /data/ -xvf kafka_2.11-2.1.0.tgz 
# cd /data/kafka_2.11-2.1.0/

 

配置启动zookeeper

kafka运行依赖zookeeper;需要启动zookeeper才能正常使用Kafka

zookeeper需要java环境:

# yum -y install java-1.8.0

这里kafka下载包已经包括zookeeper服务,所以只需修改配置文件,启动即可。

# config/zookeeper.properties
------------------------------------------
dataDir=/tmp/zookeeper   #数据存储目录
clientPort=2181   #zookeeper端口
maxClientCnxns=0

 

配置kafka

一、修改配置文件

# config/server.properties
--------------------------------------------
broker.id=0
listeners=PLAINTEXT://localhost:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

可根据自己需求修改配置文件

 broker.id:#唯一标识ID
 listeners=PLAINTEXT://localhost:9092:#kafka服务监听地址和端口
 log.dirs:#日志存储目录
 zookeeper.connect:#指定zookeeper服务

二、配置环境变量

# /etc/profile.d/kafka.sh
-------------------
export KAFKA_HOME="/data/kafka_2.11-2.1.0"
export PATH="${KAFKA_HOME}/bin:$PATH"
-------------------
# source /etc/profile.d/kafka.sh

三、配置服务启动脚本

# /etc/init.d/kafka
------------
#!/bin/sh
#
# chkconfig: 345 99 01
# description: Kafka
#
# File : Kafka
#
# Description: Starts and stops the Kafka server
#
 
source /etc/rc.d/init.d/functions
 
KAFKA_HOME=/data/kafka_2.11-2.1.0
KAFKA_USER=root
export LOG_DIR=/tmp/kafka-logs
 
[ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka
 
# See how we were called.
case "$1" in
 
  start)
    echo -n "Starting Kafka:"
    /sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &"
    echo " done."
    exit 0
    ;;
 
  stop)
    echo -n "Stopping Kafka: "
    /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}' | xargs kill -9"
    echo " done."
    exit 0
    ;;
  hardstop)
    echo -n "Stopping (hard) Kafka: "
    /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}' | xargs kill -9"
    echo " done."
    exit 0
    ;;
 
  status)
    c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`
    if [ "$c_pid" = "" ] ; then
      echo "Stopped"
      exit 3
    else
      echo "Running $c_pid"
      exit 0
    fi
    ;;
 
  restart)
    stop
    start
    ;;
 
  *)
    echo "Usage: kafka {start|stop|hardstop|status|restart}"
    exit 1
    ;;
 
esac

启动kafka服务

一、后台启动zookeeper服务

# nohup zookeeper-server-start.sh /data/kafka_2.11-2.1.0/config/zookeeper.properties &

二、启动kafka服务

# systemctl start kafka
# systemctl status kafka 
● kafka.service - Kafka server
     Loaded: loaded (/usr/lib/systemd/system/kafka.service; disabled; preset: disabled)      
     Active: active (running) since Fri 2024-08-16 10:05:47 CST; 43min ago
 Invocation: 4df1882481e74cf59988c8867e1d5d5c
   Main PID: 2271 (java)
      Tasks: 78 (limit: 4697)
     Memory: 429.9M
     CGroup: /system.slice/kafka.service             

简单的使用

创建Topic

创建一个名为“alone”的主题,它只包含一个分区,只有一个副本:

$ kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic alone
Created topic alone.

如果我们运行list topic命令,我们现在可以看到该主题:

$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
alone

发送一些消息

Kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。

运行生产者,然后在控制台中键入一些消息以发送到服务器。

$ kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic alone
>Hallo
>This is a message

消费一下消息

Kafka还有一个命令行使用者,它会将消息转储到标准输出。

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic alone --from-beginning
Hallo
This is message

所有命令行工具都有其他选项; 运行不带参数的命令将显示更详细地记录它们的使用信息。

kafka-topics.sh --describe --topic alone --bootstrap-server 127.0.0.1:9092

[2024-08-15 18:10:53,804] WARN [AdminClient clientId=adminclient-1] The DescribeTopicPartitions API is not supported, using Metadata API to describe topics. (org
.apache.kafka.clients.admin.KafkaAdminClient)
Topic: alone    TopicId: XGb_Eq1RTsSraqR8e-owmg PartitionCount: 1       ReplicationFactor: 1                                                                    C
onfigs:
        Topic: alone    Partition: 0    Leader: 0       Replicas: 0     Isr: 0 Elr: N/A                                                                         L
astKnownElr: N/A

设置多代理kafka群集

到目前为止,我们一直在与一个broker运行,但这并不好玩。对于Kafka,单个代理只是一个大小为1的集群,因此除了启动一些代理实例之外没有太多变化。但是为了感受它,让我们将我们的集群扩展到三个节点(仍然在我们的本地机器上)。

准备配置文件

# cd /data/kafka_2.11-2.1.0/
# cp config/server.properties config/server-1.properties
# cp config/server.properties config/server-2.properties
# vim config/server-1.properties
------------------------
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
------------------------
# vim config/server-2.properties
------------------------
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2
-------------------------

注:该broker.id 属性是群集中每个节点的唯一且永久的名称。

我们必须覆盖端口和日志目录,因为我们在同一台机器上运行这些,并且我们希望让所有代理尝试在同一端口上注册或覆盖彼此的数据。

开启集群另2个kafka服务

# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-1.properties &
# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-2.properties &
# ss -nutl
Netid State      Recv-Q Send-Q     Local Address:Port                    Peer Address:Port                          
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9092                              :::*                 
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9093                              :::*                                
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9094                              :::*

在集群中进行操作

 一、现在创建一个复制因子为3的新主题my-replicated-topic

# kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".

二、在一个集群中,运行“describe topics”命令查看哪个broker正在做什么

# kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
#注释:第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。由于我们只有一个分区用于此主题,因此只有一行。
#“leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。
#“replicas”是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。
# “isr”是“同步”复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。
#请注意,Leader: 2,在我的示例中,节点2 是该主题的唯一分区的Leader。

三、可以在我们创建的原始主题上运行相同的命令,以查看它的位置

# kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic alone
Topic:along PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: alone    Partition: 0    Leader: 0   Replicas: 0 Isr: 0

四 、向我们的新主题发布一些消息:

# kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>my test message 1
>my test message 2
>^C

五、现在让我们使用这些消息:

# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2

5.4 测试集群的容错性

一、现在让我们测试一下容错性。Broker 2 充当leader 所以让我们杀了它:

# ps aux | grep server-2.properties |awk '{print $2}'
106737
# kill -9 106737
# ss -nutl
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9092                              :::*                       
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9093                              :::*

二、leader 已切换到其中一个从属节点,节点2不再位于同步副本集中:

# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 0   Replicas: 2,0,1 Isr: 0,1

三、即使最初接受写入的leader 已经失败,这些消息仍可供消费:

# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2

使用Kafka Connect导入/导出数据

从控制台写入数据并将其写回控制台是一个方便的起点,但有时候可能希望使用其他来源的数据或将数据从Kafka导出到其他系统。对于许多系统,您可以使用Kafka Connect导入或导出数据,而不是编写自定义集成代码。

Kafka Connect是Kafka附带的工具,用于向Kafka导入和导出数据。它是一个可扩展的工具,运行连接器,实现与外部系统交互的自定义逻辑。在本快速入门中,我们将了解如何使用简单的连接器运行Kafka Connect,这些连接器将数据从文件导入Kafka主题并将数据从Kafka主题导出到文件。

一、首先创建一些种子数据进行测试:

# echo -e "foonbar" > test.txt
或者在Windows上:
> echo foo> test.txt
> echo bar>> test.txt

二、接下来,启动两个以独立模式运行的连接器,这意味着它们在单个本地专用进程中运行。提供三个配置文件作为参数。

第一个始终是Kafka Connect流程的配置,包含常见配置,例如要连接的Kafka代理和数据的序列化格式。

其余配置文件均指定要创建的连接器。这些文件包括唯一的连接器名称,要实例化的连接器类以及连接器所需的任何其他配置。

# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[2019-01-16 16:16:31,884] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)
[2019-01-16 16:16:31,903] INFO WorkerInfo values:
... ...
#注:Kafka附带的这些示例配置文件使用您之前启动的默认本地群集配置并创建两个连接器:第一个是源连接器,它从输入文件读取行并生成每个Kafka主题,第二个是宿连接器从Kafka主题读取消息并将每个消息生成为输出文件中的一行。

三、验证是否导入成功(另起终端)

在启动过程中,您将看到许多日志消息,包括一些指示正在实例化连接器的日志消息。

① 一旦Kafka Connect进程启动,源连接器应该开始从test.txt主题读取行并将其生成到主题connect-test,并且接收器连接器应该开始从主题读取消息connect-test 并将它们写入文件test.sink.txt。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传递:

# cat test.sink.txt
foo
bar

② 请注意,数据存储在Kafka主题中connect-test,因此我们还可以运行控制台使用者来查看主题中的数据(或使用自定义使用者代码来处理它):

# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

四、继续追加数据,验证

# echo Another line>> test.txt 
# cat test.sink.txt
foo
bar
Another line
# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"}

其他

kafka那些脚本都是干什么的?

  1. kafka-acls.sh - 用于管理 Kafka 的访问控制列表 (ACLs),用于控制谁可以执行哪些操作。
  2. kafka-broker-api-versions.sh - 用于检索 Kafka broker 支持的 API 版本信息。
  3. kafka-client-metrics.sh - 用于收集和查看 Kafka 客户端的指标数据。
  4. kafka-cluster.sh - 用于执行 Kafka 集群级别的操作。
  5. kafka-configs.sh - 用于管理 Kafka 的配置参数。
  6. kafka-console-consumer.sh - 用于从 Kafka 主题中消费消息的命令行工具。
  7. kafka-console-producer.sh - 用于向 Kafka 主题中生产消息的命令行工具。
  8. kafka-consumer-groups.sh - 用于管理 Kafka 消费者组。
  9. kafka-consumer-perf-test.sh - 用于执行 Kafka 消费者性能测试。
  10. kafka-delegation-tokens.sh - 用于管理 Kafka 委托令牌。
  11. kafka-delete-records.sh - 用于删除 Kafka 主题中的记录。
  12. kafka-dump-log.sh - 用于将 Kafka 分区日志转储到可读的文本格式。
  13. kafka-e2e-latency.sh - 用于测试 Kafka 的端到端延迟。
  14. kafka-features.sh - 用于显示 Kafka 实例的功能列表。
  15. kafka-get-offsets.sh - 用于获取 Kafka 主题分区的偏移量信息。
  16. kafka-jmx.sh - 用于启动 Kafka JMX 监控。
  17. kafka-leader-election.sh - 用于手动触发 Kafka 分区的 leader 选举。
  18. kafka-log-dirs.sh - 用于查看和管理 Kafka broker 上的日志目录。
  19. kafka-metadata-quorum.sh - 用于管理 Kafka 元数据的复制和一致性。
  20. kafka-metadata-shell.sh - 用于与 Kafka 元数据进行交互的命令行工具。
  21. kafka-mirror-maker.sh - 用于配置和运行 Kafka 镜像制造者。
  22. kafka-producer-perf-test.sh - 用于执行 Kafka 生产者性能测试。
  23. kafka-reassign-partitions.sh - 用于重新分配 Kafka 分区。
  24. kafka-replica-verification.sh - 用于验证 Kafka 副本的一致性。
  25. kafka-run-class.sh - 用于在 Kafka 中运行 Java 类。
  26. kafka-server-start.sh - 用于启动 Kafka 服务器。
  27. kafka-server-stop.sh - 用于停止 Kafka 服务器。
  28. kafka-storage.sh - 用于执行 Kafka 存储操作。
  29. kafka-streams-application-reset.sh - 用于重置 Kafka 流应用程序的状态。
  30. kafka-topics.sh - 用于管理 Kafka 主题。
  31. kafka-transactions.sh - 用于管理 Kafka 事务。
  32. kafka-verifiable-consumer.sh - 用于验证 Kafka 消费者。
  33. kafka-verifiable-producer.sh - 用于验证 Kafka 生产者。

“您的支持是我持续分享的动力”

微信收款码
微信
支付宝收款码
支付宝

目录