kafka_comsumer.cpp

#include "kafka_comsumer.h"

// Global run flag; cleared by the SIGTERM handler to stop the consume loop.
bool run_ = true;

// Signal handler: only takes effect if it is actually registered somewhere,
// e.g. via std::signal(SIGTERM, sigterm).
static void sigterm(int sig) {
    (void)sig;
    run_ = false;
}

kafka_consumer_client::kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset)
    : brokers_(brokers),
      topics_(topics),
      groupid_(groupid),
      offset_(offset) {
}
//kafka_consumer_client::kafka_consumer_client(){}

kafka_consumer_client::~kafka_consumer_client() {
#ifdef KAFKA
    //kafka_consumer_->poll(0);
    // Only stop if the consumer and topic are still alive; consume() may have
    // already released them.
    if (kafka_consumer_ && topic_) {
        kafka_consumer_->stop(topic_, partition_);
    }
    if (topic_) {
        delete topic_;
        topic_ = nullptr;
    }
    if (kafka_consumer_) {
        delete kafka_consumer_;
        kafka_consumer_ = nullptr;
    }
    /* Destroy the kafka instance (wait up to 5s for internal threads to exit). */
    RdKafka::wait_destroyed(5000);
#endif
}
bool kafka_consumer_client::initClient() {
#ifdef KAFKA
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (!conf) {
        ErrorL << "RdKafka create global conf failed" << endl;
        return false;
    }

    std::string errstr;
    /* Set the broker list. */
    if (conf->set("bootstrap.servers", brokers_, errstr) != RdKafka::Conf::CONF_OK) {
        ErrorL << "RdKafka conf set brokerlist failed: " << errstr.c_str() << endl;
    }
    /* Set the consumer group. */
    if (conf->set("group.id", groupid_, errstr) != RdKafka::Conf::CONF_OK) {
        ErrorL << "RdKafka conf set group.id failed: " << errstr.c_str() << endl;
    }
    /* Maximum amount of data fetched from a single partition per request. */
    std::string strfetch_num = "10240000";
    if (conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK) {
        ErrorL << "RdKafka conf set max.partition.fetch.bytes failed: " << errstr.c_str() << endl;
    }

    /* Create the kafka consumer instance. */
    kafka_consumer_ = RdKafka::Consumer::create(conf, errstr);
    if (!kafka_consumer_) {
        ErrorL << "failed to create consumer: " << errstr.c_str() << endl;
        delete conf;
        return false;
    }
    delete conf;

    /* Create the topic configuration. */
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (!tconf) {
        ErrorL << "RdKafka create topic conf failed" << endl;
        return false;
    }
    /* With kafka + zookeeper, the consumed offset of this group id is committed to
       zookeeper, and on restart the consumer resumes from that offset. The
       "auto.offset.reset" setting (called "autooffset.reset" before kafka 0.8)
       controls where consumption starts when no committed offset exists for the
       group (e.g. a new group id, or the zookeeper data was cleared). Its legal
       values are "largest" / "smallest", default "largest": "largest" starts from
       the newest messages, "smallest" starts from the beginning of the topic. */
    if (tconf->set("auto.offset.reset", "largest", errstr) != RdKafka::Conf::CONF_OK) {
        ErrorL << "RdKafka conf set auto.offset.reset failed: " << errstr.c_str() << endl;
    }

    topic_ = RdKafka::Topic::create(kafka_consumer_, topics_, tconf, errstr);
    if (!topic_) {
        ErrorL << "RdKafka create topic failed: " << errstr.c_str() << endl;
        delete tconf;
        return false;
    }
    delete tconf;

    /* Start consuming from the configured partition and offset. */
    RdKafka::ErrorCode resp = kafka_consumer_->start(topic_, partition_, offset_);
    if (resp != RdKafka::ERR_NO_ERROR) {
        ErrorL << "failed to start consumer: " << RdKafka::err2str(resp).c_str() << endl;
        return false;
    }
#endif
    return true;
}
#ifdef KAFKA
void kafka_consumer_client::consumer(RdKafka::Message *message, void *opt) {
    switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
        break;
    case RdKafka::ERR_NO_ERROR:
        printf("%.*s\n", static_cast<int>(message->len()),
               static_cast<const char*>(message->payload()));
        last_offset_ = message->offset();
        break;
    case RdKafka::ERR__PARTITION_EOF:
        ErrorL << "Reached the end of the queue, offset: " << last_offset_ << std::endl;
        break;
    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
        ErrorL << "Consume failed: " << message->errstr() << std::endl;
        run_ = false;
        break;
    default:
        ErrorL << "Consume failed: " << message->errstr() << std::endl;
        run_ = false;
        break;
    }
}
#endif
bool kafka_consumer_client::consume(int timeout_ms) {
#ifdef KAFKA
    RdKafka::Message *msg = nullptr;
    while (run_) {
        msg = kafka_consumer_->consume(topic_, partition_, timeout_ms);
        consumer(msg, nullptr);
        kafka_consumer_->poll(0);
        delete msg;
    }

    /* The loop has stopped: release the consumer here so the destructor does not
       stop/delete it a second time. */
    kafka_consumer_->stop(topic_, partition_);
    if (topic_) {
        delete topic_;
        topic_ = nullptr;
    }
    if (kafka_consumer_) {
        delete kafka_consumer_;
        kafka_consumer_ = nullptr;
    }
    /* Destroy the kafka instance. */
    RdKafka::wait_destroyed(5000);
#endif
    return true;
}
void kafka_consumer_client::ConsumeData(std::string& data, int timeout_ms) {
#ifdef KAFKA
    RdKafka::Message *msg = kafka_consumer_->consume(topic_, partition_, timeout_ms);
    switch (msg->err()) {
    case RdKafka::ERR__TIMED_OUT:
        break;
    case RdKafka::ERR_NO_ERROR:
        /* The payload is not null-terminated, so copy exactly len() bytes. */
        if (msg->len() > 0) {
            data.assign(static_cast<const char *>(msg->payload()), msg->len());
        }
        last_offset_ = msg->offset();
        break;
    case RdKafka::ERR__PARTITION_EOF:
#ifdef DEBUG
        // WarnL << "Reached the end of the queue, offset: " << last_offset_ << std::endl;
#endif
        break;
    case RdKafka::ERR__UNKNOWN_TOPIC:
        break;
    case RdKafka::ERR__UNKNOWN_PARTITION:
        ErrorL << "Consume failed: " << msg->errstr() << std::endl;
        run_ = false;
        break;
    default:
        ErrorL << "Consume failed: " << msg->errstr() << std::endl;
        run_ = false;
        break;
    }
    //kafka_consumer_->poll(0);
    delete msg;
#endif
}
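
The class declaration in kafka_comsumer.h is not shown above. Below is a minimal usage sketch, not part of the original file, assuming the interface visible in this .cpp (constructor taking brokers/topic/group/offset, initClient(), ConsumeData()), that KAFKA is defined at build time, and that partition_ is given a valid default (e.g. 0) in the header. The broker address, topic name, group id, and the file name usage_sketch.cpp are placeholders.

// usage_sketch.cpp -- illustrative only, links against librdkafka's C++ API.
#include <csignal>
#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>   // usual install path; may already come via kafka_comsumer.h
#include "kafka_comsumer.h"

extern bool run_;                    // defined in kafka_comsumer.cpp

int main() {
    // Stop the loop cleanly on Ctrl+C / SIGTERM (handlers must be registered explicitly).
    std::signal(SIGINT,  [](int) { run_ = false; });
    std::signal(SIGTERM, [](int) { run_ = false; });

    // Placeholder connection parameters.
    kafka_consumer_client client("127.0.0.1:9092",              // broker list
                                 "test_topic",                  // topic
                                 "test_group",                  // consumer group id
                                 RdKafka::Topic::OFFSET_END);   // start at newest messages

    if (!client.initClient()) {
        std::cerr << "kafka consumer init failed" << std::endl;
        return 1;
    }

    // Pull one message at a time with a 1s timeout; empty data means timeout or EOF.
    while (run_) {
        std::string data;
        client.ConsumeData(data, 1000);
        if (!data.empty()) {
            std::cout << "got message: " << data << std::endl;
        }
    }
    return 0;
}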