自带脚本使用
kafka的安装包中包含了大量的使用脚本,开发人员可以使用脚本来帮助开发
- 启动脚本 ,–daemon是表示后台启动
bin/kafka-server-start.sh –daemon config/server.properties –daemon
- 创建topic命令 ,--topic topic-demo 主题名称,--replication-factor 3 副本因子,副本的个数,--partitions 4 分区数
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-demo --replication-factor 3 --partitions 4
- 查看topic数据
bin/kafka-topics.sh --zookeeper localhost: 2181/kafka --describe --topic topic-demo
- 启动消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo
- 启动生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo
角色
- broker 指kafka服务节点
- consumer 消费者,从broker消费消息
- producer 生产者,往broker上发送消息
- controller 控制器,负责管理整个kafka集群中所有分区和副本的状态
- leader 集群中的leader副本对外提供读写能力
- follower 集群中的follower副本只负责对leader副本进行数据同步,当leader副本挂掉时会参与选取
- consumer_group 是官方用来提供扩容以及集群容错的一种消费机制
生产者
kafka的客户端主要组成部分有三个,main线程,RecordAccumulator以及sender线程,main线程主要是对发送的消息进行拦截、序列化以及分区操作,操作完成之后将消息追加到RecordAccumulator中,accumulator的数据结构可以简化成key、Deque
首先检测对应分区的Deque队列的最后一个元素是否可以追加,如果不可以则重新创建一个后追加。Sender线程负责将accumulator缓存中的数据发送到borker节点中,但是在发送之前会进行一次类似网络OSI模型的一种业务包装。将包装好的<NodeId, Deque>缓存到InFlightRequests中,这里是存放已经发送但是还没有收到响应请求的重复消息。
- 拦截器
拦截器主要在消息发送之前对消息进行拦截,做一些通用操作,onSend方法是在发送之前调用,onAcknowledgement在进行ack确认的时候调用
// properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
public interface ProducerInterceptor<K, V> extends Configurable {
ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);
void onAcknowledgement(RecordMetadata var1, Exception var2);
void close();
}
- 序列化器
序列化器对消息进行序列化操作,方便消息网络传输,configure中可以获取kafka的配置信息,比如编码
public interface Serializer<T> extends Closeable {
void configure(Map<String, ?> var1, boolean var2);
byte[] serialize(String var1, T var2);
void close();
}
- 分区器
分区器是确定消息归属分区的作用,默认的分区器是通过轮询的方式,DefaultPartitioner会根据topic生成一个AtomicInteger消息计数器,在发送每一个消息都会对应一个消息编号,最终会根据这个消息编号对topic主题的分区数进行取余得到最终的分区。
public interface Partitioner extends Configurable, Closeable {
int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);
void close();
}
消费者
kafka消费者是单线程非线程安全的,所以要实现多线程需要自己设计方案,常见的方案有以下两种:
- 第一种将每个消费者对应一个线程,这样的话就不存在线程安全的问题,对于位移的提交来说也是最简单的,但是这样做的缺点也很明显,就是资源占用的问题,多线程之前的上下文切换,如果处理的处理的业务还包含tcp等耗时的网络请求,那么就会导致效率降低。
- 第二种就是将处理消息的业务逻辑模块使用多线程处理,这样的就规避了方案一的问题,不过需要将位移提交单独拿出来出来,并且还要使用锁来保证线程安全。
// properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
public interface ConsumerInterceptor<K, V> extends Configurable {
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);
void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);
void close();
}
术语
- HW 高水位
- LEO log end offset即下一条即将写入的消息的位移
- LSO log stable offset
- AR assigned replicas集群中所有副本
- ISR in sync replicas与leader副本保持一定程度上同步的follower副本
- OSR out sync replicas被踢出isr集合中的follower副本组合成的一个集合
- Offset borker中称为偏移量,消费者中称为消费位移
replica.lag.time.max.ms
replica.lag.max.messages(0.9+)
数据组织结构
partition topic replica segement
工作机制
rebalance ack transcation log_delete log_compression leader_ephoc