123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- /*************************************************************************
- * Copyright (C) [2021] by Cambricon, Inc. All rights reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
- * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
- * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- * THE SOFTWARE.
- *************************************************************************/
- #include <memory>
- #include <string>
- #include "kafka_client.h"
- #include "rapidjson/prettywriter.h"
- #include "cnstream_logging.hpp"
- #include "kafka_handler.hpp"
- #include "cnstream_frame_va.hpp"
- #include "opencv2/highgui/highgui.hpp"
- #include "opencv2/imgproc/imgproc.hpp"
- #include "Base64.h"
- class DefaultKafkaHandler : public cnstream::KafkaHandler {
- public:
- ~DefaultKafkaHandler() {}
- int UpdateFrame(const cnstream::CNFrameInfoPtr& data) override;
- DECLARE_REFLEX_OBJECT_EX(DefaultKafkaHandler, cnstream::KafkaHandler)
- private:
- rapidjson::StringBuffer buffer_;
- rapidjson::PrettyWriter<rapidjson::StringBuffer> writer_;
- };
- IMPLEMENT_REFLEX_OBJECT_EX(DefaultKafkaHandler, cnstream::KafkaHandler)
- int DefaultKafkaHandler::UpdateFrame(const std::shared_ptr<cnstream::CNFrameInfo>& data) {
- buffer_.Clear();
- writer_.Reset(buffer_);
- std::string json_str;
- // Generate stream ID to JSON string.
- std::string stream_id = data->stream_id;
- writer_.StartObject();
- writer_.String("StreamName");
- #if RAPIDJSON_HAS_STDSTRING
- writer_.String(stream_id);
- #else
- writer_.String(stream_id.c_str(), static_cast<rapidjson::SizeType>(stream_id.length()));
- #endif
- // Generate frame count to JSON string if exist.
- if (data->collection.HasValue(cnstream::kCNDataFrameTag)) {
- auto dataframe = data->collection.Get<cnstream::CNDataFramePtr>(cnstream::kCNDataFrameTag);
- auto frame_count = dataframe->frame_id;
- writer_.String("FrameCount");
- writer_.Uint64(frame_count);
- }
-
- cnstream::CNDataFramePtr frame = data->collection.Get<cnstream::CNDataFramePtr>(cnstream::kCNDataFrameTag);
-
- if (frame->width < 0 || frame->height < 0) {
- LOGE(Kafka) << "Kafka module processed illegal frame: width or height may < 0.";
- return -1;
- }
- writer_.Key("width");
- writer_.Int(frame->width);
- writer_.Key("height");
- writer_.Int(frame->height);
- int num = 0;
- // If there are inference objects, stringify them to JSON strings.
- if (data->collection.HasValue(cnstream::kCNInferObjsTag)) {
- writer_.String("Objects");
- writer_.StartArray();
-
- auto objs_holder = data->collection.Get<cnstream::CNInferObjsPtr>(cnstream::kCNInferObjsTag);
- for (auto infer_object : objs_holder->objs_) {
- writer_.StartObject();
- writer_.String("Label");
- #if RAPIDJSON_HAS_STDSTRING
- writer_.String(infer_object->id);
- #else
- writer_.String(infer_object->id.c_str(), static_cast<rapidjson::SizeType>(infer_object->id.length()));
- #endif
- writer_.String("Score");
- writer_.Double(infer_object->score);
- writer_.String("BBox");
- writer_.StartArray();
- // x,y,w,h
- writer_.Double(infer_object->bbox.x);
- writer_.Double(infer_object->bbox.y);
- writer_.Double(infer_object->bbox.w);
- writer_.Double(infer_object->bbox.h);
- writer_.EndArray();
-
- writer_.EndObject();
- num++;
- }
- writer_.EndArray();
- }
- if(num == 0) return 0;
- writer_.Key("videoPath");
- writer_.String(data->videoPath.c_str());
- writer_.Key("ImageBase64");
- cv::Mat dst = frame->ImageBGR_NO_OSD();
- cv::resize(dst, dst, cv::Size(960, 540));
- std::vector<uint8_t> buf;
- cv::imencode(".jpg",dst,buf);
- auto *enc_msg = reinterpret_cast<unsigned char*>(buf.data());
- writer_.String(base64_encode(enc_msg, buf.size()).c_str());
- writer_.EndObject();
- json_str = buffer_.GetString();
- if (json_str.length() <= 2) {
- LOGW(DEFAULTKAFKAHANDLER) << "Produce Kafka message failed!";
- return 0;
- }
- // Produce Kafka Message
- if (!Produce(json_str)) {
- LOGE(DEFAULTKAFKAHANDLER) << "Produce Kafka message failed!";
- return -1;
- }
- return 0;
- }
|