Kafka 相关 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
wangqianping
V2EX    Kafka

Kafka 相关

  •  
  •   wangqianping 2017-06-09 09:49:13 +08:00 3733 次点击
    这是一个创建于 3051 天前的主题,其中的信息可能已经有所发展或是发生改变。

    在读取 kafka 中数据过程中,发现中途 kafka 挂了,消费端会返回异常么,还是说只是没有数据,并不会异常。

    23 条回复    2017-06-09 19:11:23 +08:00
    stcasshern
        1
    stcasshern  
       2017-06-09 10:06:10 +08:00
    按照我的经验,貌似不会有异常,就是没数据(好像这样
    h3nng
        2
    h3nng  
       2017-06-09 10:19:27 +08:00
    "kafka 挂了" 具体哪种场景?有没有 replicas ?有的话挂一个还是都挂了?
    diudiu13216
        3
    diudiu13216  
       2017-06-09 10:25:37 +08:00
    会有异常抛出
    wangqianping
        4
    wangqianping  
    OP
       2017-06-09 11:00:44 +08:00 via iPhone
    @stcasshern 对,可是如果这样的话,一旦 kafka 挂了,客户端却不知道,好尴尬,有解决方法吗,就是如果 kafka 挂了,客户端会收到相关信息
    wangqianping
        5
    wangqianping  
    OP
       2017-06-09 11:00:59 +08:00 via iPhone
    @h3nng 都挂了
    wangqianping
        6
    wangqianping  
    OP
       2017-06-09 11:01:33 +08:00 via iPhone
    @diudiu13216 确定么,好想我这边没有,客户端不回检测到异常额
    diudiu13216
        7
    diudiu13216  
       2017-06-09 11:38:59 +08:00
    @wangqianping 你用的哪个客户端
    wangqianping
        8
    wangqianping  
    OP
       2017-06-09 12:41:24 +08:00
    @diudiu13216 python-kafka

    class Kafka_consumer():
    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid,flag,offset_value):
    self.kafkaHost = kafkahost
    self.kafkaPort = kafkaport
    self.kafkatopic = kafkatopic
    self.groupid = groupid
    self.cOnsumer= KafkaConsumer(self.kafkatopic, group_id = self.groupid,
    bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
    kafka_host=self.kafkaHost,
    kafka_port=self.kafkaPort ),enable_auto_commit=flag, auto_offset_reset=offset_value)
    def consume_data(self):
    try:
    for message in self.consumer:
    print "partition",message.partition
    print "key",message.key
    except Exception as e:
    print "error",e
    这是我的代码,在跑的过程中,故意中断 kafka,并不会报错,但是我又想显示连接中断的信息
    zoues
        9
    zoues  
       2017-06-09 12:45:03 +08:00 via iPhone
    healthcheck ?
    wangqianping
        10
    wangqianping  
    OP
       2017-06-09 12:49:06 +08:00
    @zoues 恩,测试异常情况下的反应
    gongweixin
        11
    gongweixin  
       2017-06-09 13:06:08 +08:00
    这个自己实验一下不就好了吗
    wangqianping
        12
    wangqianping  
    OP
       2017-06-09 13:07:01 +08:00
    @gongweixin 结果是没有。。
    Buffer2Disk
        13
    Buffer2Disk  
       2017-06-09 13:26:26 +08:00
    At most once: 消息可能会丢,但绝不会重复传输
    At least once:消息绝不会丢,但可能会重复传输
    Exactly once:每条消息肯定会被传输一次且仅传输一次
    Kafka 的消息传输保障机制非常直观。当 producer 向 broker 发送消息时,一旦这条消息被 commit,由于副本机制( replication )的存在,它就不会丢失。但是如果 producer 发送数据给 broker 后,遇到的网络问题而造成通信中断,那 producer 就无法判断该条消息是否已经提交( commit )。虽然 Kafka 无法确定网络故障期间发生了什么,但是 producer 可以 retry 多次,确保消息已经正确传输到 broker 中,所以目前 Kafka 实现的是 at least once。

    consumer 从 broker 中读取消息后,可以选择 commit,该操作会在 Zookeeper 中存下该 consumer 在该 partition 下读取的消息的 offset。该 consumer 下一次再读该 partition 时会从下一条开始读取。如未 commit,下一次读取的开始位置会跟上一次 commit 之后的开始位置相同。当然也可以将 consumer 设置为 autocommit,即 consumer 一旦读取到数据立即自动 commit。如果只讨论这一读取消息的过程,那 Kafka 是确保了 exactly once, 但是如果由于前面 producer 与 broker 之间的某种原因导致消息的重复,那么这里就是 at least once。
    要做到 exactly once 就需要引入消息去重机制。
    Buffer2Disk
        14
    Buffer2Disk  
       2017-06-09 13:27:32 +08:00
    从网上摘抄的一段
    wangqianping
        15
    wangqianping  
    OP
       2017-06-09 13:34:05 +08:00
    @Buffer2Disk 我测试目的不是这个额,我测试的目的是如果 kafka 断了,consumer 不会报错,只是没有数据了,那如果 consumer 无法输出数据,我怎么知道是 kafka 断了,还是本身就没有数据。
    h3nng
        16
    h3nng  
       2017-06-09 14:03:00 +08:00
    @wangqianping #15 Java 客户端应该是有报错的,没有用过 Python 的,应该是客户端的差异。
    funky
        17
    funky  
       2017-06-09 14:08:43 +08:00
    kafka 有 offset 的
    stcasshern
        18
    stcasshern  
       2017-06-09 14:38:38 +08:00
    我理解是 kafka 相当是主动推送的,除非数据本身出了错,不然消费者不会知道是数据消费完了还是 kafka 出错了,当然我遇到的是 kafka 如果连接异常是会报错的,zookeeper 会出问题
    stcasshern
        19
    stcasshern  
       2017-06-09 14:39:42 +08:00
    而且一般不会挂吧,只是由于连接等等问题,导致请求失败那种,那种情况下会有异常
    slixurd
        20
    slixurd  
       2017-06-09 15:01:26 +08:00   2
    @stcasshern kafka 是经典的 pull 模型.不是由 Broker 去 push 消息到 client.而是 client 自行去 pull 的
    gulangyu
        21
    gulangyu  
       2017-06-09 15:18:20 +08:00 via Android
    看到 kafka, 还以为是要聊卡夫卡的小说呢....
    stcasshern
        22
    stcasshern  
       2017-06-09 17:38:10 +08:00
    @slixurd 哭笑,还好不是面试,居然记混了。。。你说的是对的
    find
        23
    find  
       2017-06-09 19:11:23 +08:00 via iPhone
    consumer 里面有一个 leader 回与 groupleader 心跳的,所以所有的 cluster 挂了的话肯定是知道哦
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     4498 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 26ms UTC 01:04 PVG 09:04 LAX 18:04 JFK 21:04
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86