/************************************************************************* * Copyright (C) [2021] by Cambricon, Inc. All rights reserved * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. *************************************************************************/ #include #include #include "kafka_client.h" #include "rapidjson/prettywriter.h" #include "cnstream_logging.hpp" #include "kafka_handler.hpp" #include "cnstream_frame_va.hpp" #include "opencv2/highgui/highgui.hpp" #include "opencv2/imgproc/imgproc.hpp" #include "Base64.h" class DefaultKafkaHandler : public cnstream::KafkaHandler { public: ~DefaultKafkaHandler() {} int UpdateFrame(const cnstream::CNFrameInfoPtr& data) override; DECLARE_REFLEX_OBJECT_EX(DefaultKafkaHandler, cnstream::KafkaHandler) private: rapidjson::StringBuffer buffer_; rapidjson::PrettyWriter writer_; }; IMPLEMENT_REFLEX_OBJECT_EX(DefaultKafkaHandler, cnstream::KafkaHandler) int DefaultKafkaHandler::UpdateFrame(const std::shared_ptr& data) { buffer_.Clear(); writer_.Reset(buffer_); std::string json_str; // Generate stream ID to JSON string. std::string stream_id = data->stream_id; writer_.StartObject(); writer_.String("StreamName"); #if RAPIDJSON_HAS_STDSTRING writer_.String(stream_id); #else writer_.String(stream_id.c_str(), static_cast(stream_id.length())); #endif // Generate frame count to JSON string if exist. if (data->collection.HasValue(cnstream::kCNDataFrameTag)) { auto dataframe = data->collection.Get(cnstream::kCNDataFrameTag); auto frame_count = dataframe->frame_id; writer_.String("FrameCount"); writer_.Uint64(frame_count); } cnstream::CNDataFramePtr frame = data->collection.Get(cnstream::kCNDataFrameTag); if (frame->width < 0 || frame->height < 0) { LOGE(Kafka) << "Kafka module processed illegal frame: width or height may < 0."; return -1; } writer_.Key("width"); writer_.Int(frame->width); writer_.Key("height"); writer_.Int(frame->height); int num = 0; // If there are inference objects, stringify them to JSON strings. if (data->collection.HasValue(cnstream::kCNInferObjsTag)) { writer_.String("Objects"); writer_.StartArray(); auto objs_holder = data->collection.Get(cnstream::kCNInferObjsTag); for (auto infer_object : objs_holder->objs_) { writer_.StartObject(); writer_.String("Label"); #if RAPIDJSON_HAS_STDSTRING writer_.String(infer_object->id); #else writer_.String(infer_object->id.c_str(), static_cast(infer_object->id.length())); #endif writer_.String("Score"); writer_.Double(infer_object->score); writer_.String("BBox"); writer_.StartArray(); // x,y,w,h writer_.Double(infer_object->bbox.x); writer_.Double(infer_object->bbox.y); writer_.Double(infer_object->bbox.w); writer_.Double(infer_object->bbox.h); writer_.EndArray(); writer_.EndObject(); num++; } writer_.EndArray(); } if(num == 0) return 0; writer_.Key("videoPath"); writer_.String(data->videoPath.c_str()); writer_.Key("ImageBase64"); cv::Mat dst = frame->ImageBGR_NO_OSD(); cv::resize(dst, dst, cv::Size(960, 540)); std::vector buf; cv::imencode(".jpg",dst,buf); auto *enc_msg = reinterpret_cast(buf.data()); writer_.String(base64_encode(enc_msg, buf.size()).c_str()); writer_.EndObject(); json_str = buffer_.GetString(); if (json_str.length() <= 2) { LOGW(DEFAULTKAFKAHANDLER) << "Produce Kafka message failed!"; return 0; } // Produce Kafka Message if (!Produce(json_str)) { LOGE(DEFAULTKAFKAHANDLER) << "Produce Kafka message failed!"; return -1; } return 0; }