Browse Source

- 加入AuditPlugin组件, 实现各个模块的定时自检

Your Name 2 years ago
parent
commit
dacfdc2437

BIN
lib/libgsd_modules.so


BIN
lib/libgsd_plugins.so


+ 1 - 3
modules/Monitor/src/kafka_comsumer.cpp

@@ -92,12 +92,11 @@ bool kafka_consumer_client::initClient(){
     if (resp != RdKafka::ERR_NO_ERROR){
         ErrorL << "failed to start consumer : " << RdKafka::err2str(resp).c_str() << endl;
     }
-
+    
     return true;
 }
  
 void kafka_consumer_client::consumer(RdKafka::Message *message, void *opt){
-    
     switch(message->err()){
         case RdKafka::ERR__TIMED_OUT:
             this->alive = false;
@@ -131,7 +130,6 @@ bool kafka_consumer_client::consume(int timeout_ms){
         kafka_consumer_->poll(0);
         delete msg;
     }
- 
     kafka_consumer_->stop(topic_, partition_);
     if(topic_){
         delete topic_;

+ 1 - 1
plugins/AuditPlugin/src/AuditPlugin.cpp

@@ -26,7 +26,7 @@ namespace gsd
      * @return {*}
      */    
     bool AuditPlugin::StartTask(){
-        if(this->timer == nullptr) this->timer = std::make_shared<toolkit::Timer>(60.0f, [&](){
+        if(this->timer == nullptr) this->timer = std::make_shared<toolkit::Timer>(10.0f, [&](){
             this->PluginTaskCheck();
             return true;
         }, nullptr);

+ 26 - 8
plugins/MonitorPlugin/src/MonitorPlugin.cpp

@@ -2,6 +2,8 @@
 
 namespace gsd
 {
+    static Socket::Ptr sockRecv = Socket::createSocket();
+
     /**
      * @description: CreateNew
      * @return {*}
@@ -18,11 +20,17 @@ namespace gsd
      */        
     bool MonitorPlugin::Init(){
         if(this->monitor == nullptr) this->monitor = Monitor::CreateNew("localhost", "CnstreamData_0", "cnstream-group");
+        if(this->pool == nullptr) this->pool = std::make_shared<ThreadPool>(1,ThreadPool::PRIORITY_HIGHEST, false);
+        if(sockRecv->listen(9092)){
+            this->PluginAlive = false;
+            sockRecv->closeSock();
+            return true;
+        }
         if(!monitor->Init()) {
             ErrorL << "Monitor's init failed" << endl;
             return false;
         }
-        if(this->pool == nullptr) this->pool = std::make_shared<ThreadPool>(1,ThreadPool::PRIORITY_HIGHEST, false);
+        this->PluginAlive = true;
         return true;
     }
 
@@ -34,7 +42,7 @@ namespace gsd
         if(this->pool == nullptr) return false;
         this->pool->async([&](){
             while(!this->stop_){
-                this->MonitorProThrd();
+                if(this->PluginAlive) this->MonitorProThrd();
             }
         });
         this->pool->start();
@@ -96,7 +104,7 @@ namespace gsd
         nettyClientResultMsg.objectToJson(json);
         json += "\r\n";
         std::string sql = result->ObjectToSql();
-
+ 
         if(TcpPlugin::getPtr()->Alive()){
             TcpPlugin::getPtr()->sendRequest(RequestId, NettyClientCommandEnum().bird_info, json, [&, sql](int status,std::string pbuf){
                 #ifndef DEBUG
@@ -116,10 +124,8 @@ namespace gsd
      */
     bool MonitorPlugin::ConsumeData(FrameInferData::Ptr& result){
         std::shared_ptr<CNStreamInferData> cnstreamInferData = std::make_shared<CNStreamInferData>();
+        if(this->monitor == nullptr) return false;
         auto result_data = monitor->ConsumeData(cnstreamInferData);
-        if(result_data.first == false && result_data.second == "ERR__TIMED_OUT"){
-            this->PluginAlive = false;
-        }
         if(result_data.first){
             // 纠察器
             if(config::getPtr()->InferChecker){
@@ -213,7 +219,7 @@ namespace gsd
         InfoL;
         stop_ = true;
     }
-
+    
     /**
      * @description: 
      * @return {*}
@@ -227,7 +233,19 @@ namespace gsd
      * @return {*}
      */    
     bool MonitorPlugin::RestPlugin(){
+        // 判断端口是否被占用
+        if(sockRecv->listen(9092)){
+            sockRecv->closeSock();
+            return false;
+        }
+        if(this->monitor == nullptr) this->monitor = Monitor::CreateNew("localhost", "CnstreamData_0", "cnstream-group");
+        if(this->monitor != nullptr){
+            if(!this->monitor->Init()){
+                ErrorL << "Monitor's init failed" << endl;
+                return false;
+            }
+            this->PluginAlive = true;
+        } 
         return true;
     }
-
 } // namespace gsd