kafkahandler_default.cpp 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. /*************************************************************************
  2. * Copyright (C) [2021] by Cambricon, Inc. All rights reserved
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * The above copyright notice and this permission notice shall be included in
  11. * all copies or substantial portions of the Software.
  12. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  13. * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  14. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  15. * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  16. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  17. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  18. * THE SOFTWARE.
  19. *************************************************************************/
  20. #include <memory>
  21. #include <string>
  22. #include "kafka_client.h"
  23. #include "rapidjson/prettywriter.h"
  24. #include "cnstream_logging.hpp"
  25. #include "kafka_handler.hpp"
  26. #include "cnstream_frame_va.hpp"
  27. #include "opencv2/highgui/highgui.hpp"
  28. #include "opencv2/imgproc/imgproc.hpp"
  29. #include "Base64.h"
  30. class DefaultKafkaHandler : public cnstream::KafkaHandler {
  31. public:
  32. ~DefaultKafkaHandler() {}
  33. int UpdateFrame(const cnstream::CNFrameInfoPtr& data) override;
  34. DECLARE_REFLEX_OBJECT_EX(DefaultKafkaHandler, cnstream::KafkaHandler)
  35. private:
  36. rapidjson::StringBuffer buffer_;
  37. rapidjson::PrettyWriter<rapidjson::StringBuffer> writer_;
  38. };
  39. IMPLEMENT_REFLEX_OBJECT_EX(DefaultKafkaHandler, cnstream::KafkaHandler)
  40. int DefaultKafkaHandler::UpdateFrame(const std::shared_ptr<cnstream::CNFrameInfo>& data) {
  41. buffer_.Clear();
  42. writer_.Reset(buffer_);
  43. std::string json_str;
  44. // Generate stream ID to JSON string.
  45. std::string stream_id = data->stream_id;
  46. writer_.StartObject();
  47. writer_.String("StreamName");
  48. #if RAPIDJSON_HAS_STDSTRING
  49. writer_.String(stream_id);
  50. #else
  51. writer_.String(stream_id.c_str(), static_cast<rapidjson::SizeType>(stream_id.length()));
  52. #endif
  53. // Generate frame count to JSON string if exist.
  54. if (data->collection.HasValue(cnstream::kCNDataFrameTag)) {
  55. auto dataframe = data->collection.Get<cnstream::CNDataFramePtr>(cnstream::kCNDataFrameTag);
  56. auto frame_count = dataframe->frame_id;
  57. writer_.String("FrameCount");
  58. writer_.Uint64(frame_count);
  59. }
  60. cnstream::CNDataFramePtr frame = data->collection.Get<cnstream::CNDataFramePtr>(cnstream::kCNDataFrameTag);
  61. if (frame->width < 0 || frame->height < 0) {
  62. LOGE(Kafka) << "Kafka module processed illegal frame: width or height may < 0.";
  63. return -1;
  64. }
  65. writer_.Key("width");
  66. writer_.Int(frame->width);
  67. writer_.Key("height");
  68. writer_.Int(frame->height);
  69. int num = 0;
  70. // If there are inference objects, stringify them to JSON strings.
  71. if (data->collection.HasValue(cnstream::kCNInferObjsTag)) {
  72. writer_.String("Objects");
  73. writer_.StartArray();
  74. auto objs_holder = data->collection.Get<cnstream::CNInferObjsPtr>(cnstream::kCNInferObjsTag);
  75. for (auto infer_object : objs_holder->objs_) {
  76. writer_.StartObject();
  77. writer_.String("Label");
  78. #if RAPIDJSON_HAS_STDSTRING
  79. writer_.String(infer_object->id);
  80. #else
  81. writer_.String(infer_object->id.c_str(), static_cast<rapidjson::SizeType>(infer_object->id.length()));
  82. #endif
  83. writer_.String("Score");
  84. writer_.Double(infer_object->score);
  85. writer_.String("BBox");
  86. writer_.StartArray();
  87. // x,y,w,h
  88. writer_.Double(infer_object->bbox.x);
  89. writer_.Double(infer_object->bbox.y);
  90. writer_.Double(infer_object->bbox.w);
  91. writer_.Double(infer_object->bbox.h);
  92. writer_.EndArray();
  93. writer_.EndObject();
  94. num++;
  95. }
  96. writer_.EndArray();
  97. }
  98. if(num == 0) return 0;
  99. writer_.Key("videoPath");
  100. writer_.String(data->videoPath.c_str());
  101. writer_.Key("ImageBase64");
  102. cv::Mat dst = frame->ImageBGR_NO_OSD();
  103. cv::resize(dst, dst, cv::Size(960, 540));
  104. std::vector<uint8_t> buf;
  105. cv::imencode(".jpg",dst,buf);
  106. auto *enc_msg = reinterpret_cast<unsigned char*>(buf.data());
  107. writer_.String(base64_encode(enc_msg, buf.size()).c_str());
  108. writer_.EndObject();
  109. json_str = buffer_.GetString();
  110. if (json_str.length() <= 2) {
  111. LOGW(DEFAULTKAFKAHANDLER) << "Produce Kafka message failed!";
  112. return 0;
  113. }
  114. // Produce Kafka Message
  115. if (!Produce(json_str)) {
  116. LOGE(DEFAULTKAFKAHANDLER) << "Produce Kafka message failed!";
  117. return -1;
  118. }
  119. return 0;
  120. }