package eu.radoop.spark;

import com.google.common.base.Strings;
import com.rapidminer.example.ExampleSet;
import com.rapidminer.operator.IOObject;
import com.rapidminer.operator.IOObjectCollection;
import com.rapidminer.operator.Operator;
import com.rapidminer.operator.UserError;
import eu.radoop.RadoopConf;
import eu.radoop.spark.LoopHdfsCommon;
import eu.radoop.spark.PushdownRunner;
import eu.radoop.spark.processrunner.ProcessRunner;
import eu.radoop.transfer.BlobParameterTransferObject;
import eu.radoop.transfer.ProcessExceptionTO;
import eu.radoop.transfer.ProcessOutputTO;
import eu.radoop.transfer.parameter.ProcessPushdownParameter;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.logging.LogRecord;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/radoop-spark3.jar:eu/radoop/spark/LoopHdfsRunner.class */
public class LoopHdfsRunner implements Serializable {
    private static final long serialVersionUID = -4613737228780470389L;
    public static final String APPNAME = "Loop HDFS";
    private static final Logger log = LoggerFactory.getLogger((Class<?>) LoopHdfsRunner.class);
    private JavaSparkContext sc;
    private final PartitionsCalculator calculator = new PartitionsCalculator();

    /**
     * Application entry point: logs a start marker and hands the command-line
     * arguments to a fresh {@link LoopHdfsRunner}.
     *
     * @param strArr command-line arguments; the first element is passed on to the
     *               argument-file reader used by {@code run}
     * @throws Throwable whatever the runner propagates
     */
    public static void main(String[] strArr) throws Throwable {
        LoopHdfsCommon.debug("main start");
        LoopHdfsRunner runner = new LoopHdfsRunner();
        runner.run(strArr);
    }

