package eu.radoop.spark;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.InternalAccumulator;
import org.apache.spark.SparkContext;
import org.apache.spark.scheduler.AccumulableInfo;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
import scala.Tuple4;
import scala.collection.JavaConversions;
import scala.collection.Seq;

/* loaded from: input_file:lib/radoop-spark3.jar:eu/radoop/spark/ExecutorGCMonitor.class */
public class ExecutorGCMonitor extends SparkListener {
    private SparkContext sc;
    private Long monitoringStartTime;
    private Integer lookbackSecs;
    private Double gcTreshold;
    private boolean isJobKilled;
    private boolean isListening;
    private ArrayList<String> logArr;
    private Map<String, List<Long[]>> heartBeatHistory;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/radoop-spark3.jar:eu/radoop/spark/ExecutorGCMonitor$HeartBeatInfo.class */
    public static class HeartBeatInfo {
        public String hostname;
        public Long taskId;
        public Integer stageId;
        public Integer attemptId;
        public Long gcTime;
        public long time;

        private HeartBeatInfo() {
        }
    }

    public Integer getLookbackSecs() {
        return this.lookbackSecs;
    }

    public void setLookbackSecs(Integer num) {
        this.lookbackSecs = num;
    }

    public Double getGcTreshold() {
        return this.gcTreshold;
    }

    public void setGcTreshold(Double d) {
        this.gcTreshold = d;
    }

    public boolean isJobKilled() {
        return this.isJobKilled;
    }

    public boolean isListening() {
        return this.isListening;
    }

    private void log(String str) {
        System.out.println(str);
        System.out.flush();
        this.logArr.add(str);
    }

    public ArrayList<String> getLog() {
        return this.logArr;
    }

    public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate sparkListenerExecutorMetricsUpdate) {
        if (this.isJobKilled) {
            return;
        }
        for (Tuple4 tuple4 : JavaConversions.seqAsJavaList(sparkListenerExecutorMetricsUpdate.accumUpdates())) {
            for (AccumulableInfo accumulableInfo : JavaConversions.seqAsJavaList((Seq) tuple4._4())) {
                if (accumulableInfo.name().isDefined() && InternalAccumulator.JVM_GC_TIME().equals(accumulableInfo.name().get())) {
                    HeartBeatInfo heartBeatInfo = new HeartBeatInfo();
                    heartBeatInfo.taskId = (Long) tuple4._1();
                    heartBeatInfo.stageId = (Integer) tuple4._2();
                    heartBeatInfo.attemptId = (Integer) tuple4._3();
                    heartBeatInfo.hostname = sparkListenerExecutorMetricsUpdate.execId();
                    heartBeatInfo.gcTime = accumulableInfo.update().isEmpty() ? 123L : (Long) accumulableInfo.update().get();
                    heartBeatInfo.time = System.currentTimeMillis() - this.monitoringStartTime.longValue();
                    if (!isExecutionHealthy(registerHeartBeat(heartBeatInfo)).booleanValue()) {
                        log("ExecutorGCMonitor: Killing job as executor got into an unhealthy state");
                        System.out.flush();
                        this.isJobKilled = true;
                        this.sc.cancelAllJobs();
                    }
                }
            }
        }
    }

    public ExecutorGCMonitor(Integer num, Double d) {
        this();
        if (num.intValue() < 1) {
            throw new IllegalArgumentException("ExecutorGCMonitor: lookbackSecs must be a positive number");
        }
        if (d.doubleValue() < 0.0d || d.doubleValue() > 1.0d) {
            throw new IllegalArgumentException("ExecutorGCMonitor: gcTreshold must be a number between 0 and 1");
        }
        setLookbackSecs(num);
        setGcTreshold(d);
        log("ExecutorGCMonitor: Setting ExecutorGCMonitor options to lookbackSecs=" + this.lookbackSecs + ", gcTreshold=" + this.gcTreshold);
    }

    public ExecutorGCMonitor() {
        this.lookbackSecs = 300;
        this.gcTreshold = Double.valueOf(0.98d);
        this.isJobKilled = false;
        this.isListening = false;
        this.logArr = new ArrayList<>();
        this.heartBeatHistory = new HashMap();
    }

    public void listen(SparkContext sparkContext) {
        this.sc = sparkContext;
        sparkContext.addSparkListener(this);
        log("ExecutorGCMonitor: Registered Listener. lookbackSecs=" + this.lookbackSecs + ", gcTreshold=" + this.gcTreshold);
        this.monitoringStartTime = Long.valueOf(System.currentTimeMillis());
    }

    private List<Long[]> registerHeartBeat(HeartBeatInfo heartBeatInfo) {
        String str = heartBeatInfo.hostname + "-" + heartBeatInfo.taskId + "-" + heartBeatInfo.stageId + "-" + heartBeatInfo.attemptId;
        Long[] lArr = {Long.valueOf(heartBeatInfo.time), heartBeatInfo.gcTime};
        if (!this.heartBeatHistory.containsKey(str)) {
            this.heartBeatHistory.put(str, new ArrayList());
        }
        List<Long[]> list = this.heartBeatHistory.get(str);
        list.add(lArr);
        return list;
    }

    private Boolean isExecutionHealthy(List<Long[]> list) {
        if (list == null || list.size() == 0) {
            log("Warning: ExecutionGCMonitor.isExecutionHealthy: History is null or  size == 0");
            return true;
        }
        Integer valueOf = Integer.valueOf(list.size());
        Long l = list.get(valueOf.intValue() - 1)[0];
        Long l2 = list.get(valueOf.intValue() - 1)[1];
        Long l3 = l2;
        Long l4 = 0L;
        Integer valueOf2 = Integer.valueOf(this.lookbackSecs.intValue() * 1000);
        for (Integer valueOf3 = Integer.valueOf(valueOf.intValue() - 1); valueOf3.intValue() >= 0 && l4.longValue() < valueOf2.intValue(); valueOf3 = Integer.valueOf(valueOf3.intValue() - 1)) {
            Long l5 = list.get(valueOf3.intValue())[0];
            l3 = list.get(valueOf3.intValue())[1];
            l4 = Long.valueOf(l.longValue() - l5.longValue());
        }
        double longValue = l4.longValue() > 0 ? (l2.longValue() - l3.longValue()) / l4.longValue() : 0.0d;
        if (l4.longValue() < valueOf2.intValue()) {
            log("ExecutorGCMonitor: SKIPCHECK after " + (l4.longValue() / 1000) + " secs, time spent with GC: " + this + ", timeframe: " + (l2.longValue() - l3.longValue()) + ", GC ratio: " + this);
            return true;
        }
        log("ExecutorGCMonitor: GC HEALTH STAT time spent with GC: " + (l2.longValue() - l3.longValue()) + ", timeframe: " + this + ", GC ratio: " + l4);
        return Boolean.valueOf(longValue < this.gcTreshold.doubleValue());
    }
}
