#include "kafka_comsumer.h"

#include <cstdint>
#include <cstdio>
#include <memory>
#include <string>

// Process-wide "keep consuming" flag. consume() loops while it is true; the
// error paths in consumer()/ConsumeData() and the sigterm() handler clear it.
// NOTE(review): this is a plain bool written from a signal handler; consider
// std::atomic<bool> / volatile sig_atomic_t once all declarations of run_
// (possibly in other files) can be changed together.
bool run_ = true;

// Signal handler: requests the consume() loop to stop.
static void sigterm(int /*sig*/) {
    run_ = false;
}

#ifdef KAFKA
namespace {

// Stops consumption (when it was started) and destroys the topic and consumer
// handles, leaving both pointers null so the call is safe to repeat.
// Written as a template so it does not depend on the exact member types
// declared in the header.
template <typename ConsumerPtr, typename TopicPtr, typename Partition>
void releaseKafkaHandles(ConsumerPtr& consumer, TopicPtr& topic,
                         Partition partition, bool started) {
    const bool had_consumer = (consumer != nullptr);

    if (started && consumer && topic) {
        consumer->stop(topic, partition);
    }

    delete topic;  // deleting a null pointer is a no-op
    topic = nullptr;

    delete consumer;
    consumer = nullptr;

    if (had_consumer) {
        /* Wait (up to 5 s) for librdkafka to finish destroying its instances. */
        RdKafka::wait_destroyed(5000);
    }
}

}  // namespace
#endif

/**
 * Stores the connection parameters; no Kafka objects are created until
 * initClient() is called.
 *
 * @param brokers  value used for "bootstrap.servers"
 * @param topics   name of the topic to consume
 * @param groupid  value used for "group.id"
 * @param offset   offset passed to Consumer::start()
 */
kafka_consumer_client::kafka_consumer_client(const std::string& brokers,
                                             const std::string& topics,
                                             std::string groupid,
                                             int64_t offset)
    : brokers_(brokers),
      topics_(topics),
      groupid_(groupid),
      offset_(offset) {
#ifdef KAFKA
    // Make sure the destructor sees null handles if initClient() is never
    // called or fails.
    kafka_consumer_ = nullptr;
    topic_ = nullptr;
#endif
}

//kafka_consumer_client::kafka_consumer_client(){}

/**
 * Stops consumption and destroys the Kafka handles if they are still alive.
 * Safe to run after consume() has already torn them down.
 */
kafka_consumer_client::~kafka_consumer_client() {
#ifdef KAFKA
    //kafka_consumer_->poll(0);
    releaseKafkaHandles(kafka_consumer_, topic_, partition_, true);
#endif
}

/**
 * Creates the consumer and topic handles and starts consuming
 * topic_/partition_ from offset_.
 *
 * Failures to set an individual configuration property are logged and
 * ignored. Failure to create the consumer, create the topic, or start
 * consumption is logged, releases whatever was created, and returns false.
 *
 * @return true on success (always true when built without KAFKA).
 */