    /**
     * Runs one Loop HDFS execution on the driver.
     *
     * <p>Reads the parameter transfer object from the file named by {@code strArr[0]},
     * creates the Spark context, optionally starts executor GC monitoring, runs the job
     * and stores the JSON-serialized {@link ProcessOutputTO} as a text file under
     * {@code potoDir}.
     *
     * <p>On failure an error {@link ProcessOutputTO} is written to {@code potoDir}
     * instead; if the failure is a {@link UserError}, an empty
     * {@code <potoDir>_PROCESS_ERROR_FLAG} file is created as well. When that flag file
     * already exists at start-up the method fails immediately with a
     * {@link SparkException}.
     *
     * @param strArr command-line arguments; element 0 is the argument file to read
     * @throws Throwable the original process exception when one can be extracted,
     *                   otherwise the failure raised by the job; a problem while writing
     *                   the error report is attached as a suppressed exception rather
     *                   than replacing the root cause
     */
    private void run(String[] strArr) throws Throwable {
        BlobParameterTransferObject<ProcessPushdownParameter> parameters =
                new BlobParameterTransferObject<>(RunnerTools.readFromArgFile(strArr[0]), ProcessPushdownParameter.class);
        LoopHdfsCommon.debug("pto: " + parameters);
        InputParams inputParams = setupParameterFromPTO(parameters);
        LoopHdfsCommon.debug("create spark conf");
        SparkConf sparkConf = new SparkConf().setAppName(APPNAME);
        LoopHdfsCommon.debug("got spark conf");

        // try-with-resources closes the context on every path and records a failing
        // close() as a suppressed exception.
        try (JavaSparkContext sparkContext = new JavaSparkContext(sparkConf)) {
            this.sc = sparkContext;
            FileSystem fileSystem = FileSystem.get(sparkContext.hadoopConfiguration());
            inputParams.inputDir = RunnerTools.resolveInputDir(sparkContext, inputParams.inputDir);
            if (fileSystem.exists(new Path(inputParams.potoDir + "_PROCESS_ERROR_FLAG"))) {
                throw new SparkException("UserError occured. Please check the logs in RapidMiner Studio or the the logs of the first Application attempt.");
            }

            ExecutorGCMonitor gcMonitor = new ExecutorGCMonitor();
            try {
                if (inputParams.monitorExecutorGC.booleanValue()) {
                    System.out.println("Starting Executor Health Monitoring");
                    Integer lookbackSeconds = inputParams.gcMonitorLookbackSeconds;
                    Double gcThreshold = inputParams.gcMonitorGcTreshold;
                    // Entries in the additional process configuration override the PTO values.
                    for (Tuple2<String, String> entry : inputParams.processConfiguration) {
                        if (RadoopConf.SPARK_GC_MONITOR_LOOKBACK_SECONDS.equals(entry._1)) {
                            lookbackSeconds = Integer.valueOf(Integer.parseInt(entry._2));
                        }
                        if (RadoopConf.SPARK_GC_MONITOR_TRESHOLD.equals(entry._1)) {
                            gcThreshold = Double.valueOf(Double.parseDouble(entry._2));
                        }
                    }
                    gcMonitor.setLookbackSecs(lookbackSeconds);
                    gcMonitor.setGcTreshold(gcThreshold);
                    gcMonitor.listen(sparkContext.sc());
                } else {
                    System.out.println("Executor Health Monitoring disabled");
                }

                PushdownRunner.OutputParams output = runJob(inputParams);
                ProcessOutputTO successOutput =
                        new ProcessOutputTO(output.partitionToAMD, output.ioObjects, output.logRecords, output.macros);
                List<String> outputLines = new ArrayList<>(1);
                outputLines.add(successOutput.toJson());
                sparkContext.parallelize(outputLines, 1).saveAsTextFile(inputParams.potoDir);
            } catch (Throwable failure) {
                Throwable toRethrow = failure;
                Map<String, List<LogRecord>> executorLogs = null;
                boolean userError = false;
                ProcessExceptionTO exceptionTO;

                ProcessExceptionWithLogs original = PushdownTools.extractOriginalProcessExceptionWithLogs(failure);
                if (original != null) {
                    executorLogs = original.logRecords;
                    exceptionTO = original.processExceptionTO;
                    Throwable processException = exceptionTO.getProcessException();
                    if (processException != null) {
                        // Surface the process exception itself instead of the Spark wrapper.
                        toRethrow = processException;
                    }
                    userError = processException instanceof UserError;
                } else if (gcMonitor.isJobKilled()) {
                    System.out.println("GC Monitor killed the application or OOM. Returning with exception.");
                    sparkContext.sc().requestExecutors(1);
                    exceptionTO = new ProcessExceptionTO(new UserError((Operator) null, ProcessExceptionTO.ERROR_CODE_OOM));
                } else if (failure.getMessage() != null && failure.getMessage().contains("ExecutorLostFailure")) {
                    exceptionTO = new ProcessExceptionTO(new Exception("Process Pushdown possibly ran out of memory on the cluster."), failure.getStackTrace());
                } else if (failure instanceof UserError) {
                    exceptionTO = new ProcessExceptionTO(failure);
                    userError = true;
                } else {
                    exceptionTO = new ProcessExceptionTO(new Exception(failure.toString()), failure.getStackTrace());
                }

                try {
                    ProcessOutputTO errorOutput = new ProcessOutputTO(executorLogs, exceptionTO);
                    List<String> errorLines = new ArrayList<>(1);
                    errorLines.add(errorOutput.toJson());
                    sparkContext.parallelize(errorLines, 1).saveAsTextFile(inputParams.potoDir);
                    if (userError) {
                        // Creating and immediately closing the stream leaves an empty marker file.
                        try (FSDataOutputStream marker = FileSystem.get(sparkContext.hadoopConfiguration())
                                .create(new Path(inputParams.potoDir + "_PROCESS_ERROR_FLAG"))) {
                            // intentionally empty: the file's existence is the signal
                        }
                    }
                } catch (Throwable reportingFailure) {
                    // Keep the root cause; a failed error report must not hide it.
                    if (reportingFailure != toRethrow) {
                        toRethrow.addSuppressed(reportingFailure);
                    }
                }
                throw toRethrow;
            }
        }
    }

    /**
     * Determines the files the loop iterates over.
     *
     * <p>When {@code loop_hdfsPath} is null or empty, the explicit {@code loop_inputFiles}
     * array is returned unchanged. Otherwise the path is listed on the file system:
     * <ul>
     *   <li>recursive: every file below the path whose status is accepted by a
     *       {@code FileStatusNameMatcher} built from {@code loop_filter};</li>
     *   <li>non-recursive without filter: the direct children of the path;</li>
     *   <li>non-recursive with filter: the entries matching the glob
     *       {@code <path>/<filter>}.</li>
     * </ul>
     * In all listing cases only regular files are kept.
     *
     * @param inputParams job parameters; reads {@code loop_hdfsPath}, {@code loop_inputFiles},
     *                    {@code loop_filter} and {@code loop_recursive}
     * @return the file paths as strings (the caller's array when no HDFS path is set)
     * @throws IOException if the file system cannot be accessed or listed
     */
    private String[] getInputHdfsFiles(InputParams inputParams) throws IOException {
        if (Strings.isNullOrEmpty(inputParams.loop_hdfsPath)) {
            return inputParams.loop_inputFiles;
        }

        FileSystem fileSystem = FileSystem.get(new Configuration());
        List<FileStatus> candidates;
        if (Boolean.TRUE.equals(inputParams.loop_recursive)) {
            RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path(inputParams.loop_hdfsPath), true);
            FileStatusNameMatcher matcher = new FileStatusNameMatcher(inputParams.loop_filter);
            candidates = new ArrayList<>();
            while (files.hasNext()) {
                LocatedFileStatus file = files.next();
                if (matcher.test(file)) {
                    candidates.add(file);
                }
            }
        } else if (Strings.isNullOrEmpty(inputParams.loop_filter)) {
            candidates = Arrays.asList(fileSystem.listStatus(new Path(inputParams.loop_hdfsPath)));
        } else {
            FileStatus[] matches =
                    fileSystem.globStatus(new Path(toGlob(inputParams.loop_hdfsPath, inputParams.loop_filter)));
            // globStatus returns null (not an empty array) when a pattern without glob
            // characters names a path that does not exist; treat that as "no files".
            if (matches == null) {
                matches = new FileStatus[0];
            }
            candidates = Arrays.asList(matches);
        }

