@@ -0,0 +1,977 @@
+package com.sunwin.metro;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.google.common.collect.Lists;
+import com.sunwin.metro.bean.*;
+import com.sunwin.metro.constant.Descriptors;
+import com.sunwin.metro.drools.MetroProcessor;
+import com.sunwin.metro.drools.NormalRuleProcess;
+import com.sunwin.metro.governance.bean.Expression;
+import com.sunwin.metro.governance.bean.Integrity;
+import com.sunwin.metro.governance.bean.Timeliness;
+import com.sunwin.metro.governance.bean.Uniformity;
+import com.sunwin.metro.process.BroadcastMeterMes;
+import com.sunwin.metro.process.ErrorMetersProcess;
+import com.sunwin.metro.process.PrometheusProcess;
+import com.sunwin.metro.source.CreateMetroMes;
+import com.sunwin.metro.source.MetroSubSource;
+
+import com.sunwin.metro.utils.*;
+import lombok.SneakyThrows;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.*;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.*;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.stringtemplate.v4.ST;
+import org.stringtemplate.v4.STGroup;
+import org.stringtemplate.v4.STGroupFile;
+import pro.husk.mysql.MySQL;
+
+
+import java.math.BigDecimal;
+import java.sql.*;
+import java.text.DecimalFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+
+/**
|
|
|
+ * @author xuYJ
|
|
|
+ * @description: Program entry point. Devices covered: section tunnel fans BAS_TVF (environment and equipment monitoring system), heat-exhaust fans BAS_TEF (environment and equipment monitoring system), uninterruptible power supplies ups (UPS system), UPS inspection instruments UPS_XJY (UPS system), rainwater pumps BAS_AT (environment and equipment monitoring system)
|
|
|
+ * @create: 2020-08-26 14:10
|
|
|
+ */
|
|
|
+public class StartRule {
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(StartRule.class);
|
|
|
+
|
|
|
+ public static void main(String[] args) throws Exception {
|
|
|
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
|
+ // Restart strategy: fixed-delay restart, up to 10 attempts with a 5-second delay (the restart strategy can be disabled while testing)
|
|
|
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, org.apache.flink.api.common.time.Time.of(5, TimeUnit.SECONDS)));
|
|
|
+ // env.setStateBackend(new RocksDBStateBackend("hdfs://192.168.20.63:8020/flink/flink-checkpoint/rule/", true).getCheckpointBackend());
|
|
|
+ env.setStateBackend(new EmbeddedRocksDBStateBackend());
|
|
|
+ env.getCheckpointConfig().setCheckpointStorage("hdfs://192.168.20.63:8020/flink/flink-checkpoint/rules/");
|
|
|
+ // Trigger a checkpoint every minute
|
|
|
+ env.enableCheckpointing(10000 * 6);
|
|
|
+ env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000 * 3);
|
|
|
+ env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
|
|
|
+ env.getCheckpointConfig().setCheckpointTimeout(60000 * 10);
|
|
|
+ env.getCheckpointConfig().setTolerableCheckpointFailureNumber(100);
|
|
|
+ env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
|
|
|
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
|
|
|
+ EnvironmentSettings build = EnvironmentSettings.newInstance()
|
|
|
+ .useBlinkPlanner()
|
|
|
+ .inStreamingMode()
|
|
|
+ .build();
|
|
|
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, build);
|
|
|
+ // Build the broadcast stream
|
|
|
+ DataStreamSource<Map<String, List<String>>> addSource = env
|
|
|
+ .addSource(new MetroSubSource());
|
|
|
+ // Used to remove stale targets from Prometheus
|
|
|
+ addSource.keyBy(new MeterKey()).process(new DeletePush()).print();
|
|
|
+ BroadcastStream<Map<String, List<String>>> broadcast = addSource
|
|
|
+ .setParallelism(1).broadcast(Descriptors.DESCRIPTOR);
|
|
|
+ DataStreamSource<String> source = env.addSource(new CreateMetroMes());
|
|
|
+ SingleOutputStreamOperator<Tuple2<String, String>> connect = source
|
|
|
+ .connect(broadcast)
|
|
|
+ .process(new BroadcastMeterMes());
|
|
|
+ SingleOutputStreamOperator<Tuple2<String, String>> process = connect.process(new ErrorMetersProcess(Descriptors.ERROR_METER));
|
|
|
+ connect.filter(new FilterFunction<Tuple2<String, String>>() {
|
|
|
+ @Override
|
|
|
+ public boolean filter(Tuple2<String, String> value) throws Exception {
|
|
|
+ return !(value.f1.contains("true"));
|
|
|
+ }
|
|
|
+ })
|
|
|
+ .keyBy((KeySelector<Tuple2<String, String>, String>) tuple2 -> tuple2.f0)
|
|
|
+ // Send the cleaned-up data to Prometheus
|
|
|
+ .process(new PrometheusProcess()).print();
|
|
|
+
|
|
|
+ //DataStream<Tuple2<String, String>> sideOutput = process.getSideOutput(Descriptors.ERROR_METER);
|
|
|
+ //SingleOutputStreamOperator<Tuple2<String, String>> filter = process.filter(new FilterFunction<Tuple2<String, String>>() {
|
|
|
+ // @Override
|
|
|
+ // public boolean filter(Tuple2<String, String> value) throws Exception {
|
|
|
+ // return "ups巡检仪".equals(value.f0);
|
|
|
+ // }
|
|
|
+ //});
|
|
|
+ //filter.addSink(org.apache.flink.connector.jdbc.JdbcSink.sink(
|
|
|
+ // "insert ignore ups_xjy_rule (name,create_time , zdl , zdy , wd ) values (?,?,?,?,?)",
|
|
|
+ // (ps, t) -> {
|
|
|
+ // UpsXjyMes upsXjyMes = JSONObject.parseObject(t.f1, UpsXjyMes.class);
|
|
|
+ // ps.setString(1, upsXjyMes.getNum() + "_" + upsXjyMes.getStation() + "_" + upsXjyMes.getName());
|
|
|
+ // ps.setString(2, upsXjyMes.getTime());
|
|
|
+ // ps.setFloat(3, BigDecimal.valueOf(upsXjyMes.getZdl()).floatValue());
|
|
|
+ // ps.setFloat(4, BigDecimal.valueOf(upsXjyMes.getZdy()).floatValue());
|
|
|
+ // ps.setFloat(5, BigDecimal.valueOf(upsXjyMes.getWd()).floatValue());
|
|
|
+ //
|
|
|
+ // },
|
|
|
+ // new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
|
|
|
+ // .withUrl("jdbc:mysql://192.168.20.61:3306/rule-engine?characterEncoding=utf-8&serverTimezone=UTC")
|
|
|
+ // .withDriverName("com.mysql.jdbc.Driver")
|
|
|
+ // .withPassword("root")
|
|
|
+ // .withUsername("root")
|
|
|
+ // .build()));
|
|
|
+
|
|
|
+ //SingleOutputStreamOperator<Tuple2<String, String>> filter2 = process.filter(new FilterFunction<Tuple2<String, String>>() {
|
|
|
+ // @Override
|
|
|
+ // public boolean filter(Tuple2<String, String> value) throws Exception {
|
|
|
+ // return "排热风机".equals(value.f0);
|
|
|
+ // }
|
|
|
+ //});
|
|
|
+ //filter2.addSink(org.apache.flink.connector.jdbc.JdbcSink.sink(
|
|
|
+ // "insert ignore bas_tvf_rule (name,create_time , x_vibr , y_vibr , faxlet ) values (?,?,?,?,?)",
|
|
|
+ // (ps, t) -> {
|
|
|
+ // BasTvfMes basTvfMes = JSONObject.parseObject(t.f1, BasTvfMes.class);
|
|
|
+ // ps.setString(1, basTvfMes.getNum() + "_" + basTvfMes.getStation() + "_" + basTvfMes.getName());
|
|
|
+ // ps.setString(2, basTvfMes.getTime());
|
|
|
+ // ps.setFloat(3, BigDecimal.valueOf(basTvfMes.getAi_x_vibr()).floatValue());
|
|
|
+ // ps.setFloat(4, BigDecimal.valueOf(basTvfMes.getAi_y_vibr()).floatValue());
|
|
|
+ // ps.setFloat(5, BigDecimal.valueOf(basTvfMes.getAi_faxlet()).floatValue());
|
|
|
+ //
|
|
|
+ // },
|
|
|
+ // new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
|
|
|
+ // .withUrl("jdbc:mysql://192.168.20.61:3306/rule-engine?characterEncoding=utf-8&serverTimezone=UTC")
|
|
|
+ // .withDriverName("com.mysql.jdbc.Driver")
|
|
|
+ // .withPassword("root")
|
|
|
+ // .withUsername("root")
|
|
|
+ // .build()));
|
|
|
+
|
|
|
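+ // Change-data-capture source: stream governance rule rows from the MySQL table rule-engine.mes_governance via the mysql-cdc connector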
+ String source1 =
|
|
|
+ "CREATE TABLE mes_governance (" +
|
|
|
+ " meter_type STRING ," +
|
|
|
+ " meter_mes STRING " +
|
|
|
+ ") WITH (" +
|
|
|
+ "'connector' = 'mysql-cdc'," +
|
|
|
+ "'hostname' = '192.168.20.61'," +
|
|
|
+ "'port' = '3306'," +
|
|
|
+ "'username' = 'root'," +
|
|
|
+ "'password' = 'root'," +
|
|
|
+ "'database-name' = 'rule-engine'," +
|
|
|
+ "'table-name' = 'mes_governance'" +
|
|
|
+ ")";
|
|
|
+
|
|
|
+ tableEnv.executeSql(source1);
|
|
|
+ Table testSink = tableEnv.from("mes_governance");
|
|
|
+ DataStream<Tuple2<Boolean, Row>> stream = tableEnv.toRetractStream(testSink, Row.class);
|
|
|
+ SingleOutputStreamOperator<Tuple2<String, String>> map = stream.filter((FilterFunction<Tuple2<Boolean, Row>>) value -> value.f0)
|
|
|
+ .map(new MapFunction<Tuple2<Boolean, Row>, Tuple2<String, String>>() {
|
|
|
+ @Override
|
|
|
+ public Tuple2<String, String> map(Tuple2<Boolean, Row> value) throws Exception {
|
|
|
+ return new Tuple2<>(Objects.requireNonNull(value.f1.getField("meter_type")).toString(), Objects.requireNonNull(value.f1.getField("meter_mes")).toString());
|
|
|
+ }
|
|
|
+ });
|
|
|
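+ // Merge the device message stream with the records read from mes_governance before windowing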
+ DataStream<Tuple2<String, String>> union = process.union(map);
|
|
|
+ long window = 60 * 15;
|
|
|
+ //long window = 60;
|
|
|
+ // Specifically to support the collection-timeliness check: attach the processing-time arrival timestamp to each record
|
|
|
+ SingleOutputStreamOperator<List<Tuple4<String, String, Long, String>>> process3 = union.map(new MapFunction<Tuple2<String, String>, Tuple4<String, String, Long, String>>() {
|
|
|
+ @Override
|
|
|
+ public Tuple4<String, String, Long, String> map(Tuple2<String, String> value) throws Exception {
|
|
|
+ String appType = JSONObject.parseObject(value.f1).getString("appType");
|
|
|
+ return new Tuple4<>(value.f0 + "_" + appType, value.f1, System.currentTimeMillis(), appType);
|
|
|
+ }
|
|
|
+ }).keyBy(new KeySelector<Tuple4<String, String, Long, String>, String>() {
|
|
|
+ @Override
|
|
|
+ public String getKey(Tuple4<String, String, Long, String> value) throws Exception {
|
|
|
+ return value.f0;
|
|
|
+ }
|
|
|
+ }).window(TumblingProcessingTimeWindows.of(Time.seconds(window))).process(new ProcessWindowFunction<Tuple4<String, String, Long, String>, List<Tuple4<String, String, Long, String>>, String, TimeWindow>() {
|
|
|
+ @Override
|
|
|
+ public void process(String s, Context context, Iterable<Tuple4<String, String, Long, String>> elements, Collector<List<Tuple4<String, String, Long, String>>> out) throws Exception {
|
|
|
+ ArrayList<Tuple4<String, String, Long, String>> arrayList = Lists.newArrayList(elements);
|
|
|
+ // Deduplicate within the window
|
|
|
+ List<Tuple4<String, String, Long, String>> newList = new ArrayList<>();
|
|
|
+ arrayList.stream().filter(distinctByKey(p -> p.f1))
|
|
|
+ .forEach(newList::add);
|
|
|
+ out.collect(newList);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
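+ // Poll the t_data_rule table every 5 seconds and emit each rule row as JSON keyed under "1组"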
+ DataStreamSource<Tuple2<String, JSONObject>> source2 = env.addSource(new RichSourceFunction<Tuple2<String, JSONObject>>() {
|
|
|
+ Connection connection;
|
|
|
+ PreparedStatement pStatement;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void open(Configuration parameters) throws Exception {
|
|
|
+ super.open(parameters);
|
|
|
+ connection = MysqlUtil.getCon();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws Exception {
|
|
|
+ super.close();
|
|
|
+ if (pStatement != null) {
|
|
|
+ pStatement.close();
|
|
|
+ }
|
|
|
+ if (connection != null) {
|
|
|
+ connection.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run(SourceContext<Tuple2<String, JSONObject>> ctx) throws Exception {
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ pStatement = connection.prepareStatement("SELECT * FROM t_data_rule;");
|
|
|
+ ResultSet results = pStatement.executeQuery();
|
|
|
+ while (results.next()) {
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+ String ruleId = String.valueOf(results.getInt("id"));
|
|
|
+ jsonObject.put("rule_id", ruleId);
|
|
|
+ jsonObject.put("model", results.getString("target_name"));
|
|
|
+ jsonObject.put("type", getMeterName(results.getString("equipment_type_name")));
|
|
|
+ jsonObject.put("appType", String.valueOf(results.getInt("application_id")));
|
|
|
+ jsonObject.put("mes", results.getString("expr"));
|
|
|
+ jsonObject.put("flag", results.getInt("flag"));
|
|
|
+ ctx.collect(new Tuple2<>("1组", jsonObject));
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ System.out.println("连接postgres规则库异常");
|
|
|
+ e.printStackTrace();
|
|
|
+ connection = MysqlUtil.getCon2();
|
|
|
+ }
|
|
|
+ Thread.sleep(5000L);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @SneakyThrows
|
|
|
+ @Override
|
|
|
+ public void cancel() {
|
|
|
+ if (pStatement != null) {
|
|
|
+ pStatement.close();
|
|
|
+ }
|
|
|
+ if (connection != null) {
|
|
|
+ connection.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ // Side-output the scanned rules by category (timeliness, integrity, uniformity, accuracy)
|
|
|
+ SingleOutputStreamOperator<String> outputStreamOperator1 = source2.keyBy(new KeySelector<Tuple2<String, JSONObject>, String>() {
|
|
|
+ @Override
|
|
|
+ public String getKey(Tuple2<String, JSONObject> value) throws Exception {
|
|
|
+ return value.f0;
|
|
|
+ }
|
|
|
+ }).process(new KeyedProcessFunction<String, Tuple2<String, JSONObject>, String>() {
|
|
|
+ private transient MapState<String, String> mapState;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void open(Configuration parameters) throws Exception {
|
|
|
+ super.open(parameters);
|
|
|
+ MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>("mapState", TypeInformation.of(String.class), TypeInformation.of(String.class));
|
|
|
+ mapState = getRuntimeContext().getMapState(descriptor);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws Exception {
|
|
|
+ super.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void processElement(Tuple2<String, JSONObject> value, Context ctx, Collector<String> out) throws Exception {
|
|
|
+ JSONObject object = value.f1;
|
|
|
+ String ruleId = object.getString("rule_id");
|
|
|
+ String model = object.getString("model");
|
|
|
+ boolean flag = "1".equals(object.getString("flag"));
|
|
|
+ String type = object.getString("type");
|
|
|
+ String appType = object.getString("appType");
|
|
|
+
|
|
|
+ if (mapState.isEmpty()) {
|
|
|
+ putMapState(value, ctx, object, ruleId, model, flag, type, appType);
|
|
|
+ mapState.put(ruleId, Md5Util.getMd5(value.f1.toString()));
|
|
|
+ } else {
|
|
|
+ ArrayList<String> arrayList = new ArrayList<>();
|
|
|
+ mapState.keys().forEach(arrayList::add);
|
|
|
+ if (!arrayList.contains(ruleId)) {
|
|
|
+ putMapState(value, ctx, object, ruleId, model, flag, type, appType);
|
|
|
+ } else {
|
|
|
+ String s = mapState.get(ruleId);
|
|
|
+ if (!s.equals(Md5Util.getMd5(object.toString()))) {
|
|
|
+ putMapState(value, ctx, object, ruleId, model, flag, type, appType);
|
|
|
+ } else {
|
|
|
+ System.out.println("扫描到一样的规则");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void putMapState(Tuple2<String, JSONObject> value, KeyedProcessFunction<String, Tuple2<String, JSONObject>, String>.Context ctx, JSONObject object, String ruleId, String model, boolean flag, String type, String appType) throws Exception {
|
|
|
+ if ("及时性(数据采集)".equals(model)) {
|
|
|
+ JSONObject jsonObject = JSONObject.parseObject(object.getString("mes"));
|
|
|
+ int inTime = jsonObject.getInteger("in");
|
|
|
+ ctx.output(Descriptors.IN_TIME_RULE_1, new Timeliness(ruleId, type, inTime, flag, appType));
|
|
|
+ } else if ("及时性(数据入库)".equals(model)) {
|
|
|
+ JSONObject jsonObject = JSONObject.parseObject(object.getString("mes"));
|
|
|
+ int outTime = jsonObject.getInteger("out");
|
|
|
+ ctx.output(Descriptors.OUT_TIME_RULE_1, new Timeliness(ruleId, type, outTime, flag, appType));
|
|
|
+ } else if ("完整性".equals(model)) {
|
|
|
+ List<String> mes = JSONArray.parseArray(object.getString("mes"), String.class);
|
|
|
+ ctx.output(Descriptors.COMPLETE_RULE_1, new Integrity(ruleId, type, mes, flag, appType));
|
|
|
+ } else if ("一致性".equals(model)) {
|
|
|
+ List<JSONObject> mes = JSONArray.parseArray(object.getString("mes"), JSONObject.class);
|
|
|
+ ctx.output(Descriptors.UNIFORMITY_RULE_1, new Uniformity(ruleId, type, mes, flag, appType));
|
|
|
+ } else if ("准确性".equals(model)) {
|
|
|
+ ctx.output(Descriptors.CURRENT_RULE_1, value.f1);
|
|
|
+ }
|
|
|
+ mapState.put(ruleId, Md5Util.getMd5(value.f1.toString()));
|
|
|
+ System.out.println("成功加入缓存");
|
|
|
+ }
|
|
|
+ }).setParallelism(1);
|
|
|
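+ // Broadcast each rule category so the downstream quality-check operators always see the latest rules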
+ BroadcastStream<Timeliness> broadcast2 = outputStreamOperator1.getSideOutput(Descriptors.IN_TIME_RULE_1).broadcast(Descriptors.IN_TIME_RULE);
|
|
|
+ BroadcastStream<Timeliness> broadcast3 = outputStreamOperator1.getSideOutput(Descriptors.OUT_TIME_RULE_1).broadcast(Descriptors.OUT_TIME_RULE);
|
|
|
+ BroadcastStream<Integrity> broadcastStream = outputStreamOperator1.getSideOutput(Descriptors.COMPLETE_RULE_1).broadcast(Descriptors.COMPLETE_RULE);
|
|
|
+ BroadcastStream<Uniformity> uniformityBroadcastStream = outputStreamOperator1.getSideOutput(Descriptors.UNIFORMITY_RULE_1).broadcast(Descriptors.UNIFORMITY_RULE);
|
|
|
+ DataStream<JSONObject> current = outputStreamOperator1.getSideOutput(Descriptors.CURRENT_RULE_1);
|
|
|
+ // Check timeliness of data collection
|
|
|
+ SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> result2 = process3.connect(broadcast2).process(new BroadcastProcessFunction<List<Tuple4<String, String, Long, String>>, Timeliness, Tuple5<String, String, Integer, String, String>>() {
|
|
|
+ @Override
|
|
|
+ public void processElement(List<Tuple4<String, String, Long, String>> value, ReadOnlyContext ctx, Collector<Tuple5<String, String, Integer, String, String>> out) throws Exception {
|
|
|
+ ReadOnlyBroadcastState<String, List<Timeliness>> broadcastState = ctx.getBroadcastState(Descriptors.IN_TIME_RULE);
|
|
|
+ JSONArray inArray = new JSONArray();
|
|
|
+ int countAll = value.size();
|
|
|
+ for (Tuple4<String, String, Long, String> tuple2 : value) {
|
|
|
+ String[] s = tuple2.f0.split("_");
|
|
|
+ if (broadcastState.contains(getMeterName(s[0]))) {
|
|
|
+ List<Timeliness> timelinessList = broadcastState.get(getMeterName(s[0]));
|
|
|
+ JSONObject inJson = JSONObject.parseObject(tuple2.f1);
|
|
|
+ for (Timeliness item : timelinessList) {
|
|
|
+ if (item.getAppType().equals(tuple2.f3)) {
|
|
|
+ if (tuple2.f2 - inJson.getLong("eventTime") > item.getCollectTime()) {
|
|
|
+ if (inJson.containsKey("ruleId")) {
|
|
|
+ String ruleId = inJson.getString("ruleId");
|
|
|
+ inJson.put("ruleId", ruleId + "_" + item.getRuleId());
|
|
|
+ } else {
|
|
|
+ inJson.put("ruleId", item.getRuleId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (inJson.containsKey("ruleId")) {
|
|
|
+ inArray.add(inJson);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // Output the compliance rate plus the set of abnormal records
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+ jsonObject.put("记录及时性(数据采集)", getPercent(countAll - inArray.size(), countAll));
|
|
|
+ if (inArray.size() > 0) {
|
|
|
+ out.collect(new Tuple5<>(value.get(0).f0, jsonObject.toString(), countAll, inArray.toString(), value.get(0).f3));
|
|
|
+ } else {
|
|
|
+ out.collect(new Tuple5<>(value.get(0).f0, jsonObject.toString(), countAll, null, value.get(0).f3));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void processBroadcastElement(Timeliness value, Context ctx, Collector<Tuple5<String, String, Integer, String, String>> out) throws Exception {
|
|
|
+ BroadcastState<String, List<Timeliness>> ctxBroadcastState = ctx.getBroadcastState(Descriptors.IN_TIME_RULE);
|
|
|
+ if (value.isFlag()) {
|
|
|
+ if (ctxBroadcastState.contains(value.getType())) {
|
|
|
+ List<Timeliness> timelines = ctxBroadcastState.get(value.getType());
|
|
|
+ timelines.removeIf(item -> item.getRuleId().equals(value.getRuleId()));
|
|
|
+ timelines.add(value);
|
|
|
+ ctxBroadcastState.put(value.getType(), timelines);
|
|
|
+ } else {
|
|
|
+ ArrayList<Timeliness> arrayList = new ArrayList<>();
|
|
|
+ arrayList.add(value);
|
|
|
+ ctxBroadcastState.put(value.getType(), arrayList);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (ctxBroadcastState.contains(value.getType())) {
|
|
|
+ List<Timeliness> timelines = ctxBroadcastState.get(value.getType());
|
|
|
+ timelines.removeIf(timeliness1 -> timeliness1.toString().equals(value.toString()));
|
|
|
+ ctxBroadcastState.put(value.getType(), timelines);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // Check timeliness of data storage (output)
|
|
|
+ SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> result = process3.connect(broadcast3).process(new BroadcastProcessFunction<List<Tuple4<String, String, Long, String>>, Timeliness, Tuple5<String, String, Integer, String, String>>() {
|
|
|
+ @Override
|
|
|
+ public void processElement(List<Tuple4<String, String, Long, String>> value, ReadOnlyContext ctx, Collector<Tuple5<String, String, Integer, String, String>> out) throws Exception {
|
|
|
+ ReadOnlyBroadcastState<String, List<Timeliness>> broadcastState = ctx.getBroadcastState(Descriptors.OUT_TIME_RULE);
|
|
|
+ Long timestamp = ctx.timestamp();
|
|
|
+ JSONArray outArray = new JSONArray();
|
|
|
+ int countAll = value.size();
|
|
|
+ for (Tuple4<String, String, Long, String> tuple2 : value) {
|
|
|
+ String[] s = tuple2.f0.split("_");
|
|
|
+ if (broadcastState.contains(getMeterName(s[0]))) {
|
|
|
+ List<Timeliness> timelinessList = broadcastState.get(getMeterName(s[0]));
|
|
|
+ JSONObject outJson = JSONObject.parseObject(tuple2.f1);
|
|
|
+ for (Timeliness item : timelinessList) {
|
|
|
+ if (item.getAppType().equals(tuple2.f3)) {
|
|
|
+ long l = ctx.currentProcessingTime();
|
|
|
+ if (l - timestamp > item.getCollectTime()) {
|
|
|
+ if (outJson.containsKey("ruleId")) {
|
|
|
+ String ruleId = outJson.getString("ruleId");
|
|
|
+ outJson.put("ruleId", ruleId + "_" + item.getRuleId());
|
|
|
+ } else {
|
|
|
+ outJson.put("ruleId", item.getRuleId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (outJson.containsKey("ruleId")) {
|
|
|
+ outArray.add(outJson);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // Output the compliance rate plus the set of abnormal records
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+ jsonObject.put("记录及时性(数据入库)", getPercent(countAll - outArray.size(), countAll));
|
|
|
+ if (outArray.size() > 0) {
|
|
|
+ out.collect(new Tuple5<>(value.get(0).f0, jsonObject.toString(), countAll, outArray.toString(), value.get(0).f3));
|
|
|
+ } else {
|
|
|
+ out.collect(new Tuple5<>(value.get(0).f0, jsonObject.toString(), countAll, null, value.get(0).f3));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void processBroadcastElement(Timeliness value, Context ctx, Collector<Tuple5<String, String, Integer, String, String>> out) throws Exception {
|
|
|
+ BroadcastState<String, List<Timeliness>> ctxBroadcastState = ctx.getBroadcastState(Descriptors.OUT_TIME_RULE);
|
|
|
+ if (value.isFlag()) {
|
|
|
+ if (ctxBroadcastState.contains(value.getType())) {
|
|
|
+ List<Timeliness> timelines = ctxBroadcastState.get(value.getType());
|
|
|
+ timelines.removeIf(item -> item.getRuleId().equals(value.getRuleId()));
|
|
|
+ timelines.add(value);
|
|
|
+ ctxBroadcastState.put(value.getType(), timelines);
|
|
|
+ } else {
|
|
|
+ ArrayList<Timeliness> arrayList = new ArrayList<>();
|
|
|
+ arrayList.add(value);
|
|
|
+ ctxBroadcastState.put(value.getType(), arrayList);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (ctxBroadcastState.contains(value.getType())) {
|
|
|
+ List<Timeliness> timelines = ctxBroadcastState.get(value.getType());
|
|
|
+ timelines.removeIf(timeliness1 -> timeliness1.toString().equals(value.toString()));
|
|
|
+ ctxBroadcastState.put(value.getType(), timelines);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // Integrity check
|
|
|
+ SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> result3 = process3.connect(broadcastStream).process(new BroadcastProcessFunction<List<Tuple4<String, String, Long, String>>, Integrity, Tuple5<String, String, Integer, String, String>>() {
|
|
|
+ @Override
|
|
|
+ public void processElement(List<Tuple4<String, String, Long, String>> value, ReadOnlyContext ctx, Collector<Tuple5<String, String, Integer, String, String>> out) throws Exception {
|
|
|
+ ReadOnlyBroadcastState<String, List<Integrity>> broadcastState = ctx.getBroadcastState(Descriptors.COMPLETE_RULE);
|
|
|
+ int filedCount = 0;
|
|
|
+ int countError = 0;
|
|
|
+ JSONArray array = new JSONArray();
|
|
|
+ int countAll = value.size();
|
|
|
+ for (Tuple4<String, String, Long, String> tuple2 : value) {
|
|
|
+ String[] s = tuple2.f0.split("_");
|
|
|
+ JSONObject jsonObject = JSONObject.parseObject(tuple2.f1);
|
|
|
+ Set<String> keySet = jsonObject.keySet();
|
|
|
+ filedCount = filedCount + keySet.size();
|
|
|
+ if (broadcastState.contains(getMeterName(s[0]))) {
|
|
|
+ List<Integrity> timelinessList = broadcastState.get(getMeterName(s[0]));
|
|
|
+ for (Integrity item : timelinessList) {
|
|
|
+ if (item.getAppType().equals(tuple2.f3)) {
|
|
|
+ List<String> fields = item.getFields();
|
|
|
+ for (String field : fields) {
|
|
|
+ if (keySet.contains(field)) {
|
|
|
+ if (null == jsonObject.get(field) || jsonObject.getString(field).isEmpty() || jsonObject.getString(field).trim().isEmpty() || "null".equals(jsonObject.getString(field)) || "NULL".equals(jsonObject.getString(field))) {
|
|
|
+ jsonObject.putIfAbsent(field, null);
|
|
|
+ countError++;
|
|
|
+ if (jsonObject.containsKey("ruleId")) {
|
|
|
+ String ruleId = jsonObject.getString("ruleId");
|
|
|
+ jsonObject.put("ruleId", ruleId + "_" + item.getRuleId());
|
|
|
+ } else {
|
|
|
+ jsonObject.put("ruleId", item.getRuleId());
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ countError++;
|
|
|
+ if (jsonObject.containsKey("ruleId")) {
|
|
|
+ String ruleId = jsonObject.getString("ruleId");
|
|
|
+ jsonObject.put("ruleId", ruleId + "_" + item.getRuleId());
|
|
|
+ } else {
|
|
|
+ jsonObject.put("ruleId", item.getRuleId());
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (jsonObject.containsKey("ruleId")) {
|
|
|
+ array.add(jsonObject);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+ jsonObject.put("字段完整性", getPercent(filedCount - countError, filedCount));
|
|
|
+ jsonObject.put("记录完整性", getPercent(countAll - array.size(), countAll));
|
|
|
+ if (array.size() > 0) {
|
|
|
+ out.collect(new Tuple5<>(value.get(0).f0, jsonObject.toString(), countAll, array.toString(SerializerFeature.WriteMapNullValue), value.get(0).f3));
|
|
|
+ } else {
|
|
|
+ out.collect(new Tuple5<>(value.get(0).f0, jsonObject.toString(), countAll, null, value.get(0).f3));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void processBroadcastElement(Integrity value, Context ctx, Collector<Tuple5<String, String, Integer, String, String>> out) throws Exception {
|
|
|
+ BroadcastState<String, List<Integrity>> broadcastState = ctx.getBroadcastState(Descriptors.COMPLETE_RULE);
|
|
|
+ if (value.isFlag()) {
|
|
|
+ if (broadcastState.contains(value.getType())) {
|
|
|
+ List<Integrity> timelines = broadcastState.get(value.getType());
|
|
|
+ timelines.removeIf(item -> item.getRuleId().equals(value.getRuleId()));
|
|
|
+ timelines.add(value);
|
|
|
+ broadcastState.put(value.getType(), timelines);
|
|
|
+ } else {
|
|
|
+ ArrayList<Integrity> arrayList = new ArrayList<>();
|
|
|
+ arrayList.add(value);
|
|
|
+ broadcastState.put(value.getType(), arrayList);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (broadcastState.contains(value.getType())) {
|
|
|
+ List<Integrity> timelines = broadcastState.get(value.getType());
|
|
|
+ timelines.removeIf(timeliness1 -> timeliness1.toString().equals(value.toString()));
|
|
|
+ broadcastState.put(value.getType(), timelines);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // Uniformity (consistency) check
|
|
|
+ SingleOutputStreamOperator<Tuple3<String, List<Tuple3<String, String, String>>, String>> process2 = process3.connect(uniformityBroadcastStream).process(new BroadcastProcessFunction<List<Tuple4<String, String, Long, String>>, Uniformity, Tuple3<String, List<Tuple3<String, String, String>>, String>>() {
|
|
|
+ @Override
|
|
|
+ public void processElement(List<Tuple4<String, String, Long, String>> value, ReadOnlyContext ctx, Collector<Tuple3<String, List<Tuple3<String, String, String>>, String>> out) throws Exception {
|
|
|
+ ArrayList<Tuple3<String, String, String>> arrayList = new ArrayList<>();
|
|
|
+ ReadOnlyBroadcastState<String, List<Uniformity>> broadcastState = ctx.getBroadcastState(Descriptors.UNIFORMITY_RULE);
|
|
|
+ for (Tuple4<String, String, Long, String> tuple2 : value) {
|
|
|
+ String[] s = tuple2.f0.split("_");
|
|
|
+ JSONObject jsonObject = JSONObject.parseObject(tuple2.f1);
|
|
|
+ if (broadcastState.contains(getMeterName(s[0]))) {
|
|
|
+ JSONObject keys = new JSONObject();
|
|
|
+ List<Uniformity> timelinessList = broadcastState.get(getMeterName(s[0]));
|
|
|
+ for (Uniformity item : timelinessList) {
|
|
|
+ if (item.getAppType().equals(tuple2.f3)) {
|
|
|
+ List<JSONObject> fields = item.getFields();
|
|
|
+ for (JSONObject field : fields) {
|
|
|
+ if (jsonObject.containsKey(field.getString("dataField")) && (null != jsonObject.get(field.getString("dataField")))) {
|
|
|
+ if (field.containsKey("dataType")) {
|
|
|
+ keys.put(item.getRuleId() + "-" + field.getString("dataField") + "-" + field.getString("dataType"), jsonObject.get(field.getString("dataField")));
|
|
|
+ } else {
|
|
|
+ keys.put(item.getRuleId() + "-" + field.getString("dataField") + "-" + "all", jsonObject.get(field.getString("dataField")));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ arrayList.add(new Tuple3<>(tuple2.f0, tuple2.f1, keys.toString()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (arrayList.isEmpty()) {
|
|
|
+ out.collect(new Tuple3<>(value.get(0).f0 + "_" + value.size(), arrayList, value.get(0).f3));
|
|
|
+ } else {
|
|
|
+ out.collect(new Tuple3<>(value.get(0).f0, arrayList, value.get(0).f3));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void processBroadcastElement(Uniformity value, Context ctx, Collector<Tuple3<String, List<Tuple3<String, String, String>>, String>> out) throws Exception {
|
|
|
+ BroadcastState<String, List<Uniformity>> broadcastState = ctx.getBroadcastState(Descriptors.UNIFORMITY_RULE);
|
|
|
+ if (value.isFlag()) {
|
|
|
+ if (broadcastState.contains(value.getType())) {
|
|
|
+ List<Uniformity> timelines = broadcastState.get(value.getType());
|
|
|
+ timelines.removeIf(item -> item.getRuleId().equals(value.getRuleId()));
|
|
|
+ timelines.add(value);
|
|
|
+ broadcastState.put(value.getType(), timelines);
|
|
|
+ } else {
|
|
|
+ ArrayList<Uniformity> arrayList = new ArrayList<>();
|
|
|
+ arrayList.add(value);
|
|
|
+ broadcastState.put(value.getType(), arrayList);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (broadcastState.contains(value.getType())) {
|
|
|
+ List<Uniformity> timelines = broadcastState.get(value.getType());
|
|
|
+ timelines.removeIf(timeliness1 -> timeliness1.toString().equals(value.toString()));
|
|
|
+ broadcastState.put(value.getType(), timelines);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // Output the uniformity results
|
|
|
+ SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> result4 = process2.map(new MapFunction<Tuple3<String, List<Tuple3<String, String, String>>, String>, Tuple5<String, String, Integer, String, String>>() {
|
|
|
+ @Override
|
|
|
+ public Tuple5<String, String, Integer, String, String> map(Tuple3<String, List<Tuple3<String, String, String>>, String> value) throws Exception {
|
|
|
+ int filedCount = 0;
|
|
|
+ int countError = 0;
|
|
|
+ JSONArray array = new JSONArray();
|
|
|
+ int countAll = value.f1.size();
|
|
|
+ for (Tuple3<String, String, String> tuple3 : value.f1) {
|
|
|
+ JSONObject input = JSONObject.parseObject(tuple3.f2);
|
|
|
+ JSONObject output = JSONObject.parseObject(tuple3.f1);
|
|
|
+ filedCount = filedCount + output.keySet().size();
|
|
|
+ for (String s : input.keySet()) {
|
|
|
+ String[] split = s.split("-");
|
|
|
+ String ruleId = split[0];
|
|
|
+ String key = split[1];
|
|
|
+ String type = split[2];
|
|
|
+ if (output.containsKey(key)) {
|
|
|
+ if (!(output.get(key).equals(input.get(s)) && DataType.getType(output.get(key)).get("类型").equals(type)) && (!"all".equals(type))) {
|
|
|
+ countError++;
|
|
|
+ if (output.containsKey("ruleId")) {
|
|
|
+ output.put("ruleId", output.getString("ruleId") + "_" + ruleId);
|
|
|
+ } else {
|
|
|
+ output.put("ruleId", ruleId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (output.containsKey("ruleId")) {
|
|
|
+ array.add(output);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+ jsonObject.put("字段一致性", getPercent(filedCount - countError, filedCount));
|
|
|
+ jsonObject.put("记录一致性", getPercent(countAll - array.size(), countAll));
|
|
|
+ if (array.size() > 0) {
|
|
|
+ return new Tuple5<>(value.f0, jsonObject.toString(), countAll, array.toString(), value.f2);
|
|
|
+ } else if (value.f1.isEmpty()) {
|
|
|
+ String[] split = value.f0.split("_");
|
|
|
+ return new Tuple5<>(split[0], "{\"记录一致性\":\"100.00%\",\"字段一致性\":\"100.00%\"}", Integer.parseInt(split[2]), null, value.f2);
|
|
|
+ } else {
|
|
|
+ return new Tuple5<>(value.f0, jsonObject.toString(), countAll, null, value.f2);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+
|
|
|
+ // Accuracy check
|
|
|
+ BroadcastStream<RuleBase> broadcast1 = current.keyBy((KeySelector<JSONObject, String>) value -> value.getString("model")).process(new KeyedProcessFunction<String, JSONObject, RuleBase>() {
|
|
|
+ private transient MapState<String, DroolsRule> mapState;
|
|
|
+ private boolean flag;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void open(Configuration parameters) throws Exception {
|
|
|
+ MapStateDescriptor<String, DroolsRule> descriptor = new MapStateDescriptor<>("mapState", TypeInformation.of(String.class), TypeInformation.of(DroolsRule.class));
|
|
|
+ mapState = getRuntimeContext().getMapState(descriptor);
|
|
|
+ flag = true;
|
|
|
+ super.open(parameters);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void processElement(JSONObject tuple2, Context ctx, Collector<RuleBase> out) throws Exception {
|
|
|
+ STGroup stg = new STGroupFile(Descriptors.RULE_ADDRESS);
|
|
|
+ if (flag) {
|
|
|
+ ST sqlTemplate = stg.getInstanceOf("sqlTemplate");
|
|
|
+ String ruleId = "0000000001";
|
|
|
+ // The expression to evaluate
|
|
|
+ String expression = "(ai_faxlet<-1000000)";
|
|
|
+ String type = "BasTefMes";
|
|
|
+ String state = "1";
|
|
|
+ sqlTemplate.add("ruleId", ruleId);
|
|
|
+ sqlTemplate.add("expression", expression);
|
|
|
+ sqlTemplate.add("class", type);
|
|
|
+ String script = sqlTemplate.render();
|
|
|
+ mapState.put(ruleId, new DroolsRule(ruleId, "logrules", script, state));
|
|
|
+ flag = false;
|
|
|
+ }
|
|
|
+ JSONObject jsonObject = JSONObject.parseObject(tuple2.getString("mes"));
|
|
|
+ ST sqlTemplate = stg.getInstanceOf("sqlTemplate");
|
|
|
+ String ruleId = tuple2.getString("rule_id");
|
|
|
+ String expression = jsonObject.getString("expression");
|
|
|
+ String type = tuple2.getString("type");
|
|
|
+ String state = tuple2.getString("flag");
|
|
|
+ sqlTemplate.add("ruleId", ruleId);
|
|
|
+ sqlTemplate.add("expression", expression);
|
|
|
+ sqlTemplate.add("class", type);
|
|
|
+ String script = sqlTemplate.render();
|
|
|
+ if ("1".equals(state)) {
|
|
|
+ mapState.put(ruleId, new DroolsRule(ruleId, "logrules", script, state));
|
|
|
+ } else {
|
|
|
+ if (!mapState.isEmpty()) {
|
|
|
+ if (mapState.contains(ruleId)) {
|
|
|
+ mapState.remove(ruleId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ int count = 0;
|
|
|
+ if (!mapState.isEmpty()) {
|
|
|
+ Iterable<DroolsRule> values = mapState.values();
|
|
|
+ JSONArray array = new JSONArray();
|
|
|
+ for (DroolsRule value : values) {
|
|
|
+ count++;
|
|
|
+ array.add(value);
|
|
|
+ }
|
|
|
+ RuleBase ruleBase = RuleBase.createRuleBase(array);
|
|
|
+ out.collect(ruleBase);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }).setParallelism(1).broadcast(Descriptors.RULE_STATE_DESCRIPTOR);
|
|
|
+ // Fetch the initial rule base
|
|
|
+ RuleBase ruleBase = getInitRuleBase();
|
|
|
+ logger.debug("第一次初始化规则 : " + ruleBase.toString());
|
|
|
+
|
|
|
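+ // Accuracy check: evaluate the windowed records against the broadcast Drools rule base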
+ SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> result5 = process3.connect(broadcast1).process(new NormalRuleProcess(ruleBase));
|
|
|
+
|
|
|
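+ // Union the per-dimension result streams and tag each record with a UUID before sinking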
+ DataStream<Tuple5<String, String, Integer, String, String>> unionRes = result2.union(result3).union(result4).union(result5).union(result);
|
|
|
+ SingleOutputStreamOperator<Tuple6<String, String, Integer, String, String, String>> operator = unionRes.map(new MapFunction<Tuple5<String, String, Integer, String, String>, Tuple6<String, String, Integer, String, String, String>>() {
|
|
|
+ @Override
|
|
|
+ public Tuple6<String, String, Integer, String, String, String> map(Tuple5<String, String, Integer, String, String> value) throws Exception {
|
|
|
+ return new Tuple6<>(value.f0, value.f1, value.f2, value.f3, UUID.randomUUID().toString(), value.f4);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
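+ // Persist the aggregated governance results into the governance_result table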
+ operator.addSink(new RichSinkFunction<Tuple6<String, String, Integer, String, String, String>>() {
|
|
|
+ private Connection con;
|
|
|
+ private PreparedStatement statement;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void open(Configuration parameters) throws Exception {
|
|
|
+ super.open(parameters);
|
|
|
+ con = MysqlUtil.getCon();
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws Exception {
|
|
|
+ super.close();
|
|
|
+ if (statement != null) {
|
|
|
+ statement.close();
|
|
|
+ }
|
|
|
+ if (con != null) {
|
|
|
+ con.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void invoke(Tuple6<String, String, Integer, String, String, String> value, Context context) throws Exception {
|
|
|
+ Date date = new Date();
|
|
|
+ SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
+ String dateStr = format.format(date);
|
|
|
+ String[] s = value.f0.split("_");
|
|
|
+ String device = s[0];
|
|
|
+ String appType = value.f5;
|
|
|
+ JSONObject jsonObject = JSONObject.parseObject(value.f1);
|
|
|
+ int count = value.f2;
|
|
|
+ String resMes = value.f3;
|
|
|
+ String uuid = value.f4;
|
|
|
+ char e = '%';
|
|
|
+ try {
|
|
|
+ String sql = "insert into governance_result (app_type,model,field_res,record_res,rule_res,mes_count,create_time,uuid,device_type) values (?,?,?,?,?,?,?,?,?) ON CONFLICT(app_type,model,create_time,device_type) DO NOTHING";
|
|
|
+ statement = con.prepareStatement(sql);
|
|
|
+ statement.setString(1, appType);
|
|
|
+ if (jsonObject.containsKey("记录一致性")) {
|
|
|
+ Float recordRes = deleteChar(jsonObject.getString("记录一致性"), e);
|
|
|
+ Float fieldRes = deleteChar(jsonObject.getString("字段一致性"), e);
|
|
|
+ statement.setString(2, "一致性");
|
|
|
+ statement.setFloat(3, fieldRes);
|
|
|
+ statement.setFloat(4, recordRes);
|
|
|
+ statement.setString(5, resMes);
|
|
|
+ } else if (jsonObject.containsKey("记录完整性")) {
|
|
|
+ Float recordRes = deleteChar(jsonObject.getString("记录完整性"), e);
|
|
|
+ Float fieldRes = deleteChar(jsonObject.getString("字段完整性"), e);
|
|
|
+ statement.setString(2, "完整性");
|
|
|
+ statement.setFloat(3, fieldRes);
|
|
|
+ statement.setFloat(4, recordRes);
|
|
|
+ statement.setString(5, resMes);
|
|
|
+ } else if (jsonObject.containsKey("记录准确性")) {
|
|
|
+ Float recordRes = deleteChar(jsonObject.getString("记录准确性"), e);
|
|
|
+ Float fieldRes = deleteChar(jsonObject.getString("字段准确性"), e);
|
|
|
+ statement.setString(2, "准确性");
|
|
|
+ statement.setFloat(3, fieldRes);
|
|
|
+ statement.setFloat(4, recordRes);
|
|
|
+ statement.setString(5, resMes);
|
|
|
+ } else if (jsonObject.containsKey("记录及时性(数据入库)")) {
|
|
|
+ statement.setString(2, "及时性(数据入库)");
|
|
|
+ Float out = deleteChar(jsonObject.getString("记录及时性(数据入库)"), e);
|
|
|
+ statement.setFloat(3, -1);
|
|
|
+ statement.setFloat(4, out);
|
|
|
+ statement.setString(5, resMes);
|
|
|
+ } else if (jsonObject.containsKey("记录及时性(数据采集)")) {
|
|
|
+ statement.setString(2, "及时性(数据采集)");
|
|
|
+ Float in = deleteChar(jsonObject.getString("记录及时性(数据采集)"), e);
|
|
|
+ statement.setFloat(3, -1);
|
|
|
+ statement.setFloat(4, in);
|
|
|
+ statement.setString(5, resMes);
|
|
|
+ }
|
|
|
+ statement.setInt(6, count);
|
|
|
+ statement.setString(7, dateStr);
|
|
|
+ statement.setString(8, uuid);
|
|
|
+ statement.setString(9, device);
|
|
|
+ statement.addBatch();
|
|
|
+ statement.executeBatch();
|
|
|
+ con.commit();
|
|
|
+ } catch (Exception e1) {
|
|
|
+ System.out.println("存储postgres规则结果异常");
|
|
|
+ e1.printStackTrace();
|
|
|
+ con = MysqlUtil.getCon2();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
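+ // Fan out each abnormal record by rule id and store the details in the rule_mes table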
+ operator.filter(new FilterFunction<Tuple6<String, String, Integer, String, String, String>>() {
|
|
|
+ @Override
|
|
|
+ public boolean filter(Tuple6<String, String, Integer, String, String, String> value) throws Exception {
|
|
|
+ return null != value.f3;
|
|
|
+ }
|
|
|
+ }).flatMap(new FlatMapFunction<Tuple6<String, String, Integer, String, String, String>, Tuple3<String, String, String>>() {
|
|
|
+ @Override
|
|
|
+ public void flatMap(Tuple6<String, String, Integer, String, String, String> value, Collector<Tuple3<String, String, String>> out) throws Exception {
|
|
|
+ JSONArray jsonArray = JSONArray.parseArray(value.f3);
|
|
|
+ for (Object o : jsonArray) {
|
|
|
+ JSONObject jsonObject = JSONObject.parseObject(JSONObject.toJSONString(o, SerializerFeature.WriteMapNullValue));
|
|
|
+
|
|
|
+ if (jsonObject.containsKey("ruleId")) {
|
|
|
+ String ruleIds = jsonObject.getString("ruleId");
|
|
|
+ String[] s = ruleIds.split("_");
|
|
|
+ for (String s1 : s) {
|
|
|
+ out.collect(new Tuple3<>(s1.trim(), jsonObject.toString(SerializerFeature.WriteMapNullValue), value.f4));
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ String ruleIds = jsonObject.getString("flag");
|
|
|
+ String[] s = ruleIds.split("-/");
|
|
|
+ for (String s1 : s) {
|
|
|
+ if (s1.contains("规则:")) {
|
|
|
+ s1 = s1.replace("规则:", "");
|
|
|
+ }
|
|
|
+ out.collect(new Tuple3<>(s1.trim(), jsonObject.toString(), value.f4));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }).addSink(org.apache.flink.connector.jdbc.JdbcSink.sink(
|
|
|
+ "insert into rule_mes (rule_id,rule_mes,create_time,uuid) values (?,?,?,?) ON CONFLICT(rule_id,rule_mes,uuid) DO NOTHING",
|
|
|
+ (ps, t) -> {
|
|
|
+ Date date = new Date();
|
|
|
+ SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
+ String dateStr = format.format(date);
|
|
|
+ ps.setString(1, t.f0);
|
|
|
+ ps.setString(2, t.f1);
|
|
|
+ ps.setString(3, dateStr);
|
|
|
+ ps.setString(4, t.f2);
|
|
|
+ },
|
|
|
+ new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
|
|
|
+ .withUrl("jdbc:postgresql://192.168.20.72:5432/urtmpdb?characterEncoding=utf-8&serverTimezone=UTC")
|
|
|
+ .withDriverName("org.postgresql.Driver")
|
|
|
+ .withPassword("sw12345")
|
|
|
+ .withUsername("postgres")
|
|
|
+ .build()));
|
|
|
+ env.execute();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Remove every occurrence of a character from a string and parse the remainder as a Float
|
|
|
+ */
|
|
|
+ public static Float deleteChar(String str, char delChar) {
|
|
|
+ StringBuilder delStr = new StringBuilder();
|
|
|
+ char[] bytes = str.toCharArray();
|
|
|
+ int iSize = bytes.length;
|
|
|
+ for (int i = bytes.length - 1; i >= 0; i--) {
|
|
|
+ if (bytes[i] == delChar) {
|
|
|
+ if (iSize - 1 - i >= 0) {
|
|
|
+ System.arraycopy(bytes, i + 1, bytes, i, iSize - 1 - i);
|
|
|
+ }
|
|
|
+ iSize--;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for (int i = 0; i < iSize; i++) {
|
|
|
+ delStr.append(bytes[i]);
|
|
|
+ }
|
|
|
+ return Float.parseFloat(delStr.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Build an initial rule so that the broadcast rule state is never empty
|
|
|
+ */
|
|
|
+ private static RuleBase getInitRuleBase() throws SQLException {
|
|
|
+ JSONArray array = new JSONArray();
|
|
|
+ STGroup stg = new STGroupFile(Descriptors.RULE_ADDRESS);
|
|
|
+ ST sqlTemplate = stg.getInstanceOf("sqlTemplate");
|
|
|
+ String ruleId = "0000000002";
|
|
|
+ String expression = "(ai_faxlet<-1000000)";
|
|
|
+ String type = "BasTefMes";
|
|
|
+ String state = "1";
|
|
|
+ sqlTemplate.add("ruleId", ruleId);
|
|
|
+ sqlTemplate.add("expression", expression);
|
|
|
+ sqlTemplate.add("class", type);
|
|
|
+ String script = sqlTemplate.render();
|
|
|
+ array.add(new com.sunwin.metro.bean.DroolsRule(ruleId, type, script, state));
|
|
|
+ System.out.println(array);
|
|
|
+ return RuleBase.createRuleBase(array);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compute a percentage string
|
|
|
+ */
|
|
|
+ public static String getPercent(int x, int y) {
|
|
|
+ double d1 = x * 1.0;
|
|
|
+ double d2 = y * 1.0;
|
|
|
+ // The number of zeros after "." sets how many decimal places are kept; "##.00%" keeps two decimal places
|
|
|
+ DecimalFormat decimalFormat = new DecimalFormat("##.00%");
|
|
|
+ if (d1 == 0) {
|
|
|
+ return "0%";
|
|
|
+ }
|
|
|
+ return decimalFormat.format(d1 / d2);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String getMeterName(String name) {
|
|
|
+ switch (name) {
|
|
|
+ case "隧道风机":
|
|
|
+ return "BasTefMes";
|
|
|
+ case "雨水泵":
|
|
|
+ return "BasAtMes";
|
|
|
+ case "排热风机":
|
|
|
+ return "BasTvfMes";
|
|
|
+ case "ups巡检仪":
|
|
|
+ return "UpsXjyMes";
|
|
|
+ case "不间断电源":
|
|
|
+ return "UpsMes";
|
|
|
+ default:
|
|
|
+ return name;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Deduplicate a collection by the given key
|
|
|
+ */
|
|
|
+ private static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
|
|
|
+ Map<Object, Boolean> seen = new ConcurrentHashMap<>(16);
|
|
|
+ //putIfAbsent adds the key/value pair only when the map has no value for that key and then returns null; if a value already exists, the original value is kept.
+ //A null return therefore means the element was added successfully (not a duplicate): null == null evaluates to TRUE.
|
|
|
+ return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
|
|
|
+ }
|
|
|
+
|
|
|
+}