#include "inference.h" #define MAX_DISPLAY_LEN 64 #define PGIE_CLASS_ID_VEHICLE 0 #define PGIE_CLASS_ID_PERSON 2 /* By default, OSD process-mode is set to CPU_MODE. To change mode, set as: * 1: GPU mode (for Tesla only) * 2: HW mode (For Jetson only) */ #define OSD_PROCESS_MODE 0 /* By default, OSD will not display text. To display text, change this to 1 */ #define OSD_DISPLAY_TEXT 1 /* The muxer output resolution must be set if the input streams will be of * different resolution. The muxer will scale all the input frames to this * resolution. */ #define MUXER_OUTPUT_WIDTH 1920 #define MUXER_OUTPUT_HEIGHT 1080 /* Muxer batch formation timeout, for e.g. 40 millisec. Should ideally be set * based on the fastest source's framerate. */ #define MUXER_BATCH_TIMEOUT_USEC 5000 #define TILED_OUTPUT_WIDTH 1280 #define TILED_OUTPUT_HEIGHT 720 /* NVIDIA Decoder source pad memory feature. This feature signifies that source * pads having this capability will push GstBuffers containing cuda buffers. */ #define GST_CAPS_FEATURES_NVMM "memory:NVMM" #define MAX_NUM_SOURCES 30 gint frame_number = 0; gint g_num_sources = 0; gint g_source_id_list[MAX_NUM_SOURCES]; gboolean g_eos_list[MAX_NUM_SOURCES]; gboolean g_source_enabled[MAX_NUM_SOURCES]; GMutex eos_lock; GstElement *g_streammux = NULL; namespace MIVA{ std::shared_ptr infer = NULL; ThreadPool pool(3,ThreadPool::PRIORITY_HIGHEST, false); std::shared_ptr Inference::CreateNew() { if(infer == NULL) infer = std::make_shared(); return infer; } Inference::Inference() { } Inference::~Inference() { Destory(); } // Init 初始化 int32_t Inference::Init(vector DataList) { // init this->loop = g_main_loop_new (NULL, FALSE); // 创建管道 this->pipeline = gst_pipeline_new("dstest3-pipeline"); // 创建批处理器 this->streammux = gst_element_factory_make ("nvstreammux", "stream-muxer"); g_streammux = this->streammux; if(this->pipeline == NULL || this->streammux == NULL){ ErrorL << "One element could not be created. Exiting."; return ERR; } gst_bin_add (GST_BIN (this->pipeline), streammux); this->m_DataList = DataList; // 创建数据源 std::vector::iterator iter; g_num_sources = 0; for(iter = m_DataList.begin(); iter != m_DataList.end(); iter++){ GstElement *source_bin = create_uridecode_bin (g_num_sources, (gchar*)((*iter).uri).c_str()); if (!source_bin) { ErrorL << "Failed to create source bin. Exiting."; return ERR; } gst_bin_add(GST_BIN (this->pipeline), source_bin); iter->source_bin = source_bin; iter->Play = true; g_num_sources++; } /* Use nvinfer to infer on batched frame. */ this->pgie = gst_element_factory_make("nvinfer", "primary-nvinference-engine"); /* Add queue elements between every two elements */ this->queue1 = gst_element_factory_make ("queue", "queue1"); this->queue2 = gst_element_factory_make ("queue", "queue2"); this->queue3 = gst_element_factory_make ("queue", "queue3"); this->queue4 = gst_element_factory_make ("queue", "queue4"); this->queue5 = gst_element_factory_make ("queue", "queue5"); /* Use nvtiler to composite the batched frames into a 2D tiled array based * on the source of the frames. 
    this->tiler = gst_element_factory_make ("nvmultistreamtiler", "nvtiler");

    /* Use convertor to convert from NV12 to RGBA as required by nvosd */
    this->nvvidconv = gst_element_factory_make ("nvvideoconvert", "nvvideo-converter");
    this->nvosd = gst_element_factory_make ("nvdsosd", "nv-onscreendisplay");

