kafka_comsumer.h 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. #ifndef __KAFKA_COMMSUMER_H_
  2. #define __KAFKA_COMMSUMER_H_
  3. #include <vector>
  4. #include <string>
  5. #include <memory>
  6. #include <getopt.h>
  7. #include <csignal>
  8. #include <iostream>
  9. #include "rdkafkacpp.h"
  10. #include "Util/util.h"
  11. #include "Util/logger.h"
  12. using namespace toolkit;
  13. class kafka_consumer_client{
  14. public:
  15. kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset=-1);
  16. //kafka_consumer_client();
  17. virtual ~kafka_consumer_client();
  18. bool initClient();
  19. bool consume(int timeout_ms);
  20. void finalize();
  21. void ConsumeData(std::string& data, int timeout_ms);
  22. private:
  23. void consumer(RdKafka::Message *msg, void *opt);
  24. std::string brokers_;
  25. std::string topics_;
  26. std::string groupid_;
  27. int64_t last_offset_ = 0;
  28. RdKafka::Consumer *kafka_consumer_ = nullptr;
  29. RdKafka::Topic *topic_ = nullptr;
  30. int64_t offset_ = RdKafka::Topic::OFFSET_BEGINNING;
  31. int32_t partition_ = 0;
  32. bool alive = true;
  33. };
  34. #endif