网站首页 > 基础教程 正文
Apache Kafka是目前非常流行的。它被作为一个消息代理来利用,但可以通过额外的工具来扩展,成为一个完整的消息处理平台。它经常被用来以异步的方式沟通微服务,但也用来处理消息流。每个工具都有其注意事项。小的配置失误可能会导致大的灾难。在这篇博客中,我们将关注7个最常见的错误以及如何避免它们。
错误1 - 让我们使用默认设置
向不存在的Kafka主题发送消息,默认情况下会导致其创建。不幸的是,默认设置定义了一个单一的分区和复制因子1。由于可能的数据丢失和有限的可扩展性,它们对于生产使用来说是完全不可接受的。更重要的是,这些设置会影响各种Kafka "特殊 "主题,例如Kafka流使用的中间主题或Kafka Connect在分布式模式下利用的主题。
如何避免这种情况?
- 首先,你可以禁用auto.create.themes.enable(在Broker层面),这样每个主题都需要明确地手动创建。
- 另一种方法是覆盖default.replication.factor和num.partitions(在broker层面),以改变自动创建的主题的默认值。
错误2--现在先设置一个分区,以后再改。
什么是正确的分区数?这是一个相当困难的问题,当他们需要为新主题定义配置时,每个人都会问这个问题。不幸的是,没有一个简单的答案,只有你需要遵循的建议。
首先,我们来定义下限。分区的数量定义了来自一个消费者组的最大消费者数量。如果我们定义了2个分区,那么同一组的2个消费者就可以消费这些信息。更重要的是,如果我们定义的数字太小,那么分区可能不会位于所有可能的Broker上,导致集群利用率不均匀。基本上,这意味着最大的消费速度和生产速度受到可能的分区数量范围的下限的影响。
更高的界限是什么?我们什么时候知道有太多的分区?嗯,大的分区数量会影响各种进程。发布者会在每个分区缓冲发送至Kafka的消息批次。更多的分区数量=不同键的消息落在同一个分区的概率更低,这意味着获得更大批次的概率更低。此外,更多的分区意味着更多的独立缓冲区=更多的内存。这意味着太大的分区数量会影响生产者的吞吐量和内存使用。
Kafka 2.4.0引入了Sticky Partitioner的概念(KIP-480).默认的分区器,对于没有明确密钥的消息是使用Round Robin算法。在Sticky Partitioner的情况下,生产者的目标是为单个分区填充批处理,一旦满了,就开始把消息放入另一个批处理,用于下一个分区。
在Broker方面,更多的分区意味着更多的打开的文件处理。不过这并不令人担心。另一方面,每个分区都有一个领导者,由Kafka控制器选择。如果同时需要选择多个新的领导者(例如,由于代理失败),那么控制器可能会产生更大的延迟的 "决定",分区可能会有更长的时间无法使用。例如,1000个分区的领导者选举过程大约需要5秒。
如果你决定将一些数字设置为 "临时",以后再调整分区的数量,那该怎么办?是的,那是可能的。然而,默认的分区器对于定义了key的信息,会根据密钥的模数来计算分区的哈希值。当分区的数量发生变化时,具有相同key的信息可能开始被放置到另一个分区。你需要记住这一点,因为根据业务用例,这种变化可能会导致消息的顺序改变(对于给定的Key),在主题调整的时候。
那么,什么是最好的数字呢?实际上,你应该做一些性能测试。模拟负载(比方说未来几年的预测),连接消费者并测试什么数量的分区可以达到所需的性能。记住要使用与生产中可用的相同的硬件规格。
错误3 -让我们使用发布者的默认配置
为了让Kafka Producer工作,实际上只需要定义3个配置键--引导服务器、键和值序列化器。然而,这往往是不够的。Kafka包括很多设置,这些设置可能会影响信息传递的顺序、性能或数据丢失的概率。比如说。
- acks - 定义有多少个同步复制体需要确认信息(默认情况下,只有分区领导,这可能会导致数据丢失,因为领导不会等待数据被写入磁盘,只是写入文件系统缓存)。
- retries - 定义发送失败时的重试次数
- max.in.flight.request - 定义了客户端可能正在处理的未确认的请求的最大数量。在实践中可能会影响消息的排序。
更重要的是,在生产者下面,它将所有被命令发送的消息分批发送。它为每个分区创建缓冲区。这也意味着,当发送回调被调用或适当的Future完成时,消息实际上被发送。
错误4 -让我们使用基本的Java消费者
Kafka的Java客户端相当强大,但是,并没有提供最好的API。使用Java Consumer是相当痛苦的。首先,KafkaConsumer类只能由一个单线程使用。然后,它需要定义一个 "无限 "的while循环,它将轮询经纪人的消息。然而,最重要的是超时和心跳是如何工作的。
心跳是由一个额外的线程处理的,它定期向代理发送一个消息,以显示它正在工作。Kafka还需要一个东西。max.poll.interval.ms(默认5分钟)定义了轮询调用之间的最长时间。如果没有达到这个要求,那么消费者就会离开消费者组。这一点极其重要!比方说,你消费消息并将它们发送到一些外部的HTTP API。在失败的情况下,你可以利用指数反推法。如果超过5分钟,那么......你的消费者将离开组,消息将被传递到另一个实例。这很可怕,也很难维护。避免这种情况的最简单的方法是限制在一次轮询调用中获取的记录数量。
除了普通的Java客户端,还有什么其他选择?有几个。
- Spring for Apache Kafka
- Alpakka Kafka
- FS2 Kafka — Scala lib for Kafka integration with FS2
- Micronaut Kafka
- Quarkus Kafka
- 和其他
大多数库会通过明确的消费暂停来自动管理对轮询间隔的要求。在选择你的库之前,请检查它是否被正确处理。
错误5--我们的业务当然需要确切的一次性语义
通常,当你问你的客户什么样的交付/处理语义是企业可以接受的,你会听到这样的答案:正好一次。然后,你做了一些谷歌搜索,你看到了精彩的标题,说Kafka支持精确的一次!这部分是正确的,从理论上讲,没有这样的说法。这是部分正确的,从理论的角度来看,没有所谓的完全一次性交付......因为,通常它是一个至少一次的去重,什么是有效的完全一次性。
在Kafka的案例中,有一些围绕exactly-once的限制。你需要在生产者端启用特殊的设置(例如,启用idempotence,这需要为每个连接的最大飞行请求、重试和acks设置特定的值)。在消费者方面,这就比较困难了。由于故障,你仍然可以处理超过一次的消息,那么解决方案是重复数据,例如做数据库的上载。
另一个解决方案是使用Kafka Streams,它明确地定义了 "精确一次 "的设置,但实际上这意味着你只能通过从Kafka到Kafka获得精确一次。
错误6--当系统工作时,谁会关心监控?
Kafka Cluster是一个分布式系统。在这样的架构中,很多事情实际上都会出错。监控是必要的,以了解是否一切都在正常工作。观察系统和定义警报很重要。
我曾经为一个客户工作过,他实际上有一个相当不错的Kafka指标的仪表盘。然而,它只是红色的,因为没有人完全理解其中的数值。其中一个发现的问题是复制分区的问题,其中有几百个分区不同步。在最坏的情况下,这可能导致服务不可用,甚至数据丢失。
如果你真的不想丢失数据或失去可用性,你应该真正利用Kafka的指标。有各种工具可以很容易地导出它们(请看 JMX 或 Kafka 导出器).指标可以让你防止灾难的发生。
错误7 -不看发行说明就升级项目的依赖性
自从Kafka 0.11以来,客户端通常与Kafka经纪人向前和向后兼容。当新版本发布时,升级过程非常简单。然而,单一的版本碰撞可能会导致很大的问题。作为一个例子,让我们看一下2.1.0版本。就是这个版本,它引入了 KIP-91 直观的生产者超时影响了Producer retry参数的默认值。它被从0改为Integer.MAX_VALUE。此外,还增加了新的生产者超时。这个变化并没有有效地引入无限重试,它只是默认启用了重试,加上max.in.flight.request.per.connection>1(默认为5)会导致消息重新排序
结论
Kafka是非常强大的,然而,你需要明智地使用它。避免简单的错误可以避免将来出现意外的问题。仔细检查Broker、主题、生产者和消费者使用了哪些设置。在部署之后,围绕最重要的Kafka和业务指标观察和定义警报。这样你就可以确保系统按照你的意图运行。
猜你喜欢
- 2024-10-12 Scala初学者入门指南!涵盖20多个基本技巧
- 2024-10-12 Scala循环性能问题,为了性能,你愿意牺牲代码的可维护性么?
- 2024-10-12 scala中为什么不建议用return scala for until
- 2024-10-12 scala——泛型方法、类、特质的使用,泛型边界、协变逆变非变
- 2024-10-12 程序员构建总是出问题,怎么办? 程序员构建总是出问题,怎么办呢
- 2024-10-12 scala——列表、元祖、列表相关知识
- 2024-10-12 scala语言基础图解-第一阶段(变量-条件-循环-方法-函数-集合)
- 2024-10-12 Scala入门视频已更新至88讲,后续还有大约20讲左右的视频就结束
- 2024-10-12 Scala快速入门 - 环境安装篇 scala安装步骤
- 2024-10-12 Scala 安装及环境配置 scala安装及环境配置
- 最近发表
- 标签列表
-
- gitpush (61)
- pythonif (68)
- location.href (57)
- tail-f (57)
- pythonifelse (59)
- deletesql (62)
- c++模板 (62)
- css3动画 (57)
- c#event (59)
- linuxgzip (68)
- 字符串连接 (73)
- nginx配置文件详解 (61)
- html标签 (69)
- c++初始化列表 (64)
- exec命令 (59)
- canvasfilltext (58)
- mysqlinnodbmyisam区别 (63)
- arraylistadd (66)
- node教程 (59)
- console.table (62)
- c++time_t (58)
- phpcookie (58)
- mysqldatesub函数 (63)
- window10java环境变量设置 (66)
- c++虚函数和纯虚函数的区别 (66)