12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- #ifndef __KAFKA_COMMSUMER_H_
- #define __KAFKA_COMMSUMER_H_
- #include <vector>
- #include <string>
- #include <memory>
- #include <getopt.h>
- #include <csignal>
- #include <iostream>
- #include "rdkafkacpp.h"
- #include "Util/util.h"
- #include "Util/logger.h"
-
- using namespace toolkit;
- class kafka_consumer_client{
- public:
- kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset=-1);
- //kafka_consumer_client();
- virtual ~kafka_consumer_client();
-
- bool initClient();
- bool consume(int timeout_ms);
- void finalize();
- void ConsumeData(std::string& data, int timeout_ms);
- private:
- void consumer(RdKafka::Message *msg, void *opt);
-
- std::string brokers_;
- std::string topics_;
- std::string groupid_;
-
- int64_t last_offset_ = 0;
- RdKafka::Consumer *kafka_consumer_ = nullptr;
- RdKafka::Topic *topic_ = nullptr;
- int64_t offset_ = RdKafka::Topic::OFFSET_BEGINNING;
- int32_t partition_ = 0;
- bool alive = true;
- };
- #endif
|