#ifdef PLATFORM_TEGRA
    this->transform = gst_element_factory_make ("nvegltransform", "nvegl-transform");
#endif
    this->sink = gst_element_factory_make ("nveglglessink", "nvvideo-renderer");

    if (!this->pgie || !this->tiler || !this->nvvidconv || !this->nvosd || !this->sink) {
        ErrorL << "One element could not be created. Exiting.";
        return -1;
    }
#ifdef PLATFORM_TEGRA
    if(!this->transform) {
        ErrorL << "One tegra element could not be created. Exiting.";
        return -1;
    }
#endif

    g_object_set (G_OBJECT (streammux), "batch-size", g_num_sources, NULL);
    g_object_set (G_OBJECT (streammux), "width", MUXER_OUTPUT_WIDTH, "height", MUXER_OUTPUT_HEIGHT,
        "batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC, NULL);

    /* Configure the nvinfer element using the nvinfer config file. */
    g_object_set (G_OBJECT (this->pgie), "config-file-path", "config_infer_primary_yoloV5.txt", NULL);

    /* Override the batch-size set in the config file with the number of sources. */
    g_object_get (G_OBJECT (this->pgie), "batch-size", &(this->pgie_batch_size), NULL);
    if (this->pgie_batch_size != g_num_sources) {
        WarnL << "WARNING: Overriding infer-config batch-size: " << this->pgie_batch_size
              << " with number of sources (" << g_num_sources << ")";
        g_object_set (G_OBJECT (this->pgie), "batch-size", g_num_sources, NULL);
    }

    this->tiler_rows = (guint) sqrt (g_num_sources);
    this->tiler_columns = (guint) ceil (1.0 * g_num_sources / this->tiler_rows);

    /* we set the tiler properties here */
    g_object_set (G_OBJECT (this->tiler), "rows", this->tiler_rows, "columns", this->tiler_columns,
        "width", TILED_OUTPUT_WIDTH, "height", TILED_OUTPUT_HEIGHT, NULL);

    g_object_set (G_OBJECT (this->nvosd), "process-mode", OSD_PROCESS_MODE,
        "display-text", OSD_DISPLAY_TEXT, NULL);

    g_object_set (G_OBJECT (this->sink), "qos", 0, NULL);

    this->bus = gst_pipeline_get_bus (GST_PIPELINE (this->pipeline));
    this->bus_watch_id = gst_bus_add_watch (this->bus, bus_call, this->loop);
    gst_object_unref (this->bus);

#ifdef PLATFORM_TEGRA
    /* nvegltransform is only available (and needed) on Tegra. */
    gst_bin_add_many (GST_BIN (this->pipeline), this->queue1, this->pgie, this->queue2,
        this->tiler, this->queue3, this->nvvidconv, this->queue4, this->nvosd, this->queue5,
        this->transform, this->sink, NULL);

    if (!gst_element_link_many (streammux, this->queue1, this->pgie, this->queue2, this->tiler,
            this->queue3, this->nvvidconv, this->queue4, this->nvosd, this->queue5,
            this->transform, this->sink, NULL)) {
        ErrorL << "Elements could not be linked. Exiting.";
        return -1;
    }
#else
    gst_bin_add_many (GST_BIN (this->pipeline), this->queue1, this->pgie, this->queue2,
        this->tiler, this->queue3, this->nvvidconv, this->queue4, this->nvosd, this->queue5,
        this->sink, NULL);

    if (!gst_element_link_many (streammux, this->queue1, this->pgie, this->queue2, this->tiler,
            this->queue3, this->nvvidconv, this->queue4, this->nvosd, this->queue5,
            this->sink, NULL)) {
        ErrorL << "Elements could not be linked. Exiting.";
        return -1;
    }
#endif
Exiting."; return -1; } this->tiler_src_pad = gst_element_get_static_pad(this->pgie, "src"); if (!this->tiler_src_pad) InfoL << "Unable to get src pad"; else gst_pad_add_probe (this->tiler_src_pad, GST_PAD_PROBE_TYPE_BUFFER, tiler_src_pad_buffer_probe, NULL, NULL); gst_object_unref (this->tiler_src_pad); return OK; } void Inference::ReadyTask() { InfoL << "Now ReadyTask"; gst_element_set_state(this->pipeline, GST_STATE_READY); g_main_loop_run(this->loop); } // 启动任务 void Inference::StartTask() { static int ret = 0; InfoL << "Now palying"; if(ret != 0){ this->RestartTask(); }else{ ret++; gst_element_set_state(this->pipeline, GST_STATE_PLAYING); } } // 暂停任务 void Inference::PauseTask() { InfoL << "Now Pause"; std::vector::iterator iter; for(iter = this->m_DataList.begin(); iter != this->m_DataList.end(); iter++){ if(iter->Play){ gst_element_set_state(iter->source_bin, GST_STATE_PAUSED); } } } void Inference::StopTask() { pool.async([&](){ int sourceId = 0; g_mutex_lock (&eos_lock); std::vector::iterator iter; for (iter = this->m_DataList.begin(); iter != this->m_DataList.end(); iter++){ if(iter->Play){ this->stop_release_source(sourceId); iter->Play = false; } sourceId++; } g_mutex_unlock (&eos_lock); NoticeCenter::Instance().emitEvent(NOTICE_RELEASE); }); pool.start(); } void Inference::RestartTask() { pool.async([&](){ int sourceId = 0; std::vector::iterator iter; for (iter = this->m_DataList.begin(); iter != this->m_DataList.end(); iter++){ if(iter->Play == false){ this->add_sources(sourceId, iter->uri); iter->Play = true; } sourceId++; } gst_element_set_state(this->pipeline, GST_STATE_PLAYING); }); pool.start(); } // 销毁对象 void Inference::Destory() { InfoL << "Returned, stopping playback"; gst_element_set_state(this->pipeline, GST_STATE_NULL); InfoL << "Deleting pipeline"; gst_object_unref(GST_OBJECT(this->pipeline)); g_source_remove(this->bus_watch_id); g_main_loop_unref(this->loop); infer = NULL; } GstPadProbeReturn Inference::tiler_src_pad_buffer_probe(GstPad * pad, GstPadProbeInfo * info, gpointer u_data) { //获取从管道中获取推理结果 GstBuffer *buf = (GstBuffer *) info->data; NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta (buf); //初始化要使用的数据结构 NvDsObjectMeta *obj_meta = NULL; //目标检测元数据类型变量 NvDsMetaList * l_frame = NULL; NvDsMetaList * l_obj = NULL; NvDsDisplayMeta *display_meta = NULL; for (l_frame = batch_meta->frame_meta_list; l_frame != NULL;l_frame = l_frame->next) //从批量中获取某一帧图 { NvDsFrameMeta *frame_meta = (NvDsFrameMeta *) (l_frame->data); int num = 0; for (l_obj = frame_meta->obj_meta_list; l_obj != NULL;l_obj = l_obj->next) { obj_meta = (NvDsObjectMeta *) (l_obj->data); if (obj_meta->class_id == 0) // Person { num++; } } //画左上角的统计信息 display_meta = nvds_acquire_display_meta_from_pool(batch_meta); NvOSD_TextParams *txt_params = &display_meta->text_params[0]; display_meta->num_labels = 1; txt_params->display_text = (char *)g_malloc0 (MAX_DISPLAY_LEN); snprintf(txt_params->display_text, MAX_DISPLAY_LEN, "Number of people: %d \n", num); // 推理广播 NoticeCenter::Instance().emitEvent(NOTICE_INFER,frame_meta->source_id, num); txt_params->x_offset = 30; txt_params->y_offset = 30; /* Font , font-color and font-size */ txt_params->font_params.font_name = (char *)"Serif"; txt_params->font_params.font_size = 10; txt_params->font_params.font_color.red = 1.0; txt_params->font_params.font_color.green = 1.0; txt_params->font_params.font_color.blue = 1.0; txt_params->font_params.font_color.alpha = 1.0; /* Text background color */ txt_params->set_bg_clr = 1; txt_params->text_bg_clr.red = 0.0; 
        txt_params->text_bg_clr.green = 0.0;
        txt_params->text_bg_clr.blue = 0.0;
        txt_params->text_bg_clr.alpha = 1.0;
        //nvds_add_display_meta_to_frame(frame_meta, display_meta);
    }
    return GST_PAD_PROBE_OK;
}