        return candidates.stream()
                .filter(FileStatus::isFile)
                .map(status -> status.getPath().toString())
                .toArray(String[]::new);
    }

    /**
     * Builds a glob pattern by appending a filter to a directory path.
     *
     * <p>Scheme and authority are stripped from the directory first, and exactly one
     * {@code '/'} separates the directory from the filter.
     *
     * @param str  directory path, possibly including scheme and authority
     * @param str2 filter expression appended after the directory
     * @return {@code <directory-without-scheme>/<filter>}
     */
    private String toGlob(String str, String str2) {
        String directory = Path.getPathWithoutSchemeAndAuthority(new Path(str)).toString();
        String separator = directory.endsWith("/") ? "" : "/";
        return directory + separator + str2;
    }

    /**
     * Distributes the input files over Spark, runs the process once per file and
     * gathers the results on the driver.
     *
     * <p>Each file path becomes one RDD element that is handed to
     * {@code LoopHdfsExecutor.runOnPartition}; {@code null} results are dropped. For
     * every output position of the first result, the outputs found at that position in
     * all results are grouped into one {@link IOObjectCollection}. Log records are
     * grouped by the result's index ({@code "?"} when the index is null); results that
     * share a key have their logs concatenated.
     *
     * <p>No macro values are collected from the results in this method, so the returned
     * macro map is always empty, as is {@code partitionToAMD}.
     *
     * @param inputParams job parameters, also shipped to the executors inside the map function
     * @return the collected outputs and log records
     * @throws Throwable any failure raised while listing files or running the Spark job
     */
    private PushdownRunner.OutputParams runJob(InputParams inputParams) throws Throwable {
        LoopHdfsCommon.debug("runJob start");
        String[] inputFiles = getInputHdfsFiles(inputParams);
        int fileCount = inputFiles.length;
        LoopHdfsCommon.debug("files: " + fileCount);
        LoopHdfsCommon.debug("files: " + Arrays.deepToString(inputFiles));
        int partitionCount = this.calculator.estimateNumberOfPartitions(fileCount, inputParams.numberOfPartitions);
        LoopHdfsCommon.debug("Executing job using " + partitionCount + " partitions");

        JavaRDD<String> filesRdd = this.sc.parallelize(Arrays.asList(inputFiles), partitionCount);
        LoopHdfsCommon.debug("filesRDD " + filesRdd);
        LoopHdfsCommon.debug("filesRDD count " + filesRdd.count());

        // The lambda captures only inputParams and fileCount (never `this`), so the
        // runner with its Spark context is not pulled into the serialized closure.
        JavaRDD<LoopHdfsCommon.PartitionResult> resultsRdd = filesRdd
                .map(file -> LoopHdfsExecutor.runOnPartition(file, inputParams, fileCount, inputParams.handleErrorType))
                .filter(Objects::nonNull);
        LoopHdfsCommon.debug("collect start");
        List<LoopHdfsCommon.PartitionResult> results = resultsRdd.collect();
        LoopHdfsCommon.debug("result: " + resultsRdd);
        for (LoopHdfsCommon.PartitionResult result : results) {
            LoopHdfsCommon.debug((Supplier<String>) () -> result.logs.toString());
        }

        PushdownRunner.OutputParams outputParams = new PushdownRunner.OutputParams();

        // The first result decides how many output positions are reported.
        int outputCount = results.isEmpty() ? 0 : results.get(0).outputs.size();
        outputParams.ioObjects = new ArrayList<>();
        for (int position = 0; position < outputCount; position++) {
            IOObjectCollection<IOObject> collection = new IOObjectCollection<>();
            for (LoopHdfsCommon.PartitionResult result : results) {
                if (position < result.outputs.size()) {
                    IOObject element = result.outputs.getElementAt(position);
                    LoopHdfsCommon.debug("result " + position + ": " + element.getClass().getCanonicalName());
                    collection.add(convertForTOSerialization(element));
                }
            }
            outputParams.ioObjects.add(collection);
        }

        outputParams.macros = new HashMap<>();
        outputParams.partitionToAMD = new HashMap<>();

        // Merge instead of Collectors.toMap: several results may share a key (e.g. more
        // than one null index mapping to "?"), which toMap rejects as a duplicate key.
        Map<String, List<LogRecord>> logsByIndex = new HashMap<>();
        for (LoopHdfsCommon.PartitionResult result : results) {
            String key = Objects.toString(result.index, "?");
            List<LogRecord> merged = logsByIndex.computeIfAbsent(key, unused -> new ArrayList<>());
            if (result.logs != null) {
                merged.addAll(result.logs);
            }
        }
        outputParams.logRecords = logsByIndex;
        return outputParams;
    }

    /**
     * Copies the values carried by the parameter transfer object into a new
     * {@code InputParams}.
     *
     * <p>Additional process configuration entries and macros arrive as parallel
     * key/value arrays; a key without a value at the same index is skipped. Every
     * accepted pair is echoed to standard output. {@code connectionsXML} is always
     * set to {@code null}.
     *
     * @param blobParameterTransferObject transfer object read from the argument file
     * @return the populated input parameters
     */
    private InputParams setupParameterFromPTO(BlobParameterTransferObject<ProcessPushdownParameter> blobParameterTransferObject) {
        final BlobParameterTransferObject<ProcessPushdownParameter> pto = blobParameterTransferObject;
        final InputParams params = new InputParams();

        // Directories, process definition and GC-monitor settings.
        params.handleErrorType = HandleErrorType.fromCode(pto.getParameterAsInteger(ProcessPushdownParameter.HANDLE_ERROR));
        params.inputDir = pto.getParameterAsString(ProcessPushdownParameter.INPUT_DIR);
        params.outputDir = pto.getParameterAsStringArray(ProcessPushdownParameter.OUTPUT_DIRS)[0];
        params.tempDir = pto.getParameterAsString(ProcessPushdownParameter.TEMP_DIR);
        params.potoDir = pto.getParameterAsString(ProcessPushdownParameter.POTO_DIR);
        params.processXML = (String) pto.getParameterAsBlobTO(ProcessPushdownParameter.PROCESS_XML).getPayload();
        params.partitioningAttributeName = pto.getParameterAsString(ProcessPushdownParameter.PARTITIONING_ATTRIBUTE);
        params.monitorExecutorGC = pto.getParameterAsBoolean(ProcessPushdownParameter.MONITOR_EXECUTOR_GC);
        params.gcMonitorLookbackSeconds = pto.getParameterAsInteger(ProcessPushdownParameter.GCMONITOR_LOOKBACK_SECONDS);
        params.gcMonitorGcTreshold = pto.getParameterAsDouble(ProcessPushdownParameter.GCMONITOR_GCTRESHOLD);

        // Input IOObjects: a missing payload yields an empty list.
        List payload = (List) pto.getParameterAsBlobTO(ProcessPushdownParameter.IOOBJECTS).getPayload();
        params.ioObjectList = new ArrayList();
        if (payload != null) {
            for (Object element : payload) {
                params.ioObjectList.add((IOObject) element);
            }
        }
        params.connectionsXML = null;

        // Additional process configuration as parallel key/value arrays.
        params.processConfiguration = new ArrayList<>();
        String[] configKeys = pto.getParameterAsStringArray(ProcessPushdownParameter.ADDITIONAL_PROCESS_CONFIGURATION_KEYS);
        String[] configValues = pto.getParameterAsStringArray(ProcessPushdownParameter.ADDITIONAL_PROCESS_CONFIGURATION_VALUES);
        for (int index = 0; index < configKeys.length; index++) {
            if (index >= configValues.length) {
                continue;
            }
            params.processConfiguration.add(new Tuple2<>(configKeys[index], configValues[index]));
            System.out.println("Setting parameters for process: (" + configKeys[index] + ", " + configValues[index] + ")");
        }

        // Macros, again as parallel name/value arrays.
        params.macros = new HashMap();
        String[] macroNames = pto.getParameterAsStringArray(ProcessPushdownParameter.MACRO_NAMES);
        String[] macroValues = pto.getParameterAsStringArray(ProcessPushdownParameter.MACRO_VALUES);
        for (int index = 0; index < macroNames.length; index++) {
            if (index >= macroValues.length) {
                continue;
            }
            params.macros.put(macroNames[index], macroValues[index]);
            System.out.println("Setting macros for process: (" + macroNames[index] + ", " + macroValues[index] + ")");
        }

        // Remaining scalar settings and the loop-specific inputs.
        params.extensionsUsed = Arrays.asList(pto.getParameterAsStringArray(ProcessPushdownParameter.EXTENSIONS_USED));
        params.numberOfPartitions = pto.getParameterAsInteger(ProcessPushdownParameter.NUMBER_OF_PARTITIONS);
        params.setPartitionMacro = pto.getParameterAsBoolean(ProcessPushdownParameter.SET_PARTITION_MACRO);
        params.partitionMacroName = pto.getParameterAsString(ProcessPushdownParameter.PARTITION_MACRO_NAME);
        params.radoopVersion = pto.getParameterAsString(ProcessPushdownParameter.RADOOP_VERSION);
        params.logLimit = pto.getParameterAsInteger(ProcessPushdownParameter.LOG_LIMIT);
        params.loop_inputFiles = pto.getParameterAsStringArray(ProcessPushdownParameter.INPUT_FILES);
        params.loop_hdfsPath = pto.getParameterAsString(ProcessPushdownParameter.LOOP_HDFSPATH);
        params.loop_filter = pto.getParameterAsString(ProcessPushdownParameter.LOOP_FILTER);
        params.loop_recursive = pto.getParameterAsBoolean(ProcessPushdownParameter.LOOP_RECURSIVE);
        return params;
    }

    /**
     * Prepares a result object for inclusion in the output transfer object.
     *
     * <p>An {@link ExampleSet} is logged (class of its example table) and passed through
     * {@code ProcessRunner.asExampleSetOrNull}; any other object is returned unchanged.
     *
     * @param iOObject result object produced by a partition
     * @return the converted example set, or the input itself when it is not an example set
     */
    private static IOObject convertForTOSerialization(IOObject iOObject) {
        if (!(iOObject instanceof ExampleSet)) {
            return iOObject;
        }
        ExampleSet exampleSet = (ExampleSet) iOObject;
        LoopHdfsCommon.debug("driver result exampleset table: " + exampleSet.getExampleTable().getClass().getCanonicalName());
        return ProcessRunner.asExampleSetOrNull(iOObject);
    }

    /**
     * Recreates the serializable lambdas used in {@code runJob} from their serialized form.
     *
     * <p>The method looks at the implementation method name stored in the
     * {@link SerializedLambda}, verifies that the remaining metadata (method kind,
     * functional interface, signatures, implementing class) matches one of the two
     * known lambdas, and returns a new instance of that lambda.
     *
     * <p>NOTE(review): this method is marked synthetic and appears to be the decompiler's
     * rendering of compiler-generated code (e.g. {@code boolean z = -1} and a switch over
     * a boolean are not valid Java source). Presumably it should not be maintained by
     * hand or kept when recompiling from source - confirm.
     *
     * @param serializedLambda serialized description of the lambda to restore
     * @return the restored lambda instance
     * @throws IllegalArgumentException if the description matches neither known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // First switch: map the implementation method name to a selector via its hash code.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1912344726:
                if (implMethodName.equals("lambda$runJob$5f642557$1")) {
                    z = false;
                    break;
                }
                break;
            case 2123019764:
                if (implMethodName.equals("nonNull")) {
                    z = true;
                    break;
                }
                break;
        }
        // Second switch: validate the metadata for the selected lambda and rebuild it.
        switch (z) {
            case false:
                // The per-file map function of runJob; its captured arguments are the
                // InputParams and the number of input files.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/radoop/spark/LoopHdfsRunner") && serializedLambda.getImplMethodSignature().equals("(Leu/radoop/spark/InputParams;ILjava/lang/String;)Leu/radoop/spark/LoopHdfsCommon$PartitionResult;")) {
                    InputParams inputParams = (InputParams) serializedLambda.getCapturedArg(0);
                    int intValue = ((Integer) serializedLambda.getCapturedArg(1)).intValue();
                    return str -> {
                        return LoopHdfsExecutor.runOnPartition(str, inputParams, intValue, inputParams.handleErrorType);
                    };
                }
                break;
            case true:
                // The Objects.nonNull filter of runJob; it captures nothing.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/util/Objects") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    return (v0) -> {
                        return Objects.nonNull(v0);
                    };
                }
                break;
        }
        // Unknown name or mismatching metadata.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
