Explorar el Código

加速动态资源的释放,优化其管理

lishengyin hace 3 años
padre
commit
f733c38f85

BIN
lib/libmodules.so


+ 2 - 0
modules/inference/include/inference.h

@@ -27,6 +27,7 @@
 #include "Network/TcpClient.h"
 #include "Poller/Timer.h"
 #include "TCPClient.h"
+#include <mutex>
 
 #include <list>
 
@@ -79,6 +80,7 @@ namespace MIVA{
         // 动态管理源
         gboolean add_sources (int source_Id, std::string uri);
         void stop_release_source (gint source_id);
+        void stop_release_source1();
     };
 }
 

+ 62 - 9
modules/inference/src/inference.cpp

@@ -46,6 +46,7 @@ GstElement *g_streammux = NULL;
 
 namespace MIVA{
     std::shared_ptr<Inference> infer = NULL;
+    ThreadPool pool(6,ThreadPool::PRIORITY_HIGHEST, false);
     std::shared_ptr<Inference> Inference::CreateNew()
     {
         if(infer == NULL) infer = std::make_shared<Inference>();
@@ -207,18 +208,21 @@ namespace MIVA{
         InfoL << "Now Pause";
         gst_element_set_state(this->pipeline, GST_STATE_PAUSED);
     }
+    
     void Inference::StopTask()
     {
-        int sourceId = 0;
-        std::vector<DataSource>::iterator iter;
-        for (iter = this->m_DataList.begin(); iter != this->m_DataList.end(); iter++){
-            if(iter->Play == true){
-                this->stop_release_source(sourceId);
-                iter->Play = false;
-                iter->source_bin = NULL;
+        pool.async([&](){
+            int sourceId = 0;
+            std::vector<DataSource>::iterator iter;
+            for (iter = this->m_DataList.begin(); iter != this->m_DataList.end(); iter++){
+                if(iter->Play){
+                    this->stop_release_source(sourceId);
+                    sourceId++;
+                    iter->Play = false;
+                }
             }
-            sourceId++;
-        }
+        });
+        pool.start();
         WarnL << "释放资源成功";
     }
     void Inference::RestartTask()
@@ -371,7 +375,54 @@ namespace MIVA{
         GstPad *sinkpad = NULL;
 
         state_return = gst_element_set_state (m_DataList[source_id].source_bin, GST_STATE_NULL);
+        switch (state_return) {
+            case GST_STATE_CHANGE_SUCCESS:
+
+                InfoL << "STATE CHANGE SUCCESS";
+                g_snprintf (pad_name, 15, "sink_%u", source_id);
+                sinkpad = gst_element_get_static_pad (streammux, pad_name);
+                gst_pad_send_event (sinkpad, gst_event_new_flush_stop (FALSE));
+                gst_element_release_request_pad (streammux, sinkpad);
+                InfoL << "STATE CHANGE SUCCESS:" << source_id;
+                gst_object_unref (sinkpad);
+                gst_bin_remove (GST_BIN (this->pipeline), m_DataList[source_id].source_bin);
+                break;
+            case GST_STATE_CHANGE_FAILURE:
+                ErrorL << "STATE CHANGE FAILURE";
+                break;
+            case GST_STATE_CHANGE_ASYNC:
+                InfoL << "STATE CHANGE ASYNC";
+                state_return =
+                    gst_element_get_state (m_DataList[source_id].source_bin, NULL, NULL,
+                    GST_CLOCK_TIME_NONE);
+                    g_snprintf (pad_name, 15, "sink_%u", source_id);
+                    sinkpad = gst_element_get_static_pad (streammux, pad_name);
+                    gst_pad_send_event (sinkpad, gst_event_new_flush_stop (FALSE));
+                    gst_element_release_request_pad (streammux, sinkpad);
+                    g_print ("STATE CHANGE ASYNC %p\n\n", sinkpad);
+                    gst_object_unref (sinkpad);
+                    gst_bin_remove (GST_BIN (this->pipeline), m_DataList[source_id].source_bin);
+                break;
+            case GST_STATE_CHANGE_NO_PREROLL:
+                InfoL << "STATE CHANGE NO PREROLL";
+                break;
+            default:
+                break;
+        }
+    }
+
+    void Inference::stop_release_source1()
+    {
+        static int source_id = 0;
+        if(source_id == m_DataList.size()){
+            source_id = 0;
+        }
+
+        GstStateChangeReturn state_return;
+        gchar pad_name[16];
+        GstPad *sinkpad = NULL;
 
+        state_return = gst_element_set_state (m_DataList[source_id].source_bin, GST_STATE_NULL);
         switch (state_return) {
             case GST_STATE_CHANGE_SUCCESS:
                 InfoL << "STATE CHANGE SUCCESS";
@@ -382,6 +433,7 @@ namespace MIVA{
                 InfoL << "STATE CHANGE SUCCESS:" << source_id;
                 gst_object_unref (sinkpad);
                 gst_bin_remove (GST_BIN (this->pipeline), m_DataList[source_id].source_bin);
+                m_DataList[source_id].source_bin = NULL;
                 break;
             case GST_STATE_CHANGE_FAILURE:
                 ErrorL << "STATE CHANGE FAILURE";
@@ -405,6 +457,7 @@ namespace MIVA{
             default:
                 break;
         }
+        source_id++;
     }
 
     void Inference::cb_newpad (GstElement * decodebin, GstPad * pad, gpointer data)

BIN
source/bin/main