说明一下kafka的使用情况
1.kafka apache 使用在什么场合
1、Messaging
对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的"事务性""消息传输担保(消息确认机制)""消息分组"等企业级特性;kafka只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)
2、Websit activity tracking
kafka可以作为"网站活性跟踪"的最佳工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等
3、Log Aggregation
kafka的特性决定它非常适合作为"日志收集中心";application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统.
2.如何查看kafka消费者信息
在Kafak中国社区的qq群中,这个问题被提及的比例是相当高的,这也是Kafka用户最常碰到的问题之一。
本文结合Kafka源码试图对该问题相关的因素进行探讨。希望对大家有所帮助。
怎么确定分区数?“我应该选择几个分区?”——如果你在Kafka中国社区的群里,这样的问题你会经常碰到的。不过有些遗憾的是,我们似乎并没有很权威的答案能够解答这样的问题。
其实这也不奇怪,毕竟这样的问题通常都是没有固定答案的。Kafka官网上标榜自己是"high-throughputdistributedmessagingsystem",即一个高吞吐量的分布式消息引擎。
那么怎么达到高吞吐量呢?Kafka在底层摒弃了Java堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。但是,这只是一个方面,毕竟单机优化的能力是有上限的。
如何通过水平扩展甚至是线性扩展来进一步提升吞吐量呢?Kafka就是使用了分区partition,通过将topic的消息打散到多个分区并分布保存在不同的broker上实现了消息处理不管是producer还是consumer的高吞吐量。Kafka的生产者和消费者都可以多线程地并行操作,而每个线程处理的是一个分区的数据。
因此分区实际上是调优Kafka并行度的最小单元。对于producer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer呢,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费具体如何确定consumer线程数目我们后面会详细说明。
所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。但分区是否越多越好呢?显然也不是,因为每个分区都有自己的开销:一、客户端/服务器端需要使用的内存就越多先说说客户端的情况。
Kafka082之后推出了Java版的全新的producer,这个producer有个参数batchsize,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。
看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会。
假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。
如果还是假设有10000个分区,同时consumer线程数要匹配分区数大部分情况下是最佳的消费吞吐量配置的话,那么在consumerclient就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。
服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本越久越大。二、文件句柄的开销每个分区在底层文件系统都有属于自己的一个目录。
该目录下通常会有两个文件:base_offsetlog和base_offsetindex。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄filehandler。
很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit-n的限制。三、降低高可用性Kafka通过副本replica机制来保证高可用。
具体做法就是为每个分区保存若干个副本replica_factor指定副本数。每个副本保存在不同的broker上。
期中的一个副本充当leader副本,负责处理producer和consumer请求。其他副本充当follower角色,由Kafkacontroller负责保证与leader的同步。
如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。
此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。
如果这个broker还同时是controller情况就更糟了。说了这么多“废话”,很多人肯定已经不耐烦了。
那你说到底要怎么确定分区数呢?答案就是:视情况而定。基本上你还是需要通过一系列实验和测试来确定。
当然测试的依据应该是吞吐量。虽然linkedIn这篇文章做了Kafka的基准测试,但它的结果其实对你意义不大,因为不同的硬件、软件、负载情况测试出来的结果必然不一样。
我经常碰到的问题类似于,官网说每秒能到10MB,为什么我的producer每秒才1MB?——且不说硬件条件,最后发现他使用的消息体有1KB,而官网的基准测试是用100B测出来的,因此根本没有可比性。不过你依然可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。
假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数=Tt/maxTp,TcTp表示producer的吞吐量。
测试producer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示。
3.如何在kafka
kafka-python:蛮荒的西部kafka-python是最受欢迎的Kafka Python客户端。
我们过去使用时从未出现过任何问题,在我的《敏捷数据科学2.0》一书中我也用过它。然而在最近这个项目中,它却出现了一个严重的问题。
我们发现,当以文档化的方式使用KafkaConsumer、Consumer迭代式地从消息队列中获取消息时,最终到达主题topic的由Consumer携带的消息通常会丢失。我们通过控制台Consumer的分析验证了这一点。
需要更详细说明的是,kafka-python和KafkaConsumer是与一个由SSL保护的Kafka服务(如Aiven Kafka)一同使用的,如下面这样:kafka_consumer = KafkaConsumer(topic,enable_auto_commit=True,group_id=group_id,bootstrap_servers=config.kafka.host,api_version=(0, 10),security_protocol='SSL',ssl_check_hostname=True,ssl_cafile=config.kafka.ca_pem,ssl_certfile=config.kafka.service_cert,ssl_keyfile=config.kafka.service_key)for message in kafka_consumer:application_message = json.loads(message.value.decode())。当以这样的推荐方式使用时,KafkaConsumer会丢失消息。
但有一个变通方案,就是保留所有消息。这个方案是Kafka服务提供商Aiven support提供给我们的。
它看起来像这样:while True:raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)for topic_partition, messages in raw_messages.items():application_message = json.loads(message.value.decode())。虽然这个变通方案可能有用,但README中的方法会丢弃消息使我对其失去兴趣。
所以我找到了一个替代方案。confluent-kafka:企业支持发现coufluent-kafka Python模块时,我感到无比惊喜。
它既能做librdkafka的外封装,又非常小巧。librdkafka是一个用C语言写的kafka库,它是Go和.NET的基础。
更重要的是,它由Confluent公司支持。我爱开源,但是当“由非正式社区拥有或支持”这种方式效果不行的时候,或许该考虑给替代方案印上公章、即该由某个公司拥有或支持了。
不过,我们并未购买商业支持。我们知道有人会维护这个库的软件质量,而且可以选择买或不买商业支持,这一点真是太棒了。
用confluent-kafka替换kafka-python非常简单。confluent-kafka使用poll方法,它类似于上面提到的访问kafka-python的变通方案。
kafka_consumer = Consumer({"api.version.request": True,"enable.auto.commit": True,"group.id": group_id,"bootstrap.servers": config.kafka.host,"security.protocol": "ssl","ssl.ca.location": config.kafka.ca_pem,"ssl.certificate.location": config.kafka.service_cert,"ssl.key.location": config.kafka.service_key,"default.topic.config": {"auto.offset.reset": "smallest"}})consumer.subscribe([topic])# Now loop on the consumer to read messagesrunning = Truewhile running:message = kafka_consumer.poll()application_message = json.load(message.value.decode())kafka_consumer.close()现在我们能收到所有消息了。我并不是说kafka-python工具不好,我相信社区会对它的问题做出反应并解决。
但从现在开始,我会一直坚持使用confluent-kafka。开源治理开源是强大的,但是涉及到复杂的“大数据”和NoSQL工具时,通常需要有一家大公司在背后推动工具的开发。
这样你就知道,如果那个公司可以使用工具,那么该工具应该拥有很好的基本功能。它的出现可能是非正式的,就像某公司发布类似FOSS的项目一样,但也可能是正式的,就像某公司为工具提供商业支持一样。
当然,从另一个角度来看,如果一家与开源社区作对的公司负责开发某个工具,你便失去了控制权。你的意见可能无关紧要,除非你是付费客户。
理想情况是采取开源治理,就像Apache基金会一样,还有就是增加可用的商业支持选项。这对互联网上大部分的免费软件来说根本不可能。
限制自己只使用那些公司盖章批准后的工具将非常限制你的自由。这对于一些商店可能是正确选择,但对于我们不是。
我喜欢工具测试,如果工具很小,而且只专心做一件事,我就会使用它。信任开源对于更大型的工具,以上决策评估过程更为复杂。
通常,我会看一下提交问题和贡献者的数量,以及最后一次commit的日期。我可能会问朋友某个工具的情况,有时也会在推特上问。
当你进行嗅探检查后从Github选择了一个项目,即说明你信任社区可以产出好的工具。对于大多数工具来说,这是没问题的。
但信任社区可能存在问题。对于某个特定的工具,可能并没有充分的理由让你信任社区可以产出好的软件。
社区在目标、经验和开源项目的投入时间方面各不相同。选择工具时保持审慎态度十分重要,不要让理想蒙蔽了判断。
4.kafka 的实现依赖了哪些东西
1. 通常来说,kafka的使用是为了消息的持久化(persistent messages)
2. 吞吐量是kafka设计的主要目标
3. 关于消费的状态被记录为consumer的一部分,而不是server。这点稍微解释下,这里的server还是只broker,谁消费了多少数据都记录在消费者自己手中,不存在broker中。按理说,消费记录也是一个日志,可以放在broker中,至于为什么要这么设计,我们写下去了再说。
4. Kafka的分布式可以表现在producer、broker、consumer都可以分布在多台机器上。