Kafka入门知识点

自带脚本使用

kafka的安装包中包含了大量的使用脚本,开发人员可以使用脚本来帮助开发

  1. 启动脚本 ,–daemon是表示后台启动
    bin/kafka-server-start.sh –daemon config/server.properties –daemon
  2. 创建topic命令 ,--topic topic-demo 主题名称,--replication-factor 3 副本因子,副本的个数,--partitions 4 分区数
    bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-demo --replication-factor 3 --partitions 4
  3. 查看topic数据
    bin/kafka-topics.sh --zookeeper localhost: 2181/kafka --describe --topic topic-demo
  4. 启动消费者
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo
  5. 启动生产者
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo

角色

  1. broker 指kafka服务节点
  2. consumer 消费者,从broker消费消息
  3. producer 生产者,往broker上发送消息
  4. controller 控制器,负责管理整个kafka集群中所有分区和副本的状态
  5. leader 集群中的leader副本对外提供读写能力
  6. follower 集群中的follower副本只负责对leader副本进行数据同步,当leader副本挂掉时会参与选取
  7. consumer_group 是官方用来提供扩容以及集群容错的一种消费机制

生产者


kafka的客户端主要组成部分有三个,main线程,RecordAccumulator以及sender线程,main线程主要是对发送的消息进行拦截、序列化以及分区操作,操作完成之后将消息追加到RecordAccumulator中,accumulator的数据结构可以简化成key、Deque的键值对集合,当有新的消息插入时,
首先检测对应分区的Deque队列的最后一个元素是否可以追加,如果不可以则重新创建一个后追加。Sender线程负责将accumulator缓存中的数据发送到borker节点中,但是在发送之前会进行一次类似网络OSI模型的一种业务包装。将包装好的<NodeId, Deque>缓存到InFlightRequests中,这里是存放已经发送但是还没有收到响应请求的重复消息。

  1. 拦截器
    拦截器主要在消息发送之前对消息进行拦截,做一些通用操作,onSend方法是在发送之前调用,onAcknowledgement在进行ack确认的时候调用
// properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
public interface ProducerInterceptor<K, V> extends Configurable {
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);
    void onAcknowledgement(RecordMetadata var1, Exception var2);
    void close();
}
  1. 序列化器
    序列化器对消息进行序列化操作,方便消息网络传输,configure中可以获取kafka的配置信息,比如编码
public interface Serializer<T> extends Closeable {
    void configure(Map<String, ?> var1, boolean var2);
    byte[] serialize(String var1, T var2);
    void close();
}
  1. 分区器
    分区器是确定消息归属分区的作用,默认的分区器是通过轮询的方式,DefaultPartitioner会根据topic生成一个AtomicInteger消息计数器,在发送每一个消息都会对应一个消息编号,最终会根据这个消息编号对topic主题的分区数进行取余得到最终的分区。
public interface Partitioner extends Configurable, Closeable {
    int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);
    void close();
}

消费者

kafka消费者是单线程非线程安全的,所以要实现多线程需要自己设计方案,常见的方案有以下两种:

  • 第一种将每个消费者对应一个线程,这样的话就不存在线程安全的问题,对于位移的提交来说也是最简单的,但是这样做的缺点也很明显,就是资源占用的问题,多线程之前的上下文切换,如果处理的处理的业务还包含tcp等耗时的网络请求,那么就会导致效率降低。
  • 第二种就是将处理消息的业务逻辑模块使用多线程处理,这样的就规避了方案一的问题,不过需要将位移提交单独拿出来出来,并且还要使用锁来保证线程安全。
// properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
public interface ConsumerInterceptor<K, V> extends Configurable {
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);
    void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);
    void close();
}

术语

  1. HW 高水位
  2. LEO log end offset即下一条即将写入的消息的位移
  3. LSO log stable offset
  4. AR assigned replicas集群中所有副本
  5. ISR in sync replicas与leader副本保持一定程度上同步的follower副本
  6. OSR out sync replicas被踢出isr集合中的follower副本组合成的一个集合
  7. Offset borker中称为偏移量,消费者中称为消费位移

replica.lag.time.max.ms
replica.lag.max.messages(0.9+)

数据组织结构

partition topic replica segement

工作机制

rebalance ack transcation log_delete log_compression leader_ephoc