|
@@ -35,31 +35,31 @@ bool kafka_consumer_client::initClient(){
|
|
|
RdKafka::Conf *conf = nullptr;
|
|
|
conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
|
|
|
if(!conf){
|
|
|
- ErrorL << "RdKafka create global conf failed" << endl;
|
|
|
+ ErrorL << "RdKafka create global conf failed" << std::endl;
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
std::string errstr;
|
|
|
/*设置broker list*/
|
|
|
if (conf->set("bootstrap.servers", brokers_, errstr) != RdKafka::Conf::CONF_OK){
|
|
|
- ErrorL << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
|
|
|
+ ErrorL << "RdKafka conf set brokerlist failed :" << errstr.c_str() << std::endl;
|
|
|
}
|
|
|
|
|
|
/*设置consumer group*/
|
|
|
if (conf->set("group.id", groupid_, errstr) != RdKafka::Conf::CONF_OK){
|
|
|
- ErrorL << "RdKafka conf set group.id failed : " << errstr.c_str() << endl;
|
|
|
+ ErrorL << "RdKafka conf set group.id failed : " << errstr.c_str() << std::endl;
|
|
|
}
|
|
|
|
|
|
std::string strfetch_num = "10240000";
|
|
|
/*每次从单个分区中拉取消息的最大尺寸*/
|
|
|
if(conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK){
|
|
|
- ErrorL << "RdKafka conf set max.partition failed : " << errstr.c_str() << endl;
|
|
|
+ ErrorL << "RdKafka conf set max.partition failed : " << errstr.c_str() << std::endl;
|
|
|
}
|
|
|
|
|
|
/*创建kafka consumer实例*/
|
|
|
kafka_consumer_ = RdKafka::Consumer::create(conf, errstr);
|
|
|
if(!kafka_consumer_){
|
|
|
- ErrorL << "failed to ceate consumer" << endl;
|
|
|
+ ErrorL << "failed to ceate consumer" << std::endl;
|
|
|
}
|
|
|
delete conf;
|
|
|
|
|
@@ -67,7 +67,7 @@ bool kafka_consumer_client::initClient(){
|
|
|
/*创建kafka topic的配置*/
|
|
|
tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
|
|
|
if(!tconf){
|
|
|
- ErrorL << "RdKafka create topic conf failed" << endl;
|
|
|
+ ErrorL << "RdKafka create topic conf failed" << std::endl;
|
|
|
return false;
|
|
|
}
|
|
|
|
|
@@ -79,18 +79,18 @@ bool kafka_consumer_client::initClient(){
|
|
|
消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的
|
|
|
开始位置消费所有消息.*/
|
|
|
if(tconf->set("auto.offset.reset", "largest", errstr) != RdKafka::Conf::CONF_OK){
|
|
|
- ErrorL << "RdKafka conf set auto.offset.reset failed : " << errstr.c_str() << endl;
|
|
|
+ ErrorL << "RdKafka conf set auto.offset.reset failed : " << errstr.c_str() << std::endl;
|
|
|
}
|
|
|
|
|
|
topic_ = RdKafka::Topic::create(kafka_consumer_, topics_, tconf, errstr);
|
|
|
if(!topic_){
|
|
|
- ErrorL << "RdKafka create topic failed : " << errstr.c_str() << endl;
|
|
|
+ ErrorL << "RdKafka create topic failed : " << errstr.c_str() << std::endl;
|
|
|
}
|
|
|
delete tconf;
|
|
|
|
|
|
RdKafka::ErrorCode resp = kafka_consumer_->start(topic_, partition_, offset_);
|
|
|
if (resp != RdKafka::ERR_NO_ERROR){
|
|
|
- ErrorL << "failed to start consumer : " << RdKafka::err2str(resp).c_str() << endl;
|
|
|
+ ErrorL << "failed to start consumer : " << RdKafka::err2str(resp).c_str() << std::endl;
|
|
|
}
|
|
|
|
|
|
return true;
|