Explorar el Código

添加按需拉流、动态删减数据源

lishengyin hace 3 años
padre
commit
e06a26b568

BIN
lib/libmodules.so


+ 3 - 0
modules/dataType/include/DataSource.h

@@ -1,5 +1,6 @@
 #pragma once
 #include <iostream>
+#include <gst/gst.h>
 
 
 class DataSource
@@ -10,4 +11,6 @@ public:
     ~DataSource(){};
     int Id;
     std::string uri;
+    GstElement *source_bin = NULL;
+    bool Play = false;
 };

+ 12 - 3
modules/inference/include/inference.h

@@ -39,7 +39,7 @@ namespace MIVA{
     private:
         // Deepstream
         GMainLoop *loop = NULL;
-        GstElement *pipeline = NULL,*streammux = NULL, *sink = NULL, *pgie = NULL,
+        GstElement *pipeline = NULL,*streammux =NULL, *sink = NULL, *pgie = NULL,
         *queue1, *queue2, *queue3, *queue4, *queue5, *nvvidconv = NULL,
         *nvosd = NULL, *tiler = NULL;
 
@@ -51,6 +51,8 @@ namespace MIVA{
         guint tiler_rows, tiler_columns;
         guint pgie_batch_size;
 
+        std::vector<DataSource> m_DataList;
+
     public:
         static std::shared_ptr<Inference> CreateNew();
         Inference();
@@ -61,15 +63,22 @@ namespace MIVA{
         void ReadyTask();
         void StartTask();
         void PauseTask();
+        void StopTask();
+        void RestartTask();
+
     private:
         // static 
         static GstPadProbeReturn tiler_src_pad_buffer_probe(GstPad * pad, GstPadProbeInfo * info, gpointer u_data);
         static gboolean bus_call (GstBus * bus, GstMessage * msg, gpointer data);
-        static void cb_newpad (GstElement * decodebin, GstPad * decoder_src_pad, gpointer data);
         static void decodebin_child_added (GstChildProxy * child_proxy, GObject * object,
             gchar * name, gpointer user_data);
 
-        static GstElement* create_source_bin(guint index, gchar * uri);
+        static GstElement * create_uridecode_bin (guint index, gchar * filename);
+        static void cb_newpad (GstElement * decodebin, GstPad * pad, gpointer data);
+
+        // 动态管理源
+        gboolean add_sources (int source_Id, std::string uri);
+        void stop_release_source (gint source_id);
     };
 }
 

+ 169 - 113
modules/inference/src/inference.cpp

@@ -31,8 +31,18 @@
  * pads having this capability will push GstBuffers containing cuda buffers. */
 #define GST_CAPS_FEATURES_NVMM "memory:NVMM"
 
+#define MAX_NUM_SOURCES 30
+
 gint frame_number = 0;
 
+gint g_num_sources = 0;
+gint g_source_id_list[MAX_NUM_SOURCES];
+gboolean g_eos_list[MAX_NUM_SOURCES];
+gboolean g_source_enabled[MAX_NUM_SOURCES];
+
+GMutex eos_lock;
+
+GstElement *g_streammux = NULL;
 
 namespace MIVA{
     std::shared_ptr<Inference> infer = NULL;
@@ -61,45 +71,28 @@ namespace MIVA{
         // 创建批处理器
         this->streammux = gst_element_factory_make ("nvstreammux", "stream-muxer");
 
+        g_streammux = this->streammux;
+
         if(this->pipeline == NULL || this->streammux == NULL){
             ErrorL << "One element could not be created. Exiting.";
             return ERR;
         }
-        gst_bin_add (GST_BIN (this->pipeline), this->streammux);
+        gst_bin_add (GST_BIN (this->pipeline), streammux);
 
+        this->m_DataList = DataList;
         // 创建数据源
         std::vector<DataSource>::iterator iter;
-        int i = 0;
-        for(iter = DataList.begin(); iter != DataList.end(); iter++){
-            GstPad *sinkpad, *srcpad;
-            gchar pad_name[16] = { };
-            
-            GstElement *source_bin = create_source_bin ((*iter).Id, (gchar*)((*iter).uri).c_str());
-
+        g_num_sources = 0;
+        for(iter = m_DataList.begin(); iter != m_DataList.end(); iter++){
+            GstElement *source_bin = create_uridecode_bin (g_num_sources, (gchar*)((*iter).uri).c_str());
             if (!source_bin) {
                 ErrorL << "Failed to create source bin. Exiting."; 
                 return ERR;
             }
             gst_bin_add(GST_BIN (this->pipeline), source_bin);
-            g_snprintf (pad_name, 15, "sink_%u", i);
-            sinkpad = gst_element_get_request_pad (this->streammux, pad_name);
-            if(!sinkpad){
-                ErrorL << "Streammux request sink pad failed. Exiting.";
-                return ERR;
-            }
-            srcpad = gst_element_get_static_pad(source_bin, "src");
-            if(!srcpad){
-                ErrorL << "Failed to get src pad of source bin. Exiting.";
-                return ERR;
-            }
-            if(gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK){
-                ErrorL << "Failed to link source bin to stream muxer. Exiting.";
-                return ERR;
-            }
-            gst_object_unref (srcpad);
-            gst_object_unref (sinkpad);
-
-            i++;
+            iter->source_bin = source_bin;
+            iter->Play = true;
+            g_num_sources++;
         }
         /* Use nvinfer to infer on batched frame. */
         this->pgie = gst_element_factory_make("nvinfer", "primary-nvinference-engine");
@@ -111,8 +104,6 @@ namespace MIVA{
         this->queue4 = gst_element_factory_make ("queue", "queue4");
         this->queue5 = gst_element_factory_make ("queue", "queue5");
 
-        // g_object_set (G_OBJECT(this->queue1), "max-size-buffers",  20, NULL);
-
         /* Use nvtiler to composite the batched frames into a 2D tiled array based
          * on the source of the frames. */
         this->tiler = gst_element_factory_make ("nvmultistreamtiler", "nvtiler");
@@ -132,7 +123,6 @@ namespace MIVA{
             ErrorL << "One element could not be created. Exiting.";
             return -1;
         }
-
         #ifdef PLATFORM_TEGRA
             if(!this->transform) {
                 ErrorL << "One tegra element could not be created. Exiting.";
@@ -140,9 +130,9 @@ namespace MIVA{
             }
         #endif
 
-        g_object_set(G_OBJECT(this->streammux), "batch-size", i, NULL);
+        g_object_set(G_OBJECT(streammux), "batch-size", g_num_sources, NULL);
 
-        g_object_set (G_OBJECT (this->streammux), "width", MUXER_OUTPUT_WIDTH, "height",MUXER_OUTPUT_HEIGHT,
+        g_object_set (G_OBJECT (streammux), "width", MUXER_OUTPUT_WIDTH, "height",MUXER_OUTPUT_HEIGHT,
             "batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC, NULL);
 
         /* Configure the nvinfer element using the nvinfer config file. */
@@ -152,13 +142,13 @@ namespace MIVA{
         /* Override the batch-size set in the config file with the number of sources. */
         g_object_get (G_OBJECT (this->pgie), "batch-size", &(this->pgie_batch_size), NULL);
 
-        if (this->pgie_batch_size != i) {
-            WarnL << "WARNING: Overriding infer-config batch-size:" << this->pgie_batch_size << "with number of sources ("<<  i << ")";
-            g_object_set (G_OBJECT (this->pgie), "batch-size", i, NULL);
+        if (this->pgie_batch_size != g_num_sources) {
+            WarnL << "WARNING: Overriding infer-config batch-size:" << this->pgie_batch_size << "with number of sources ("<<  g_num_sources << ")";
+            g_object_set (G_OBJECT (this->pgie), "batch-size", g_num_sources, NULL);
         }
 
-        this->tiler_rows = (guint) sqrt (i);
-        this->tiler_columns = (guint) ceil (1.0 * i / this->tiler_rows);
+        this->tiler_rows = (guint) sqrt (g_num_sources);
+        this->tiler_columns = (guint) ceil (1.0 * g_num_sources / this->tiler_rows);
 
         /* we set the tiler properties here */
         g_object_set (G_OBJECT (this->tiler), "rows", this->tiler_rows, "columns", this->tiler_columns,
@@ -177,22 +167,11 @@ namespace MIVA{
         gst_bin_add_many (GST_BIN (this->pipeline), this->queue1, this->pgie, this->queue2, this->tiler, this->queue3,
             this->nvvidconv, this->queue4, this->nvosd, this->queue5, this->transform, this->sink, NULL);
 
-        gst_bin_add_many (GST_BIN (this->pipeline), this->queue2, this->tiler, this->queue3,
-            this->nvvidconv, this->queue4, this->nvosd, this->queue5, this->transform, this->sink, NULL);
-
-        //     /* we link the elements together
-        //     * nvstreammux -> nvinfer -> nvtiler -> nvvidconv -> nvosd -> video-renderer */
-
-        if (!gst_element_link_many (this->streammux, this->queue2, this->tiler, this->queue3,
+        if (!gst_element_link_many (streammux, this->queue1, this->pgie, this->queue2, this->tiler, this->queue3,
                 this->nvvidconv, this->queue4, this->nvosd, this->queue5, this->transform, this->sink, NULL)) {
             ErrorL << "Elements could not be linked. Exiting.";
             return -1;
         }
-        // if (!gst_element_link_many (this->streammux, this->queue1, this->pgie, this->queue2, this->tiler, this->queue3,
-        //         this->nvvidconv, this->queue4, this->nvosd, this->queue5, this->transform, this->sink, NULL)) {
-        //     ErrorL << "Elements could not be linked. Exiting.";
-        //     return -1;
-        // }
 
         this->tiler_src_pad = gst_element_get_static_pad(this->pgie, "src");
         if (!this->tiler_src_pad)
@@ -213,7 +192,13 @@ namespace MIVA{
     // 启动任务
     void Inference::StartTask()
     {
+        static int ret = 0;
         InfoL << "Now palying";
+        if(ret != 0){
+            this->RestartTask();
+        }else{
+            ret++;
+        }
         gst_element_set_state(this->pipeline, GST_STATE_PLAYING);
     }
     // 暂停任务
@@ -222,6 +207,31 @@ namespace MIVA{
         InfoL << "Now Pause";
         gst_element_set_state(this->pipeline, GST_STATE_PAUSED);
     }
+    void Inference::StopTask()
+    {
+        int sourceId = 0;
+        std::vector<DataSource>::iterator iter;
+        for (iter = this->m_DataList.begin(); iter != this->m_DataList.end(); iter++){
+            if(iter->Play == true){
+                this->stop_release_source(sourceId);
+                iter->Play = false;
+                iter->source_bin = NULL;
+            }
+            sourceId++;
+        }
+    }
+    void Inference::RestartTask()
+    {
+        int sourceId = 0;
+        std::vector<DataSource>::iterator iter;
+        for (iter = this->m_DataList.begin(); iter != this->m_DataList.end(); iter++){
+            if(iter->Play == false){
+                this->add_sources(sourceId, iter->uri);
+                iter->Play = true;
+            }
+            sourceId++;
+        }
+    }
     // 销毁对象
     void Inference::Destory()
     {
@@ -342,89 +352,135 @@ namespace MIVA{
         return TRUE;
     }
 
-    void Inference::cb_newpad (GstElement * decodebin, GstPad * decoder_src_pad, gpointer data)
+    void Inference::decodebin_child_added (GstChildProxy * child_proxy, GObject * object,
+        gchar * name, gpointer user_data)
     {
-        InfoL << "In cb_newpad";
-        GstCaps *caps = gst_pad_get_current_caps (decoder_src_pad);
+
+        InfoL << "Decodebin child added: " << name;
+        if (g_strrstr (name, "decodebin") == name) {
+            g_signal_connect (G_OBJECT (object), "child-added",
+                G_CALLBACK (decodebin_child_added), user_data);
+        }
+    }
+
+    void Inference::stop_release_source (gint source_id)
+    {
+        GstStateChangeReturn state_return;
+        gchar pad_name[16];
+        GstPad *sinkpad = NULL;
+
+        state_return = gst_element_set_state (m_DataList[source_id].source_bin, GST_STATE_NULL);
+
+        switch (state_return) {
+            case GST_STATE_CHANGE_SUCCESS:
+                InfoL << "STATE CHANGE SUCCESS";
+                g_snprintf (pad_name, 15, "sink_%u", source_id);
+                sinkpad = gst_element_get_static_pad (streammux, pad_name);
+                gst_pad_send_event (sinkpad, gst_event_new_flush_stop (FALSE));
+                gst_element_release_request_pad (streammux, sinkpad);
+                InfoL << "STATE CHANGE SUCCESS:" << source_id;
+                gst_object_unref (sinkpad);
+                gst_bin_remove (GST_BIN (this->pipeline), m_DataList[source_id].source_bin);
+                break;
+            case GST_STATE_CHANGE_FAILURE:
+                ErrorL << "STATE CHANGE FAILURE";
+                break;
+            case GST_STATE_CHANGE_ASYNC:
+                InfoL << "STATE CHANGE ASYNC";
+                state_return =
+                    gst_element_get_state (m_DataList[source_id].source_bin, NULL, NULL,
+                    GST_CLOCK_TIME_NONE);
+                    g_snprintf (pad_name, 15, "sink_%u", source_id);
+                    sinkpad = gst_element_get_static_pad (streammux, pad_name);
+                    gst_pad_send_event (sinkpad, gst_event_new_flush_stop (FALSE));
+                    gst_element_release_request_pad (streammux, sinkpad);
+                    g_print ("STATE CHANGE ASYNC %p\n\n", sinkpad);
+                    gst_object_unref (sinkpad);
+                    gst_bin_remove (GST_BIN (this->pipeline), m_DataList[source_id].source_bin);
+                break;
+            case GST_STATE_CHANGE_NO_PREROLL:
+                InfoL << "STATE CHANGE NO PREROLL";
+                break;
+            default:
+                break;
+        }
+    }
+
+    void Inference::cb_newpad (GstElement * decodebin, GstPad * pad, gpointer data)
+    {
+        GstCaps *caps = gst_pad_query_caps (pad, NULL);
         const GstStructure *str = gst_caps_get_structure (caps, 0);
         const gchar *name = gst_structure_get_name (str);
-        GstElement *source_bin = (GstElement *) data;
-        GstCapsFeatures *features = gst_caps_get_features (caps, 0);
 
-        /* Need to check if the pad created by the decodebin is for video and not
-        * audio. */
+        g_print ("decodebin new pad %s\n", name);
         if (!strncmp (name, "video", 5)) {
-            /* Link the decodebin pad only if decodebin has picked nvidia
-            * decoder plugin nvdec_*. We do this by checking if the pad caps contain
-            * NVMM memory features. */
-            if (gst_caps_features_contains (features, GST_CAPS_FEATURES_NVMM)) {
-            /* Get the source bin ghost pad */
-            GstPad *bin_ghost_pad = gst_element_get_static_pad (source_bin, "src");
-            if (!gst_ghost_pad_set_target (GST_GHOST_PAD (bin_ghost_pad),
-                    decoder_src_pad)) {
-                ErrorL << "Failed to link decoder src pad to source bin ghost pad";
-            }
-            gst_object_unref (bin_ghost_pad);
+            gint source_id = (*(gint *) data);
+            gchar pad_name[16] = { 0 };
+            GstPad *sinkpad = NULL;
+            g_snprintf (pad_name, 15, "sink_%u", source_id);
+            sinkpad = gst_element_get_request_pad (g_streammux, pad_name);
+            if (gst_pad_link (pad, sinkpad) != GST_PAD_LINK_OK) {
+                g_print ("Failed to link decodebin to pipeline\n");
             } else {
-                ErrorL << "Error: Decodebin did not pick nvidia decoder plugin.";
+                g_print ("Decodebin linked to pipeline\n");
             }
+            gst_object_unref (sinkpad);
         }
     }
 
-    void Inference::decodebin_child_added (GstChildProxy * child_proxy, GObject * object,
-        gchar * name, gpointer user_data)
+    GstElement* Inference::create_uridecode_bin (guint index, gchar * filename)
     {
+        GstElement *bin = NULL;
+        gchar bin_name[16] = { };
 
-        InfoL << "Decodebin child added: " << name;
-        if (g_strrstr (name, "decodebin") == name) {
-            g_signal_connect (G_OBJECT (object), "child-added",
-                G_CALLBACK (decodebin_child_added), user_data);
-        }
+        g_print ("creating uridecodebin for [%s]\n", filename);
+        g_source_id_list[index] = index;
+        g_snprintf (bin_name, 15, "source-bin-%02d", index);
+        bin = gst_element_factory_make ("uridecodebin", bin_name);
+        g_object_set (G_OBJECT (bin), "uri", filename, NULL);
+        g_signal_connect (G_OBJECT (bin), "pad-added",
+            G_CALLBACK (cb_newpad), &g_source_id_list[index]);
+        g_signal_connect (G_OBJECT (bin), "child-added",
+            G_CALLBACK (decodebin_child_added), &g_source_id_list[index]);
+        g_source_enabled[index] = TRUE;
+
+        return bin;
     }
 
-    GstElement* Inference::create_source_bin(guint index, gchar * uri)
+    // 增加数据源
+    gboolean Inference::add_sources (int source_Id, std::string uri)
     {
-        GstElement *bin = NULL, *uri_decode_bin = NULL;
-        gchar bin_name[16] = { };
+        GstElement *source_bin;
+        GstStateChangeReturn state_return;
 
-        g_snprintf (bin_name, 15, "source-bin-%02d", index);
-        /* Create a source GstBin to abstract this bin's content from the rest of the
-        * pipeline */
-        bin = gst_bin_new (bin_name);
-
-        /* Source element for reading from the uri.
-        * We will use decodebin and let it figure out the container format of the
-        * stream and the codec and plug the appropriate demux and decode plugins. */
-        uri_decode_bin = gst_element_factory_make ("uridecodebin", "uri-decode-bin");
-
-        if (!bin || !uri_decode_bin) {
-            ErrorL << "One element in source bin could not be created.";
-            return NULL;
+        InfoL << "Calling Start " << source_Id;
+        source_bin = create_uridecode_bin (source_Id, (gchar *)uri.c_str());
+        if (!source_bin) {
+            ErrorL << "Failed to create source bin. Exiting.";
+            return -1;
         }
+        m_DataList[source_Id].source_bin = source_bin;
+        gst_bin_add (GST_BIN (this->pipeline), source_bin);
+        state_return = gst_element_set_state (source_bin, GST_STATE_PLAYING);
 
-        /* We set the input uri to the source element */
-        g_object_set (G_OBJECT (uri_decode_bin), "uri", uri, NULL);
-
-        /* Connect to the "pad-added" signal of the decodebin which generates a
-        * callback once a new pad for raw data has beed created by the decodebin */
-        g_signal_connect (G_OBJECT (uri_decode_bin), "pad-added",
-        G_CALLBACK (cb_newpad), bin);
-        g_signal_connect (G_OBJECT (uri_decode_bin), "child-added",
-        G_CALLBACK (decodebin_child_added), bin);
-
-        gst_bin_add (GST_BIN (bin), uri_decode_bin);
-
-        /* We need to create a ghost pad for the source bin which will act as a proxy
-        * for the video decoder src pad. The ghost pad will not have a target right
-        * now. Once the decode bin creates the video decoder and generates the
-        * cb_newpad callback, we will set the ghost pad target to the video decoder
-        * src pad. */
-        if (!gst_element_add_pad (bin, gst_ghost_pad_new_no_target ("src",
-                GST_PAD_SRC))) {
-            ErrorL << "Failed to add ghost pad in source bin";
-            return NULL;
+        switch (state_return) {
+            case GST_STATE_CHANGE_SUCCESS:
+                InfoL << "STATE CHANGE SUCCESS.";
+                break;
+            case GST_STATE_CHANGE_FAILURE:
+                InfoL << "STATE CHANGE FAILURE";
+                break;
+            case GST_STATE_CHANGE_ASYNC:
+                InfoL << "STATE CHANGE ASYNC";
+                state_return = gst_element_get_state (source_bin, NULL, NULL,GST_CLOCK_TIME_NONE);
+                break;
+            case GST_STATE_CHANGE_NO_PREROLL:
+                InfoL << "STATE CHANGE NO PREROLL";
+                break;
+            default:
+                break;
         }
-        return bin;
+        return TRUE;
     }
 }
 

+ 5 - 4
modules/userApp/src/user_app.cpp

@@ -11,7 +11,7 @@ namespace MIVA
     UserApp::UserApp(){
         
     }
-    
+
     UserApp::~UserApp(){
         Destroy();
     }
@@ -169,8 +169,9 @@ namespace MIVA
     void UserApp::ListenInfer(int Source_id, int num)
     {   
         if(this->play == true){
-            this->m_timer2 = std::make_shared<Timer>(3.0f,[&](){
-                this->m_Infer->PauseTask();
+            this->m_timer2 = std::make_shared<Timer>(5.0f,[&](){
+                this->m_Infer->StopTask();
+                // this->m_Infer->PauseTask();
                 // 推理结束发布InferData事件
                 NoticeCenter::Instance().emitEvent(NOTICE_INFERDATA);
                 return false;
@@ -192,7 +193,7 @@ namespace MIVA
         if(this->m_tcpClient == NULL) this->m_tcpClient = TCPClient::Ptr(new TCPClient());
         this->m_tcpClient->startConnect(this->Netty_ip, this->Netty_port);
     }
-
+    
     // 上报识别结果
     void UserApp::ReportData()
     {

BIN
source/bin/main