gboolean Inference::bus_call (GstBus * bus, GstMessage * msg, gpointer data)
{
    GMainLoop *loop = (GMainLoop *) data;
    switch (GST_MESSAGE_TYPE (msg)) {
        case GST_MESSAGE_EOS:
            InfoL << "End of stream";
            g_main_loop_quit (loop);
            break;
        case GST_MESSAGE_WARNING:
        {
            gchar *debug;
            GError *error;
            gst_message_parse_warning (msg, &error, &debug);
            WarnL << "WARNING from element " << GST_OBJECT_NAME (msg->src) << ": " << error->message;
            g_free (debug);
            ErrorL << "Warning: " << error->message;
            g_error_free (error);
            break;
        }
        case GST_MESSAGE_ERROR:
        {
            gchar *debug;
            GError *error;
            gst_message_parse_error (msg, &error, &debug);
            ErrorL << "ERROR from element " << GST_OBJECT_NAME (msg->src) << ": " << error->message;
            if (debug)
                ErrorL << "Error details: " << debug;
            g_free (debug);
            g_error_free (error);
            g_main_loop_quit (loop);
            break;
        }
#ifndef PLATFORM_TEGRA
        case GST_MESSAGE_ELEMENT:
        {
            if (gst_nvmessage_is_stream_eos (msg)) {
                guint stream_id;
                if (gst_nvmessage_parse_stream_eos (msg, &stream_id)) {
                    InfoL << "Got EOS from stream " << stream_id;
                }
            }
            break;
        }
#endif
        default:
            break;
    }
    return TRUE;
}

