// kafka_consumer_client: librdkafka-based consumer implementation.
#include "kafka_comsumer.h"

#include <csignal>
// Run flag polled by the consume loops; cleared by the signal handler.
// volatile sig_atomic_t is the only object type the C/C++ standards guarantee
// can be safely written from a signal handler (a plain bool is not).
// Boolean assignments elsewhere (run_ = false) remain valid: false converts to 0.
volatile sig_atomic_t run_ = 1;

/**
 * @brief Signal handler (e.g. SIGTERM): requests the consume loops to stop.
 * @param sig signal number (unused)
 */
static void sigterm(int sig) {
    (void)sig;  // unused
    run_ = 0;
}
-
- kafka_consumer_client::kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset)
- :brokers_(brokers),
- topics_(topics),
- groupid_(groupid),
- offset_(offset){
- }
-
- //kafka_consumer_client::kafka_consumer_client(){}
-
- kafka_consumer_client::~kafka_consumer_client(){
- #ifdef KAFKA
- //kafka_consumer_->poll(0);
- kafka_consumer_->stop(topic_, partition_);
- if(topic_){
- delete topic_;
- topic_ = nullptr;
- }
- if(kafka_consumer_){
- delete kafka_consumer_;
- kafka_consumer_ = nullptr;
- }
-
- /*销毁kafka实例*/
- RdKafka::wait_destroyed(5000);
- #endif
- }
-
- bool kafka_consumer_client::initClient(){
- #ifdef KAFKA
- RdKafka::Conf *conf = nullptr;
- conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
- if(!conf){
- ErrorL << "RdKafka create global conf failed" << endl;
- return false;
- }
-
- std::string errstr;
- /*设置broker list*/
- if (conf->set("bootstrap.servers", brokers_, errstr) != RdKafka::Conf::CONF_OK){
- ErrorL << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
- }
-
- /*设置consumer group*/
- if (conf->set("group.id", groupid_, errstr) != RdKafka::Conf::CONF_OK){
- ErrorL << "RdKafka conf set group.id failed : " << errstr.c_str() << endl;
- }
-
- std::string strfetch_num = "10240000";
- /*每次从单个分区中拉取消息的最大尺寸*/
- if(conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK){
- ErrorL << "RdKafka conf set max.partition failed : " << errstr.c_str() << endl;
- }
-
- /*创建kafka consumer实例*/
- kafka_consumer_ = RdKafka::Consumer::create(conf, errstr);
- if(!kafka_consumer_){
- ErrorL << "failed to ceate consumer" << endl;
- }
- delete conf;
-
- RdKafka::Conf *tconf = nullptr;
- /*创建kafka topic的配置*/
- tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
- if(!tconf){
- ErrorL << "RdKafka create topic conf failed" << endl;
- return false;
- }
-
- /*kafka + zookeeper,当消息被消费时,会想zk提交当前groupId的consumer消费的offset信息,
- 当consumer再次启动将会从此offset开始继续消费.在consumter端配置文件中(或者是
- ConsumerConfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),
- 有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者,
- 在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始
- 消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的
- 开始位置消费所有消息.*/
- if(tconf->set("auto.offset.reset", "largest", errstr) != RdKafka::Conf::CONF_OK){
- ErrorL << "RdKafka conf set auto.offset.reset failed : " << errstr.c_str() << endl;
- }
-
- topic_ = RdKafka::Topic::create(kafka_consumer_, topics_, tconf, errstr);
- if(!topic_){
- ErrorL << "RdKafka create topic failed : " << errstr.c_str() << endl;
- }
- delete tconf;
-
- RdKafka::ErrorCode resp = kafka_consumer_->start(topic_, partition_, offset_);
- if (resp != RdKafka::ERR_NO_ERROR){
- ErrorL << "failed to start consumer : " << RdKafka::err2str(resp).c_str() << endl;
- }
-
- #endif
- return true;
- }
#ifdef KAFKA
/**
 * @brief Per-message callback used by consume(): print the payload, remember
 *        its offset, and clear the run flag on fatal errors.
 * @param message consumed message (owned and freed by the caller)
 * @param opt     unused opaque pointer
 */
void kafka_consumer_client::consumer(RdKafka::Message *message, void *opt){

    const RdKafka::ErrorCode err = message->err();

    if (err == RdKafka::ERR__TIMED_OUT) {
        // no message arrived within the poll interval - nothing to do
        return;
    }

    if (err == RdKafka::ERR_NO_ERROR) {
        // payload is raw bytes: print with an explicit length
        printf("%.*s\n", static_cast<int>(message->len()),
                static_cast <const char*>(message->payload()));
        last_offset_ = message->offset();
        return;
    }

    if (err == RdKafka::ERR__PARTITION_EOF) {
        ErrorL << "Reached the end of the queue, offset: " << last_offset_ << std::endl;
        return;
    }

    // unknown topic / unknown partition / anything else: log and stop the loop
    ErrorL << "Consume failed: " << message->errstr() << std::endl;
    run_ = false;
}
#endif
- bool kafka_consumer_client::consume(int timeout_ms){
- #ifdef KAFKA
- RdKafka::Message *msg = nullptr;
-
- while(run_){
- msg = kafka_consumer_->consume(topic_, partition_, timeout_ms);
- consumer(msg, nullptr);
- kafka_consumer_->poll(0);
- delete msg;
- }
-
- kafka_consumer_->stop(topic_, partition_);
- if(topic_){
- delete topic_;
- topic_ = nullptr;
- }
- if(kafka_consumer_){
- delete kafka_consumer_;
- kafka_consumer_ = nullptr;
- }
-
- /*销毁kafka实例*/
- RdKafka::wait_destroyed(5000);
- #endif
- return true;
- }
- void kafka_consumer_client::ConsumeData(std::string& data, int timeout_ms){
- #ifdef KAFKA
- RdKafka::Message *msg = nullptr;
- msg = kafka_consumer_->consume(topic_, partition_, timeout_ms);
- switch(msg->err()){
- case RdKafka::ERR__TIMED_OUT:
- break;
- case RdKafka::ERR_NO_ERROR:
- if(static_cast<int>(msg->len()) > 0) data = static_cast<const char *>(msg->payload());
- last_offset_ = msg->offset();
- break;
- case RdKafka::ERR__PARTITION_EOF:
- #ifdef DEBUG
- // WarnL << " Reached the end of the queue, offset: " << last_offset_ << std::endl;
- #endif
- break;
- case RdKafka::ERR__UNKNOWN_TOPIC:
- break;
- case RdKafka::ERR__UNKNOWN_PARTITION:
- ErrorL << "Consume failed: " << msg->errstr() << std::endl;
- run_ = false;
- break;
- default:
- ErrorL << "Consume failed: " << msg->errstr() << std::endl;
- run_ = false;
- break;
- }
- //kafka_consumer_->poll(0);
- delete msg;
- #endif
- }
|