package eu.radoop.spark.hdfs.loop;

import com.rapidminer.operator.IOObject;
import com.rapidminer.operator.IOObjectCollection;
import com.rapidminer.operator.Operator;
import com.rapidminer.operator.UserError;
import eu.radoop.RadoopConf;
import eu.radoop.spark.DebugLogger;
import eu.radoop.spark.ExecutorGCMonitor;
import eu.radoop.spark.InputParams;
import eu.radoop.spark.JobStats;
import eu.radoop.spark.LogRetrievalType;
import eu.radoop.spark.ProcessExceptionWithLogs;
import eu.radoop.spark.PushdownRunner;
import eu.radoop.spark.PushdownTools;
import eu.radoop.spark.RunnerTools;
import eu.radoop.spark.SerializationConverter;
import eu.radoop.spark.Tuple2;
import eu.radoop.spark.hdfs.HdfsFileSystemProvider;
import eu.radoop.transfer.ProcessExceptionTO;
import eu.radoop.transfer.ProcessOutputTO;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.logging.LogRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/* loaded from: input_file:lib/radoop-spark3.jar:eu/radoop/spark/hdfs/loop/LoopHdfsDriver.class */
/**
 * Driver-side entry point of the "loop over HDFS files" pushdown job.
 *
 * <p>{@link #run} distributes the target HDFS files over Spark partitions, lets
 * {@link LoopHdfsExecutor} process each file, merges the per-partition results and
 * writes the outcome (or the failure) as JSON to {@code inputParams.potoDir}.
 *
 * <p>The class is {@link Serializable} because the mapping lambda in {@code runJob}
 * references {@code this.loopHdfsExecutor} and therefore captures the driver instance.
 * Collaborators that are only used on the driver are {@code transient}.
 *
 * <p>NOTE(review): there is intentionally no hand-written {@code $deserializeLambda$}
 * method. That method is synthesized by the Java compiler for serializable lambdas and
 * hard-codes the compiler-generated lambda names, so a copied version goes stale as soon
 * as a lambda in this class is edited.
 */
public class LoopHdfsDriver implements Serializable {
    /** Suffix appended to {@code potoDir} to form the marker file that records a user error. */
    private static final String PROCESS_ERROR_FLAG_SUFFIX = "_PROCESS_ERROR_FLAG";

    private final LoopHdfsExecutor loopHdfsExecutor;
    private final DebugLogger debugLogger;
    private final transient HdfsTargetFilesProvider hdfsTargetFilesProvider;
    private final transient HdfsFileSystemProvider hdfsFileSystemProvider;
    private final transient PartitionsCalculator calculator;
    private final transient SerializationConverter serializationConverter;

    /**
     * Creates a driver from its collaborators; the arguments are stored as given.
     *
     * @param loopHdfsExecutor        runs the process for a single HDFS file inside a Spark task
     * @param hdfsTargetFilesProvider lists the HDFS files to loop over
     * @param hdfsFileSystemProvider  supplies the {@link FileSystem} for the Hadoop configuration
     * @param partitionsCalculator    decides how many Spark partitions to use
     * @param serializationConverter  converts result objects before they are put into the output
     * @param debugLogger             sink for debug messages
     */
    public LoopHdfsDriver(LoopHdfsExecutor loopHdfsExecutor, HdfsTargetFilesProvider hdfsTargetFilesProvider, HdfsFileSystemProvider hdfsFileSystemProvider, PartitionsCalculator partitionsCalculator, SerializationConverter serializationConverter, DebugLogger debugLogger) {
        this.loopHdfsExecutor = loopHdfsExecutor;
        this.hdfsTargetFilesProvider = hdfsTargetFilesProvider;
        this.hdfsFileSystemProvider = hdfsFileSystemProvider;
        this.calculator = partitionsCalculator;
        this.serializationConverter = serializationConverter;
        this.debugLogger = debugLogger;
    }