void Inference::decodebin_child_added (GstChildProxy * child_proxy, GObject * object,
    gchar * name, gpointer user_data)
{
    InfoL << "Decodebin child added: " << name;
    if (g_strrstr (name, "decodebin") == name) {
        g_signal_connect (G_OBJECT (object), "child-added",
            G_CALLBACK (decodebin_child_added), user_data);
    }
}

void Inference::stop_release_source (gint source_id)
{
    GstStateChangeReturn state_return;
    gchar pad_name[16];
    GstPad *sinkpad = NULL;

    state_return = gst_element_set_state (m_DataList[source_id].source_bin, GST_STATE_NULL);
    switch (state_return) {
        case GST_STATE_CHANGE_SUCCESS:
            InfoL << "STATE CHANGE SUCCESS";
            g_snprintf (pad_name, 15, "sink_%u", source_id);
            sinkpad = gst_element_get_static_pad (streammux, pad_name);
            gst_pad_send_event (sinkpad, gst_event_new_flush_stop (FALSE));
            gst_element_release_request_pad (streammux, sinkpad);
            InfoL << "STATE CHANGE SUCCESS:" << source_id;
            gst_object_unref (sinkpad);
            gst_bin_remove (GST_BIN (this->pipeline), m_DataList[source_id].source_bin);
            break;
        case GST_STATE_CHANGE_FAILURE:
            ErrorL << "STATE CHANGE FAILURE";
            break;
        case GST_STATE_CHANGE_ASYNC:
            InfoL << "STATE CHANGE ASYNC";
            state_return = gst_element_get_state (m_DataList[source_id].source_bin, NULL, NULL, GST_CLOCK_TIME_NONE);
            g_snprintf (pad_name, 15, "sink_%u", source_id);
            sinkpad = gst_element_get_static_pad (streammux, pad_name);
            gst_pad_send_event (sinkpad, gst_event_new_flush_stop (FALSE));
            gst_element_release_request_pad (streammux, sinkpad);
            g_print ("STATE CHANGE ASYNC %p\n\n", sinkpad);
            gst_object_unref (sinkpad);
            gst_bin_remove (GST_BIN (this->pipeline), m_DataList[source_id].source_bin);
            break;
        case GST_STATE_CHANGE_NO_PREROLL:
            InfoL << "STATE CHANGE NO PREROLL";
            break;
        default:
            break;
    }
}