bool kafka_consumer_client::initClient() {
#ifdef KAFKA
    std::string errstr;

    std::unique_ptr<RdKafka::Conf> conf(
        RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
    if (!conf) {
        ErrorL << "RdKafka create global conf failed" << std::endl;
        return false;
    }

    /* Set the broker list. */
    if (conf->set("bootstrap.servers", brokers_, errstr) != RdKafka::Conf::CONF_OK) {
        ErrorL << "RdKafka conf set brokerlist failed :" << errstr.c_str() << std::endl;
    }

    /* Set the consumer group. */
    if (conf->set("group.id", groupid_, errstr) != RdKafka::Conf::CONF_OK) {
        ErrorL << "RdKafka conf set group.id failed : " << errstr.c_str() << std::endl;
    }

    /* Maximum number of bytes fetched from a single partition per request. */
    const std::string strfetch_num = "10240000";
    if (conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK) {
        ErrorL << "RdKafka conf set max.partition failed : " << errstr.c_str() << std::endl;
    }

    /* Create the kafka consumer instance. */
    kafka_consumer_ = RdKafka::Consumer::create(conf.get(), errstr);
    if (!kafka_consumer_) {
        ErrorL << "failed to ceate consumer" << " : " << errstr.c_str() << std::endl;
        return false;
    }
    conf.reset();

    /* Create the kafka topic configuration. */
    std::unique_ptr<RdKafka::Conf> tconf(
        RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
    if (!tconf) {
        ErrorL << "RdKafka create topic conf failed" << std::endl;
        releaseKafkaHandles(kafka_consumer_, topic_, partition_, false);
        return false;
    }

    /* With kafka + zookeeper, when messages are consumed the offset consumed
       by the current groupId's consumer is committed to ZK, and when the
       consumer starts again it resumes from that offset. The consumer
       configuration (or the ConsumerConfig class parameters) has an
       "autooffset.reset" setting (named auto.offset.reset in kafka 0.8) with
       two legal values, "largest"/"smallest", defaulting to "largest". It
       tells a consumer of this groupId where to start when ZK holds no offset
       for it (e.g. a new groupId, or the ZK data was cleared): "largest"
       means start at the largest offset (i.e. only the newest messages),
       "smallest" means start at the smallest offset, i.e. consume every
       message from the beginning of the topic. */
    if (tconf->set("auto.offset.reset", "largest", errstr) != RdKafka::Conf::CONF_OK) {
        ErrorL << "RdKafka conf set auto.offset.reset failed : " << errstr.c_str() << std::endl;
    }

    topic_ = RdKafka::Topic::create(kafka_consumer_, topics_, tconf.get(), errstr);
    if (!topic_) {
        ErrorL << "RdKafka create topic failed : " << errstr.c_str() << std::endl;
        releaseKafkaHandles(kafka_consumer_, topic_, partition_, false);
        return false;
    }
    tconf.reset();

    const RdKafka::ErrorCode resp = kafka_consumer_->start(topic_, partition_, offset_);
    if (resp != RdKafka::ERR_NO_ERROR) {
        ErrorL << "failed to start consumer : " << RdKafka::err2str(resp).c_str() << std::endl;
        releaseKafkaHandles(kafka_consumer_, topic_, partition_, false);
        return false;
    }
#endif
    return true;
}

#ifdef KAFKA
/**
 * Handles one message returned by Consumer::consume(): prints the payload of
 * a successful message to stdout and records its offset; logs partition EOF;
 * on any other error logs it and clears run_ so the consume() loop ends.
 *
 * @param message  message to handle (not deleted here)
 * @param opt      unused
 */
void kafka_consumer_client::consumer(RdKafka::Message *message, void *opt) {
    (void)opt;

    switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
        break;

    case RdKafka::ERR_NO_ERROR:
        // The payload is not NUL-terminated, so print exactly len() bytes.
        printf("%.*s\n",
               static_cast<int>(message->len()),
               static_cast<const char *>(message->payload()));
        last_offset_ = message->offset();
        break;

    case RdKafka::ERR__PARTITION_EOF:
        ErrorL << "Reached the end of the queue, offset: " << last_offset_ << std::endl;
        break;

    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
        ErrorL << "Consume failed: " << message->errstr() << std::endl;
        run_ = false;
        break;

    default:
        ErrorL << "Consume failed: " << message->errstr() << std::endl;
        run_ = false;
        break;
    }
}
#endif

/**
 * Blocking consume loop: fetches messages and passes them to consumer() until
 * run_ becomes false, then stops consumption and destroys the Kafka handles.
 *
 * @param timeout_ms  timeout passed to each Consumer::consume() call
 * @return false if the client has not been successfully initialised,
 *         true otherwise (always true when built without KAFKA).
 */
bool kafka_consumer_client::consume(int timeout_ms) {
#ifdef KAFKA
    if (!kafka_consumer_ || !topic_) {
        ErrorL << "consume called before a successful initClient" << std::endl;
        return false;
    }

    while (run_) {
        RdKafka::Message *msg = kafka_consumer_->consume(topic_, partition_, timeout_ms);
        if (msg) {
            consumer(msg, nullptr);
        }
        kafka_consumer_->poll(0);
        delete msg;
    }

    releaseKafkaHandles(kafka_consumer_, topic_, partition_, true);
#endif
    return true;
}

/**
 * Fetches at most one message and, if it is a successful non-empty message,
 * copies its payload into data. data is left untouched otherwise.
 *
 * Timeouts, partition EOF and ERR__UNKNOWN_TOPIC are ignored silently; any
 * other error is logged and clears run_.
 *
 * @param data        receives the message payload (exactly len() bytes)
 * @param timeout_ms  timeout passed to Consumer::consume()
 */
void kafka_consumer_client::ConsumeData(std::string& data, int timeout_ms) {
#ifdef KAFKA
    if (!kafka_consumer_ || !topic_) {
        ErrorL << "ConsumeData called before a successful initClient" << std::endl;
        return;
    }

    RdKafka::Message *msg = kafka_consumer_->consume(topic_, partition_, timeout_ms);
    if (!msg) {
        return;
    }

    switch (msg->err()) {
    case RdKafka::ERR__TIMED_OUT:
        break;

    case RdKafka::ERR_NO_ERROR:
        if (msg->len() > 0 && msg->payload() != nullptr) {
            // The payload is a raw byte buffer without a terminating NUL:
            // copy by explicit length instead of treating it as a C string.
            data.assign(static_cast<const char *>(msg->payload()), msg->len());
        }
        last_offset_ = msg->offset();
        break;

    case RdKafka::ERR__PARTITION_EOF:
#ifdef DEBUG
        // WarnL << " Reached the end of the queue, offset: " << last_offset_ << std::endl;
#endif
        break;

    case RdKafka::ERR__UNKNOWN_TOPIC:
        break;

    case RdKafka::ERR__UNKNOWN_PARTITION:
        ErrorL << "Consume failed: " << msg->errstr() << std::endl;
        run_ = false;
        break;

    default:
        ErrorL << "Consume failed: " << msg->errstr() << std::endl;
        run_ = false;
        break;
    }

    //kafka_consumer_->poll(0);
    delete msg;
#endif
}