    /**
     * Runs the job and writes its result, or a description of its failure, to
     * {@code inputParams.potoDir}.
     *
     * <p>If the error-flag file next to {@code potoDir} already exists, the method fails
     * immediately without running anything. The flag is created by this method whenever
     * the failure turns out to be a {@link UserError}.
     *
     * <p>On failure the error output is written first, then the flag (user errors only),
     * and finally the failure is rethrown. Problems that occur while writing the error
     * output or the flag are attached to the rethrown exception as suppressed exceptions
     * instead of replacing it.
     *
     * @param inputParams      job parameters; {@code inputDir} is overwritten with its resolved value
     * @param javaSparkContext Spark context used to run the job and to write the output
     * @throws Throwable the job failure, or the original process exception when one can be
     *                   extracted from it
     */
    public void run(InputParams inputParams, JavaSparkContext javaSparkContext) throws Throwable {
        FileSystem fs = this.hdfsFileSystemProvider.getFs(javaSparkContext.hadoopConfiguration());
        inputParams.inputDir = RunnerTools.resolveInputDir(javaSparkContext, inputParams.inputDir);
        Path errorFlag = new Path(inputParams.potoDir + PROCESS_ERROR_FLAG_SUFFIX);
        if (fs.exists(errorFlag)) {
            throw new SparkException("UserError occured. Please check the logs in RapidMiner Studio or the the logs of the first Application attempt.");
        }
        ExecutorGCMonitor gcMonitor = new ExecutorGCMonitor();
        try {
            // A missing (null) flag is treated as "monitoring disabled" instead of failing with an NPE.
            if (Boolean.TRUE.equals(inputParams.monitorExecutorGC)) {
                System.out.println("Starting Executor Health Monitoring");
                Integer lookbackSeconds = inputParams.gcMonitorLookbackSeconds;
                Double gcThreshold = inputParams.gcMonitorGcTreshold;
                // Entries of the process configuration override the values carried by inputParams.
                for (Tuple2<String, String> entry : inputParams.processConfiguration) {
                    if (RadoopConf.SPARK_GC_MONITOR_LOOKBACK_SECONDS.equals(entry._1)) {
                        lookbackSeconds = Integer.valueOf(Integer.parseInt(entry._2));
                    }
                    if (RadoopConf.SPARK_GC_MONITOR_TRESHOLD.equals(entry._1)) {
                        gcThreshold = Double.valueOf(Double.parseDouble(entry._2));
                    }
                }
                gcMonitor.setLookbackSecs(lookbackSeconds);
                gcMonitor.setGcTreshold(gcThreshold);
                gcMonitor.listen(javaSparkContext.sc());
            } else {
                System.out.println("Executor Health Monitoring disabled");
            }
            PushdownRunner.OutputParams output = runJob(inputParams, javaSparkContext, fs);
            ProcessOutputTO processOutput = new ProcessOutputTO(output.partitionToAMD, output.ioObjects, output.logRecords, output.macros);
            processOutput.setPartitionErrors(output.partitionErrors);
            processOutput.setJobStats(output.jobStats);
            javaSparkContext.parallelize(List.of(processOutput.toJson()), 1).saveAsTextFile(inputParams.potoDir);
        } catch (Throwable failure) {
            Throwable toRethrow = failure;
            Map<String, List<LogRecord>> logRecords = null;
            boolean userError = false;
            ProcessExceptionTO processExceptionTO;
            ProcessExceptionWithLogs original = PushdownTools.extractOriginalProcessExceptionWithLogs(failure);
            if (original != null) {
                // The failure wraps a process exception: report and rethrow that one.
                logRecords = original.logRecords;
                processExceptionTO = original.processExceptionTO;
                toRethrow = processExceptionTO.getProcessException();
                userError = toRethrow instanceof UserError;
                if (toRethrow == null) {
                    // Never "throw null"; fall back to the failure that was actually caught.
                    toRethrow = failure;
                }
            } else if (gcMonitor.isJobKilled()) {
                System.out.println("GC Monitor killed the application or OOM. Returning with exception.");
                javaSparkContext.sc().requestExecutors(1);
                processExceptionTO = new ProcessExceptionTO(new UserError((Operator) null, ProcessExceptionTO.ERROR_CODE_OOM));
            } else if (failure.getMessage() != null && failure.getMessage().contains("ExecutorLostFailure")) {
                processExceptionTO = new ProcessExceptionTO(new Exception("Process Pushdown possibly ran out of memory on the cluster."), failure.getStackTrace());
            } else if (failure instanceof UserError) {
                processExceptionTO = new ProcessExceptionTO(failure);
                userError = true;
            } else {
                processExceptionTO = new ProcessExceptionTO(new Exception(failure.toString()), failure.getStackTrace());
            }

            // Reporting is best effort: a failure here must not hide the real cause.
            try {
                ProcessOutputTO errorOutput = new ProcessOutputTO(logRecords, processExceptionTO);
                javaSparkContext.parallelize(List.of(errorOutput.toJson()), 1).saveAsTextFile(inputParams.potoDir);
            } catch (Throwable reportingFailure) {
                if (reportingFailure != toRethrow) {
                    toRethrow.addSuppressed(reportingFailure);
                }
            }
            if (userError) {
                // Creating and immediately closing the stream leaves an empty marker file.
                try (FSDataOutputStream ignored = fs.create(errorFlag)) {
                    // nothing to write
                } catch (Throwable flagFailure) {
                    if (flagFailure != toRethrow) {
                        toRethrow.addSuppressed(flagFailure);
                    }
                }
            }
            throw toRethrow;
        }
    }