void Inference::cb_newpad (GstElement * decodebin, GstPad * pad, gpointer data)
{
    GstCaps *caps = gst_pad_query_caps (pad, NULL);
    const GstStructure *str = gst_caps_get_structure (caps, 0);
    const gchar *name = gst_structure_get_name (str);
    g_print ("decodebin new pad %s\n", name);
    if (!strncmp (name, "video", 5)) {
        gint source_id = (*(gint *) data);
        gchar pad_name[16] = { 0 };
        GstPad *sinkpad = NULL;
        g_snprintf (pad_name, 15, "sink_%u", source_id);
        sinkpad = gst_element_get_request_pad (g_streammux, pad_name);
        if (gst_pad_link (pad, sinkpad) != GST_PAD_LINK_OK) {
            g_print ("Failed to link decodebin to pipeline\n");
        } else {
            g_print ("Decodebin linked to pipeline\n");
        }
        gst_object_unref (sinkpad);
    }
    gst_caps_unref (caps);   // release the caps reference taken by gst_pad_query_caps()
}

GstElement* Inference::create_uridecode_bin (guint index, gchar * filename)
{
    GstElement *bin = NULL;
    gchar bin_name[16] = { };
    g_print ("creating uridecodebin for [%s]\n", filename);
    g_source_id_list[index] = index;
    g_snprintf (bin_name, 15, "source-bin-%02d", index);
    bin = gst_element_factory_make ("uridecodebin", bin_name);
    g_object_set (G_OBJECT (bin), "uri", filename, NULL);
    g_signal_connect (G_OBJECT (bin), "pad-added", G_CALLBACK (cb_newpad), &g_source_id_list[index]);
    g_signal_connect (G_OBJECT (bin), "child-added", G_CALLBACK (decodebin_child_added), &g_source_id_list[index]);
    g_source_enabled[index] = TRUE;
    return bin;
}

// Add a source
gboolean Inference::add_sources (int source_Id, std::string uri)
{
    g_mutex_lock (&eos_lock);
    GstElement *source_bin;
    GstStateChangeReturn state_return;
    InfoL << "Calling Start " << source_Id;

    source_bin = create_uridecode_bin (source_Id, (gchar *)uri.c_str());
    if (!source_bin) {
        ErrorL << "Failed to create source bin. Exiting.";
        g_mutex_unlock (&eos_lock);   // release the lock before the early return
        return FALSE;
    }
    m_DataList[source_Id].source_bin = source_bin;
    gst_bin_add (GST_BIN (this->pipeline), source_bin);

    state_return = gst_element_set_state (source_bin, GST_STATE_PLAYING);
    switch (state_return) {
        case GST_STATE_CHANGE_SUCCESS:
            InfoL << "STATE CHANGE SUCCESS.";
            break;
        case GST_STATE_CHANGE_FAILURE:
            InfoL << "STATE CHANGE FAILURE";
            break;
        case GST_STATE_CHANGE_ASYNC:
            InfoL << "STATE CHANGE ASYNC";
            state_return = gst_element_get_state (source_bin, NULL, NULL, GST_CLOCK_TIME_NONE);
            break;
        case GST_STATE_CHANGE_NO_PREROLL:
            InfoL << "STATE CHANGE NO PREROLL";
            break;
        default:
            break;
    }
    g_mutex_unlock (&eos_lock);
    return TRUE;
}

} // namespace MIVA