#ifndef __KAFKA_COMMSUMER_H_ #define __KAFKA_COMMSUMER_H_ #include #include #include #include #include #include #include "rdkafkacpp.h" #include "Util/util.h" #include "Util/logger.h" using namespace toolkit; class kafka_consumer_client{ public: kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset=-1); //kafka_consumer_client(); virtual ~kafka_consumer_client(); bool initClient(); bool consume(int timeout_ms); void finalize(); void ConsumeData(std::string& data, int timeout_ms); private: void consumer(RdKafka::Message *msg, void *opt); std::string brokers_; std::string topics_; std::string groupid_; int64_t last_offset_ = 0; RdKafka::Consumer *kafka_consumer_ = nullptr; RdKafka::Topic *topic_ = nullptr; int64_t offset_ = RdKafka::Topic::OFFSET_BEGINNING; int32_t partition_ = 0; bool alive = true; }; #endif