    /**
     * Processes every target HDFS file in a Spark task and merges the partition results.
     *
     * <p>Merging rules, per partition result:
     * <ul>
     *   <li>a {@link PartitionError} contributes its error message (and its logs unless log
     *       retrieval is {@code NONE});</li>
     *   <li>any other result contributes its outputs, grouped by output index into one
     *       {@link IOObjectCollection} per index (and its logs when log retrieval is
     *       {@code ALL});</li>
     *   <li>macros are merged across all results; a macro that has different values in
     *       different partitions is dropped from the final map.</li>
     * </ul>
     *
     * @param inputParams      job parameters
     * @param javaSparkContext Spark context used to distribute the files
     * @param fileSystem       file system used to list the target files
     * @return the merged outputs, errors, logs, macros and job statistics
     * @throws IOException declared for the collaborators called here (e.g. listing the target files)
     */
    // Raw types remain only where the element types are declared by project classes not visible here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private PushdownRunner.OutputParams runJob(InputParams inputParams, JavaSparkContext javaSparkContext, FileSystem fileSystem) throws IOException {
        debug("runJob start");
        PushdownRunner.OutputParams outputParams = new PushdownRunner.OutputParams();
        JobStats jobStats = new JobStats();
        String[] targetHdfsFiles = this.hdfsTargetFilesProvider.getTargetHdfsFiles(inputParams, fileSystem);
        int fileCount = targetHdfsFiles.length;
        debug("files: " + fileCount);
        jobStats.setInputSize(fileCount);
        int partitionCount = this.calculator.estimateNumberOfPartitions(fileCount, inputParams.numberOfPartitions);
        debug("Executing job using " + partitionCount + " partitions");
        JavaRDD<String> filesRdd = javaSparkContext.parallelize(Arrays.asList(targetHdfsFiles), partitionCount);
        debug("filesRDD " + filesRdd);
        // count() runs a Spark job, so the message is handed over as a supplier: the job is
        // only triggered when the logger actually renders the message.
        // NOTE(review): assumes DebugLogger.debug(Supplier) is lazy and does not retain the supplier - confirm.
        debug(() -> "filesRDD count " + filesRdd.count());
        JavaRDD<PartitionResult> resultsRdd = filesRdd
                .map(file -> this.loopHdfsExecutor.runOnPartition(file, inputParams, fileCount, inputParams.handleErrorType))
                .filter(Objects::nonNull);
        debug("collect start");
        List<PartitionResult> results = resultsRdd.collect();
        debug("collect result: " + resultsRdd);
        debug("result count: " + results.size());
        jobStats.setResultSize(results.size());
        int portCount = results.stream().mapToInt(result -> result.getOutputs().size()).max().orElse(0);
        debug("port count: " + portCount);

        TreeMap partitionErrors = new TreeMap();
        HashMap<String, String> mergedMacros = new HashMap<>();
        HashSet<String> conflictingMacros = new HashSet<>();
        List<IOObjectCollection> portCollections = new ArrayList<>();
        for (PartitionResult result : results) {
            if (result instanceof PartitionError) {
                debug("error " + result.getPartitionId());
                partitionErrors.put(result.getPartitionId(), ((PartitionError) result).getErrorMessage());
                if (LogRetrievalType.NONE != inputParams.logRetrieveType) {
                    outputParams.logRecords.put(result.getPartitionId(), result.getLogs());
                }
            } else {
                // Bounded by this partition's own output count (never larger than portCount), so a
                // partition with fewer outputs than the widest one no longer fails with
                // IndexOutOfBoundsException.
                int outputCount = result.getOutputs().size();
                for (int port = 0; port < outputCount; port++) {
                    IOObject output = result.getOutputs().get(port);
                    debug("result " + port + ": " + output.getClass().getCanonicalName());
                    if (portCollections.size() <= port) {
                        portCollections.add(new IOObjectCollection());
                    }
                    portCollections.get(port).add(this.serializationConverter.convertForTOSerialization(output));
                }
                if (LogRetrievalType.ALL == inputParams.logRetrieveType) {
                    outputParams.logRecords.put(result.getPartitionId(), result.getLogs());
                }
            }
            result.getMacros().forEach((name, value) -> {
                if (mergedMacros.containsKey(name) && !Objects.equals(mergedMacros.get(name), value)) {
                    conflictingMacros.add(name);
                } else {
                    mergedMacros.put(name, value);
                }
            });
        }

        jobStats.setErrorCount(partitionErrors.size());
        HashSet<String> distinctErrors = new HashSet<String>(partitionErrors.values());
        // Record a common error only when several files were processed and every result failed identically.
        if (fileCount > 1 && partitionErrors.size() == results.size() && distinctErrors.size() == 1) {
            String commonError = distinctErrors.iterator().next();
            jobStats.setCommonError(commonError);
            debug("All executors failed with the same error: " + commonError);
        }
        outputParams.jobStats = jobStats;
        outputParams.partitionErrors = partitionErrors;
        outputParams.ioObjects = new ArrayList();
        outputParams.ioObjects.addAll(portCollections);
        for (String macroName : conflictingMacros) {
            debug("macro removed because it contains multiple values across different partitions: " + macroName);
            mergedMacros.remove(macroName);
        }
        // A missing (null) flag is treated as "not set" instead of failing with an NPE.
        if (Boolean.TRUE.equals(inputParams.setPartitionMacro)) {
            mergedMacros.remove(inputParams.partitionMacroName);
        }
        outputParams.macros = mergedMacros;
        return outputParams;
    }

    /** Forwards an already built message to the debug logger. */
    private void debug(String str) {
        this.debugLogger.debug(str);
    }

    /** Forwards a message supplier to the debug logger, for messages that are costly to build. */
    private void debug(Supplier<String> supplier) {
        this.debugLogger.debug(supplier);
    }
}
