package eu.radoop.spark;

import com.rapidminer.example.Attribute;
import com.rapidminer.example.Attributes;
import com.rapidminer.example.Example;
import com.rapidminer.operator.IOObject;
import com.rapidminer.operator.Operator;
import com.rapidminer.operator.OperatorException;
import com.rapidminer.operator.OperatorVersion;
import com.rapidminer.operator.UserError;
import com.rapidminer.settings.Settings;
import com.rapidminer.tools.LogService;
import com.rapidminer.tools.Ontology;
import com.rapidminer.tools.ParameterService;
import eu.radoop.RadoopConf;
import eu.radoop.transfer.BlobParameterTransferObject;
import eu.radoop.transfer.ProcessExceptionTO;
import eu.radoop.transfer.ProcessOutputTO;
import eu.radoop.transfer.PushdownOutputAttributeMetaData;
import eu.radoop.transfer.parameter.BootstrapingMode;
import eu.radoop.transfer.parameter.CommonParameter;
import eu.radoop.transfer.parameter.MissingAttributesHandlingMode;
import eu.radoop.transfer.parameter.PartitionSizing;
import eu.radoop.transfer.parameter.PartitioningMode;
import eu.radoop.transfer.parameter.ProcessPushdownParameter;
import eu.radoop.transfer.parameter.SchemaConflictResolutionMode;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.LogRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

/* loaded from: input_file:lib/radoop-spark3.jar:eu/radoop/spark/PushdownRunner.class */
public abstract class PushdownRunner implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Text emitted in place of a missing value: a backslash followed by {@code N}. */
    public static final String MISSING_VALUE_OUTPUT = "\\N";

    /**
     * Log records held by the runner instance. Nothing in this class writes to the list.
     * NOTE(review): presumably filled by subclasses on the driver side - confirm.
     */
    protected final List<LogRecord> driverLogs = new ArrayList<>();

    /* loaded from: input_file:lib/radoop-spark3.jar:eu/radoop/spark/PushdownRunner$Meta.class */
    /**
     * Serializable holder that pairs an output example schema with a macro map.
     * It is not referenced by any other member of this class.
     * NOTE(review): presumably consumed by subclasses or executor-side code - confirm.
     */
    public static class Meta implements Serializable {
        private static final long serialVersionUID = 1;
        /** Attribute set describing the output examples. */
        public Attributes outputExampleSchema;
        /** Macro map; presumably macro name to macro value - verify against the producer. */
        public HashMap<String, String> macros;
    }

    /* loaded from: input_file:lib/radoop-spark3.jar:eu/radoop/spark/PushdownRunner$OutputParams.class */
    /**
     * Result bundle returned by {@code runJob}. After a successful job, {@code run} copies
     * {@link #partitionToAMD}, {@link #ioObjects}, {@link #logRecords} and {@link #macros}
     * into the process output transfer object; the remaining fields are not read by this class.
     */
    public static class OutputParams implements Serializable {
        private static final long serialVersionUID = 1L;

        /** IOObjects produced by the job. */
        public List<IOObject> ioObjects;

        /** Macros produced by the job; presumably name to value - verify against the producer. */
        public Map<String, String> macros;

        /** Output attribute meta data, keyed by a string (per the field name, a partition identifier). */
        public Map<String, ArrayList<PushdownOutputAttributeMetaData>> partitionToAMD = new HashMap<>();

        /** Log records keyed by a string; a {@link TreeMap}, so iteration follows key order. */
        public Map<String, List<LogRecord>> logRecords = new TreeMap<>();

        /** Error texts keyed by a string; a {@link TreeMap}, so iteration follows key order. */
        public Map<String, String> partitionErrors = new TreeMap<>();

        /** Statistics object for the job; starts out as a freshly constructed instance. */
        public JobStats jobStats = new JobStats();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a Spark function that renders one {@link Example} as a single delimited line.
     *
     * <p>Every attribute returned by {@code allAttributes()} is converted with
     * {@link #convertExampleValue}. Occurrences of the delimiter character U+0001 inside a value
     * are escaped with a leading backslash, and each value - including the last one - is followed
     * by a U+0001 delimiter.
     *
     * <p>The returned function keeps no mutable state: the line buffer is created per call, so
     * the function stays safe to invoke repeatedly or concurrently and no buffer is serialized
     * with the closure.
     *
     * @param str operator version string; missing values are written as
     *            {@link #MISSING_VALUE_OUTPUT} only when this version is above
     *            {@code RadoopConf.VERSION_INCOMPATIBLE_MISSING_HANDLING}
     * @return the conversion function; any failure inside it is rethrown as the exception built
     *         by {@code PushdownTools.createExecutorException}
     */
    public Function<Example, String> convertExampleToString(String str) {
        final boolean isAbove = new OperatorVersion(str).isAbove(new OperatorVersion(RadoopConf.VERSION_INCOMPATIBLE_MISSING_HANDLING));
        return new Function<Example, String>() { // from class: eu.radoop.spark.PushdownRunner.1
            private static final long serialVersionUID = 1;

            @Override
            public String call(Example example) {
                try {
                    // Per-call buffer: avoids sharing one mutable builder between invocations.
                    StringBuilder line = new StringBuilder();
                    Iterator<Attribute> attributes = example.getAttributes().allAttributes();
                    while (attributes.hasNext()) {
                        String rendered = PushdownRunner.this.convertExampleValue(example, attributes.next(), isAbove);
                        // Escape the field delimiter inside the value, then terminate the field.
                        line.append(rendered.replace("\u0001", "\\\u0001"));
                        line.append((char) 1);
                    }
                    return line.toString();
                } catch (Throwable th) {
                    throw PushdownTools.createExecutorException(th, null);
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Renders the value of one attribute of an example as text.
     *
     * <ul>
     *   <li>If {@code z} is set and the value is missing (NaN), the result is
     *       {@link #MISSING_VALUE_OUTPUT}, whatever the attribute type.</li>
     *   <li>Nominal: the mapped string, or {@code "?"} for a missing value. Values containing a
     *       carriage return or line feed are rejected because they would break the line-based
     *       output.</li>
     *   <li>Numerical: integer-typed attributes are written without a fraction; other numbers use
     *       {@code String.valueOf(double)}, switching to plain decimal notation when that would
     *       produce an exponent.</li>
     *   <li>Date/time: {@code new Date((long) value).toString()}.</li>
     * </ul>
     *
     * <p>When {@code z} is not set, the legacy rendering of missing non-nominal values is kept
     * as it was (for example {@code "0"} for integers and {@code "NaN"} for reals).
     *
     * @param example   the example to read from
     * @param attribute the attribute whose value is rendered
     * @param z         whether missing values are written as {@link #MISSING_VALUE_OUTPUT}
     * @return the textual form of the value
     * @throws OperatorException a {@link UserError} if a nominal value contains a line break
     *         (error code 1032) or the attribute is neither nominal, numerical nor date/time
     */
    public String convertExampleValue(Example example, Attribute attribute, boolean z) throws OperatorException {
        double value = example.getValue(attribute);
        if (z && Double.isNaN(value)) {
            return MISSING_VALUE_OUTPUT;
        }
        if (attribute.isNominal()) {
            if (Double.isNaN(value)) {
                return "?";
            }
            String nominal = attribute.getMapping().mapIndex((int) value);
            if (nominal.contains("\r") || nominal.contains("\n")) {
                throw new UserError((Operator) null, 1032, new Object[]{attribute.getName()});
            }
            return nominal;
        }
        if (attribute.isNumerical()) {
            if (attribute.getValueType() == Ontology.INTEGER) {
                // Narrow to long, not int: an int cast silently clamps values beyond 32 bits.
                return String.valueOf((long) value);
            }
            String text = String.valueOf(value);
            // BigDecimal.valueOf uses the shortest decimal form of the double; the double
            // constructor would print its full binary expansion (dozens of digits).
            return text.contains("E") ? BigDecimal.valueOf(value).toPlainString() : text;
        }
        if (attribute.isDateTime()) {
            return new Date((long) value).toString();
        }
        throw new UserError((Operator) null, "spark.pushdown.unexpected_attribute_type", new Object[]{Ontology.VALUE_TYPE_NAMES[attribute.getValueType()]});
    }

    /**
     * Executes the actual job. {@code run} calls this once the Spark context exists and the
     * optional executor GC monitoring has been set up, and afterwards serializes the
     * {@code partitionToAMD}, {@code ioObjects}, {@code logRecords} and {@code macros} fields
     * of the returned object into the process output.
     *
     * @param javaSparkContext the Spark context created by {@code run}
     * @param inputParams      the parameters decoded from the argument file
     * @return the job result
     * @throws SparkException    declared for implementations
     * @throws OperatorException declared for implementations
     */
    protected abstract OutputParams runJob(JavaSparkContext javaSparkContext, InputParams inputParams) throws SparkException, OperatorException;

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Finally extract failed */
    /**
     * Driver entry point: decodes the parameters, runs the job and publishes either its result
     * or a description of its failure.
     *
     * <ol>
     *   <li>Reads the parameter blob from the file named by {@code strArr[0]} and converts it to
     *       {@code InputParams}.</li>
     *   <li>Creates the Spark context and resolves the input directory.</li>
     *   <li>Aborts immediately if the {@code _PROCESS_ERROR_FLAG} marker next to the output
     *       location already exists. This method writes that marker only after a
     *       {@link UserError}.</li>
     *   <li>Optionally starts the executor GC monitor. Its look-back and threshold come from the
     *       input parameters and can be overridden through the additional process configuration
     *       entries; an override that is not a valid number is reported and ignored.</li>
     *   <li>Calls {@link #runJob} and saves the result as JSON text under {@code potoDir}.</li>
     *   <li>On failure, saves a JSON description of the failure under {@code potoDir}, writes the
     *       marker file for user errors and rethrows. If publishing the description fails as
     *       well, that secondary failure is attached as suppressed instead of replacing the
     *       original one.</li>
     * </ol>
     *
     * <p>The Spark context is always stopped before this method returns or throws.
     *
     * @param strArr command-line arguments; only {@code strArr[0]} is read
     * @throws SparkException if the error marker of an earlier run is present
     * @throws Throwable the job failure, or the process exception extracted from it
     */
    public void run(String[] strArr) throws SparkException, Throwable {
        tweakStaticInitializerSequence();
        BlobParameterTransferObject<ProcessPushdownParameter> blobParameterTransferObject = new BlobParameterTransferObject<>(RunnerTools.readFromArgFile(strArr[0]), ProcessPushdownParameter.class);
        System.out.println(blobParameterTransferObject);
        InputParams inputParams = setupParameterFromPTO(blobParameterTransferObject);
        JavaSparkContext javaSparkContext = null;
        ExecutorGCMonitor executorGCMonitor = new ExecutorGCMonitor();
        try {
            javaSparkContext = new JavaSparkContext(new SparkConf());
            FileSystem fileSystem = FileSystem.get(javaSparkContext.hadoopConfiguration());
            inputParams.inputDir = RunnerTools.resolveInputDir(javaSparkContext, inputParams.inputDir);
            if (fileSystem.exists(new Path(inputParams.potoDir + "_PROCESS_ERROR_FLAG"))) {
                throw new SparkException("UserError occured. Please check the logs in RapidMiner Studio or the the logs of the first Application attempt.");
            }
            try {
                // A missing (null) switch is treated as "monitoring disabled" instead of failing.
                if (Boolean.TRUE.equals(inputParams.monitorExecutorGC)) {
                    System.out.println("Starting Executor Health Monitoring");
                    Integer lookbackSeconds = inputParams.gcMonitorLookbackSeconds;
                    Double gcThreshold = inputParams.gcMonitorGcTreshold;
                    // Additional process configuration entries may override both settings.
                    for (Tuple2<String, String> entry : inputParams.processConfiguration) {
                        if (RadoopConf.SPARK_GC_MONITOR_LOOKBACK_SECONDS.equals(entry._1)) {
                            try {
                                lookbackSeconds = Integer.valueOf(entry._2);
                            } catch (NumberFormatException e) {
                                // Best effort: keep the previous value, but leave a trace.
                                System.out.println("Ignoring invalid value '" + entry._2 + "' for " + entry._1);
                            }
                        }
                        if (RadoopConf.SPARK_GC_MONITOR_TRESHOLD.equals(entry._1)) {
                            try {
                                gcThreshold = Double.valueOf(entry._2);
                            } catch (NumberFormatException e) {
                                // Best effort: keep the previous value, but leave a trace.
                                System.out.println("Ignoring invalid value '" + entry._2 + "' for " + entry._1);
                            }
                        }
                    }
                    executorGCMonitor.setLookbackSecs(lookbackSeconds);
                    executorGCMonitor.setGcTreshold(gcThreshold);
                    executorGCMonitor.listen(javaSparkContext.sc());
                } else {
                    System.out.println("Executor Health Monitoring disabled");
                }
                OutputParams jobOutput = runJob(javaSparkContext, inputParams);
                ProcessOutputTO processOutputTO = new ProcessOutputTO(jobOutput.partitionToAMD, jobOutput.ioObjects, jobOutput.logRecords, jobOutput.macros);
                List<String> resultJson = new ArrayList<>(1);
                resultJson.add(processOutputTO.toJson());
                javaSparkContext.parallelize(resultJson, 1).saveAsTextFile(inputParams.potoDir);
            } catch (Throwable caught) {
                Throwable failure = caught;
                Map<String, List<LogRecord>> executorLogs = null;
                boolean userErrorRaised = false;
                ProcessExceptionTO processExceptionTO;
                ProcessExceptionWithLogs original = PushdownTools.extractOriginalProcessExceptionWithLogs(caught);
                if (original != null) {
                    // The process exception was found inside the failure: report and rethrow it.
                    executorLogs = original.logRecords;
                    processExceptionTO = original.processExceptionTO;
                    failure = processExceptionTO.getProcessException();
                    userErrorRaised = failure instanceof UserError;
                } else if (executorGCMonitor.isJobKilled()) {
                    System.out.println("GC Monitor killed the application or OOM. Returning with exception.");
                    javaSparkContext.sc().requestExecutors(1);
                    processExceptionTO = new ProcessExceptionTO(new UserError((Operator) null, ProcessExceptionTO.ERROR_CODE_OOM));
                } else if (failure.getMessage() != null && failure.getMessage().contains("ExecutorLostFailure")) {
                    processExceptionTO = new ProcessExceptionTO(new Exception("Process Pushdown possibly ran out of memory on the cluster."), failure.getStackTrace());
                } else if (failure instanceof UserError) {
                    processExceptionTO = new ProcessExceptionTO(failure);
                    userErrorRaised = true;
                } else {
                    processExceptionTO = new ProcessExceptionTO(new Exception(failure.toString()), failure.getStackTrace());
                }
                try {
                    ProcessOutputTO failureOutput = new ProcessOutputTO(executorLogs, processExceptionTO);
                    List<String> failureJson = new ArrayList<>(1);
                    failureJson.add(failureOutput.toJson());
                    javaSparkContext.parallelize(failureJson, 1).saveAsTextFile(inputParams.potoDir);
                    if (userErrorRaised) {
                        // Empty marker file; it is checked at the top of this method.
                        FileSystem.get(javaSparkContext.hadoopConfiguration()).create(new Path(inputParams.potoDir + "_PROCESS_ERROR_FLAG")).close();
                    }
                } catch (Throwable reportingFailure) {
                    System.out.println("Could not publish the failure report: " + reportingFailure);
                    if (failure == null) {
                        throw reportingFailure;
                    }
                    // Keep the job failure as the primary exception; do not mask it.
                    failure.addSuppressed(reportingFailure);
                }
                throw failure;
            }
        } finally {
            if (javaSparkContext != null) {
                javaSparkContext.stop();
            }
        }
    }

    /**
     * Prepares RapidMiner's static services; it is the first thing {@code run} does.
     * NOTE(review): per the method name, the order of the three calls is deliberate - do not
     * reorder without confirming.
     */
    private void tweakStaticInitializerSequence() {
        // Point the logging resource setting at the LogMessages bundle before logging is touched.
        Settings.setSetting("rapidminer.logging.resource-file-jar-path", "com.rapidminer.resources.i18n.LogMessages");
        // Return value is discarded; presumably called only to trigger LogService's static initialisation.
        LogService.getRoot();
        ParameterService.init();
    }

    /**
     * Copies every parameter of the transfer object into a new {@code InputParams}.
     *
     * <p>Additional process configuration entries and macros arrive as parallel key/value
     * arrays; a key without a value at the same index is dropped, and each accepted pair is
     * echoed to standard output. {@code connectionsXML} is always set to {@code null}.
     *
     * @param blobParameterTransferObject the decoded parameter blob
     * @return the populated input parameters
     */
    private InputParams setupParameterFromPTO(BlobParameterTransferObject<ProcessPushdownParameter> blobParameterTransferObject) {
        final BlobParameterTransferObject<ProcessPushdownParameter> pto = blobParameterTransferObject;
        final InputParams params = new InputParams();

        // Locations and file format.
        params.inputDir = pto.getParameterAsString(ProcessPushdownParameter.INPUT_DIR);
        String[] outputDirs = pto.getParameterAsStringArray(ProcessPushdownParameter.OUTPUT_DIRS);
        params.outputDir = outputDirs[0];
        params.tempDir = pto.getParameterAsString(ProcessPushdownParameter.TEMP_DIR);
        params.potoDir = pto.getParameterAsString(ProcessPushdownParameter.POTO_DIR);
        String fileFormatName = pto.getParameterAsString(ProcessPushdownParameter.FILE_FORMAT);
        params.fileFormat = CommonParameter.FileFormat.getByName(fileFormatName);

        // Input schema.
        params.names = pto.getParameterAsStringArray(ProcessPushdownParameter.ATTRIBUTE_NAMES);
        params.roles = pto.getParameterAsStringArray(ProcessPushdownParameter.ATTRIBUTE_ROLES);
        params.ontologyTypes = pto.getParameterAsIntegerArray(ProcessPushdownParameter.ONTOLOGY_TYPES);
        params.positiveValues = pto.getParameterAsStringArray(ProcessPushdownParameter.POSITIVE_VALUES);
        params.negativeValues = pto.getParameterAsStringArray(ProcessPushdownParameter.NEGATIVE_VALUES);
        params.exaOutputConnected = pto.getParameterAsBoolean(ProcessPushdownParameter.EXA_OUTPUT_CONNECTED).booleanValue();

        // Process definition and GC monitoring.
        params.processXML = (String) pto.getParameterAsBlobTO(ProcessPushdownParameter.PROCESS_XML).getPayload();
        params.partitioningAttributeName = pto.getParameterAsString(ProcessPushdownParameter.PARTITIONING_ATTRIBUTE);
        params.monitorExecutorGC = pto.getParameterAsBoolean(ProcessPushdownParameter.MONITOR_EXECUTOR_GC);
        params.gcMonitorLookbackSeconds = pto.getParameterAsInteger(ProcessPushdownParameter.GCMONITOR_LOOKBACK_SECONDS);
        params.gcMonitorGcTreshold = pto.getParameterAsDouble(ProcessPushdownParameter.GCMONITOR_GCTRESHOLD);

        // Transferred IOObjects; a null payload yields an empty list.
        List<?> transferredObjects = (List<?>) pto.getParameterAsBlobTO(ProcessPushdownParameter.IOOBJECTS).getPayload();
        params.ioObjectList = new ArrayList<>();
        if (transferredObjects != null) {
            for (Object transferred : transferredObjects) {
                params.ioObjectList.add((IOObject) transferred);
            }
        }
        params.connectionsXML = null;

        // Additional process configuration: parallel key/value arrays.
        params.processConfiguration = new ArrayList<>();
        String[] configKeys = pto.getParameterAsStringArray(ProcessPushdownParameter.ADDITIONAL_PROCESS_CONFIGURATION_KEYS);
        String[] configValues = pto.getParameterAsStringArray(ProcessPushdownParameter.ADDITIONAL_PROCESS_CONFIGURATION_VALUES);
        for (int index = 0; index < configKeys.length; index++) {
            if (index >= configValues.length) {
                // No value for this key, and therefore none for any later key either.
                break;
            }
            params.processConfiguration.add(new Tuple2<>(configKeys[index], configValues[index]));
            System.out.println("Setting parameters for process: (" + configKeys[index] + ", " + configValues[index] + ")");
        }

        // Macros: parallel name/value arrays.
        params.macros = new HashMap<>();
        String[] macroNames = pto.getParameterAsStringArray(ProcessPushdownParameter.MACRO_NAMES);
        String[] macroValues = pto.getParameterAsStringArray(ProcessPushdownParameter.MACRO_VALUES);
        for (int index = 0; index < macroNames.length; index++) {
            if (index >= macroValues.length) {
                // No value for this name, and therefore none for any later name either.
                break;
            }
            params.macros.put(macroNames[index], macroValues[index]);
            System.out.println("Setting macros for process: (" + macroNames[index] + ", " + macroValues[index] + ")");
        }

        // Extensions and partitioning.
        String[] extensions = pto.getParameterAsStringArray(ProcessPushdownParameter.EXTENSIONS_USED);
        params.extensionsUsed = Arrays.asList(extensions);
        String partitioningModeName = pto.getParameterAsString(ProcessPushdownParameter.PARTITIONING_MODE);
        params.partitioningMode = PartitioningMode.getByName(partitioningModeName);
        String partitionSizingName = pto.getParameterAsString(ProcessPushdownParameter.PARTITION_SIZING);
        params.partitionSizing = PartitionSizing.getByName(partitionSizingName);
        params.numberOfPartitions = pto.getParameterAsInteger(ProcessPushdownParameter.NUMBER_OF_PARTITIONS);
        params.partitionSize = pto.getParameterAsInteger(ProcessPushdownParameter.PARTITION_SIZE);
        params.addPartitionIndex = pto.getParameterAsBoolean(ProcessPushdownParameter.ADD_PARTITION_INDEX);
        params.setPartitionMacro = pto.getParameterAsBoolean(ProcessPushdownParameter.SET_PARTITION_MACRO);
        params.partitionMacroName = pto.getParameterAsString(ProcessPushdownParameter.PARTITION_MACRO_NAME);
        params.partitionMacroStartValue = pto.getParameterAsInteger(ProcessPushdownParameter.PARTITION_MACRO_START_VALUE);
        params.inputStatisticsCountAll = pto.getParameterAsLong(ProcessPushdownParameter.INPUT_STATISTICS_COUNT_ALL);

        // Output merging and schema handling.
        params.mergeOutput = pto.getParameterAsBoolean(ProcessPushdownParameter.MERGE_OUTPUT);
        params.resolveSchemaConflicts = pto.getParameterAsBoolean(ProcessPushdownParameter.RESOLVE_SCHEMA_CONFLICTS);
        String conflictModeName = pto.getParameterAsString(ProcessPushdownParameter.SCHEMA_CONFLICT_RESOLUTION_MODE);
        params.schemaConflictResolutionMode = SchemaConflictResolutionMode.getByName(conflictModeName);
        String missingAttributesModeName = pto.getParameterAsString(ProcessPushdownParameter.HANDLE_MISSING_ATTRIBUTES);
        params.handleMissingAttributes = MissingAttributesHandlingMode.getByName(missingAttributesModeName);
        params.missingAttributesValue = pto.getParameterAsString(ProcessPushdownParameter.MISSING_ATTRIBUTES_VALUE);
        params.maxCollectionSize = pto.getParameterAsInteger(ProcessPushdownParameter.MAX_COLLECTION_SIZE);
        params.unconnectedExaSource = pto.getParameterAsBoolean(ProcessPushdownParameter.UNCONNECTED_EXA_SOURCE);
        params.radoopVersion = pto.getParameterAsString(ProcessPushdownParameter.RADOOP_VERSION);
        params.distributeNominalMappings = pto.getParameterAsBoolean(ProcessPushdownParameter.DISTRIBUTE_NOMINAL_MAPPINGS);
        params.logLimit = pto.getParameterAsInteger(ProcessPushdownParameter.LOG_LIMIT);

        // Bootstrapping.
        params.bootstrap = pto.getParameterAsBoolean(ProcessPushdownParameter.BOOTSTRAP);
        String bootstrapModeName = pto.getParameterAsString(ProcessPushdownParameter.BOOTSTRAP_MODE);
        params.bootstrapMode = BootstrapingMode.getByName(bootstrapModeName);
        params.bootstrapSize = pto.getParameterAsInteger(ProcessPushdownParameter.BOOTSTRAP_SIZE);
        params.bootstrapProbability = pto.getParameterAsDouble(ProcessPushdownParameter.BOOTSTRAP_PROBABILITY);
        params.seed = pto.getParameterAsLong(ProcessPushdownParameter.SEED);
        params.numberOfBootstraps = pto.getParameterAsInteger(ProcessPushdownParameter.NUMBER_OF_BOOTSTRAPS);
        return params;
    }
}
