package eu.radoop.operator.spark;

import au.com.bytecode.opencsv.CSVParser;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.rapidminer.example.table.NominalMapping;
import com.rapidminer.operator.OperatorDescription;
import com.rapidminer.operator.OperatorException;
import com.rapidminer.operator.OperatorVersion;
import com.rapidminer.operator.ProcessSetupError;
import com.rapidminer.operator.SimpleProcessSetupError;
import com.rapidminer.operator.UserError;
import com.rapidminer.operator.ports.InputPortExtender;
import com.rapidminer.operator.ports.InputPorts;
import com.rapidminer.operator.ports.OutputPort;
import com.rapidminer.operator.ports.OutputPortExtender;
import com.rapidminer.operator.ports.OutputPorts;
import com.rapidminer.operator.ports.metadata.AttributeMetaData;
import com.rapidminer.operator.ports.metadata.ExampleSetMetaData;
import com.rapidminer.operator.ports.metadata.MDTransformationRule;
import com.rapidminer.operator.ports.metadata.MetaData;
import com.rapidminer.operator.ports.metadata.SetRelation;
import com.rapidminer.operator.ports.quickfix.ParameterSettingQuickFix;
import com.rapidminer.parameter.ParameterType;
import com.rapidminer.parameter.ParameterTypeBoolean;
import com.rapidminer.parameter.ParameterTypeCategory;
import com.rapidminer.parameter.ParameterTypeEnumeration;
import com.rapidminer.parameter.ParameterTypeFile;
import com.rapidminer.parameter.ParameterTypeString;
import com.rapidminer.parameter.ParameterTypeText;
import com.rapidminer.parameter.TextType;
import com.rapidminer.parameter.UndefinedParameterError;
import com.rapidminer.parameter.conditions.AboveOperatorVersionCondition;
import com.rapidminer.parameter.conditions.EqualOperatorVersionCondition;
import com.rapidminer.parameter.conditions.EqualTypeCondition;
import com.rapidminer.tools.LogService;
import eu.radoop.RadoopOperator;
import eu.radoop.RadoopTools;
import eu.radoop.datahandler.HadoopExampleSet;
import eu.radoop.datahandler.HadoopExampleSetFactory;
import eu.radoop.datahandler.hdfs.TempHDFSDirectory;
import eu.radoop.datahandler.hive.FileFormatHive;
import eu.radoop.datahandler.mapreducehdfs.MapReduceHDFSHandler;
import eu.radoop.hive.HiveStaticUtils;
import eu.radoop.operator.ports.metadata.HadoopExampleSetMetaData;
import eu.radoop.operator.spark.SparkTools;
import eu.radoop.spark.SparkJobResult;
import eu.radoop.spark.SparkVersion;
import eu.radoop.spark.script.SparkScriptLanguage;
import eu.radoop.tools.LogCollectionMethod;
import eu.radoop.tools.LogCollectionTools;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.compress.archivers.ArchiveStreamFactory;

/* loaded from: input_file:eu/radoop/operator/spark/SparkScript.class */
public class SparkScript extends RadoopOperator {
    // Dynamic port groups for the example set inputs / outputs (created in the constructor).
    private final InputPortExtender inputExtender;
    private final OutputPortExtender outputExtender;
    // Quote, separator and escape characters; the same characters are used by the CSVParser that
    // reads the "name;type" schema lines written by the generated driver script.
    public static final char CSV_QUOTE = '\"';
    public static final char CSV_SEMICOLON = ';';
    public static final char CSV_ESCAPE = '\\';
    // Parameter keys holding the user script per language. Only the Python and R keys are read by
    // getScript(); the Scala and Java keys are not referenced in the visible part of this file.
    public static final String PARAMETER_SCRIPT_PYTHON = "Python_script";
    public static final String PARAMETER_SCRIPT_R = "R_script";
    public static final String PARAMETER_SCRIPT_SCALA = "Scala_script";
    public static final String PARAMETER_SCRIPT_JAVA = "Java_script";
    // Boolean parameter: when true the Spark session is created with Hive support.
    public static final String PARAMETER_ENABLE_HIVE_ACCESS = "enable_Hive_access";
    // Logged as a warning when a script fails while Hive access is enabled.
    private static final String HINT_HIVE_ACCESS = "Hive access from Spark Script may not be configured properly. Please read the \"Accessing Hive from Spark Script\" section in the documentation at https://docs.rapidminer.com/radoop/installation/hadoop-security/hive-context.html";
    // Paths of the template scripts, loaded through getResource() as parameter templates.
    public static final String DEFAULT_SPARK_PYTHON_SCRIPT_LOCATION = "/eu/radoop/operator/spark/exampleSparkPython.py";
    public static final String DEFAULT_SPARK_R_SCRIPT_LOCATION = "/eu/radoop/operator/spark/exampleSparkR.R";
    // Help texts for the additional-source and script parameters.
    public static final String HELP_TEXT_ADDITIONAL_LOCAL_PY = "You can specify additional local sources by adding entries to this parameter. The supported file extensions are .zip for compressed formats and .py for source files. ";
    public static final String HELP_TEXT_ADDITIONAL_LOCAL_R = "You can specify additional local sources by adding entries to this parameter. The supported file extensions are .zip for compressed formats and .R for source files. ";
    public static final String HELP_TEXT_ADDITIONAL_HDFS = "If you want to avoid uploading large source files or packages to the HDFS you can specify their absolute HDFS path in this parameter.";
    public static final String HELP_TEXT_PY_SCRIPT = "The Spark script to execute. A method (function) with the name <code>rm_main</code> with one input argument and one return value is defined in the default script. Please do not change the name of the function. It can have as many arguments as the number of the connected input ports and as many returned values as the number of the connected output ports.";
    public static final String HELP_TEXT_R_SCRIPT = "The Spark script to execute. A method (function) with the name <code>rm_main</code> with one input argument and one return value is defined in the default script. Please do not change the name of the function. It can have as many arguments as the number of the connected input ports and as many returned values as the number of the connected output ports. When using R, please return the output DataFrames as a list (see the default script or the examples).";
    // Category parameter selecting the script language (values: LANGUAGES).
    public static final String PARAMETER_LANGUAGE = "language";
    // Boolean parameter: reuse the nominal mappings of the first input for the outputs.
    public static final String PARAMETER_KEEP_BINOMINALS = "preserve_binominal_mappings";
    // Legacy ("_OLD") additional-source parameter keys. In the visible code they are shown only under
    // an EqualOperatorVersionCondition on VERSION_SAME_PARAM_NAME.
    // NOTE(review): exact version semantics of that condition are defined outside this file.
    public static final String PARAMETER_ADDITIONAL_LOCAL_SOURCES_R_OLD = "additional_local_sources";
    public static final String PARAMETER_ADDITIONAL_LOCAL_SOURCE_R_OLD = "additional_local_source_file";
    public static final String PARAMETER_ADDITIONAL_HDFS_SOURCES_R_OLD = "additional_hdfs_sources";
    public static final String PARAMETER_ADDITIONAL_HDFS_SOURCE_R_OLD = "additional_hdfs_source_file";
    public static final String PARAMETER_ADDITIONAL_LOCAL_SOURCES_PY_OLD = "additional_local_sources_";
    public static final String PARAMETER_ADDITIONAL_LOCAL_SOURCE_PY_OLD = "additional_local_source_file_";
    public static final String PARAMETER_ADDITIONAL_HDFS_SOURCES_PY_OLD = "additional_hdfs_sources_";
    public static final String PARAMETER_ADDITIONAL_HDFS_SOURCE_PY_OLD = "additional_hdfs_source_file_";
    // Current, language-specific additional-source parameter keys.
    public static final String PARAMETER_ADDITIONAL_LOCAL_SOURCES_R = "additional_local_R_sources";
    public static final String PARAMETER_ADDITIONAL_LOCAL_SOURCE_R = "additional_local_R_source_file";
    public static final String PARAMETER_ADDITIONAL_HDFS_SOURCES_R = "additional_hdfs_R_sources";
    public static final String PARAMETER_ADDITIONAL_HDFS_SOURCE_R = "additional_hdfs_R_source_file";
    public static final String PARAMETER_ADDITIONAL_LOCAL_SOURCES_PY = "additional_local_py_sources";
    public static final String PARAMETER_ADDITIONAL_LOCAL_SOURCE_PY = "additional_local_py_source_file";
    public static final String PARAMETER_ADDITIONAL_HDFS_SOURCES_PY = "additional_hdfs_py_sources";
    public static final String PARAMETER_ADDITIONAL_HDFS_SOURCE_PY = "additional_hdfs_py_source_file";
    // Line separator used to join generated script lines and loaded resources.
    private static final String LF = System.lineSeparator();
    // Category values of the language parameter.
    public static final String[] LANGUAGES = SparkScriptLanguage.getNames();
    // Operator version passed to the (Equal/Above)OperatorVersionCondition of the additional-source parameters.
    public static final OperatorVersion VERSION_SAME_PARAM_NAME = new OperatorVersion(2, 7, 1);

    /**
     * Creates the operator, sets up the extensible input/output example set ports and registers the
     * metadata transformation that derives output metadata from the connected inputs.
     *
     * @param operatorDescription description passed on to {@link RadoopOperator}
     */
    public SparkScript(OperatorDescription operatorDescription) {
        super(operatorDescription);
        // Port extenders for the dynamic "example set input" / "example set output" port groups.
        this.inputExtender = new InputPortExtender("example set input", getInputPorts(), new HadoopExampleSetMetaData(), false);
        this.outputExtender = new OutputPortExtender("example set output", getOutputPorts());
        this.inputExtender.start();
        this.outputExtender.start();
        getTransformer().addRule(new MDTransformationRule() { // from class: eu.radoop.operator.spark.SparkScript.1
            /**
             * Delivers design-time metadata to every connected output port. Output i gets the
             * metadata of input i (set relation merged with UNKNOWN). Outputs beyond the number of
             * connected inputs derive their metadata from the previous output. All attributes are
             * then marked regular (presumably because the script may drop any special roles).
             */
            public void transformMD() {
                InputPorts inputPorts = SparkScript.this.getInputPorts();
                OutputPorts outputPorts = SparkScript.this.getOutputPorts();
                int numberOfConnectedPorts = inputPorts.getNumberOfConnectedPorts();
                int numberOfConnectedPorts2 = outputPorts.getNumberOfConnectedPorts();
                MetaData[] metaDataArr = new HadoopExampleSetMetaData[numberOfConnectedPorts2];
                // NOTE(review): ports are addressed by index while the loop bounds are connected-port
                // counts, so this assumes the connected ports are the leading ones.
                for (int i = 0; i < numberOfConnectedPorts2; i++) {
                    if (i < numberOfConnectedPorts) {
                        ExampleSetMetaData metaData = inputPorts.getPortByIndex(i).getMetaData();
                        if (metaData != null) {
                            metaData.mergeSetRelation(SetRelation.UNKNOWN);
                            metaDataArr[i] = RadoopOperator.castToHesMD(metaData);
                        }
                    } else if (i > 0) {
                        // More outputs than inputs: derive from the previous output's metadata.
                        metaDataArr[i] = RadoopOperator.castToHesMD(metaDataArr[i - 1]);
                    }
                }
                for (int i2 = 0; i2 < numberOfConnectedPorts2; i2++) {
                    if (metaDataArr[i2] != null) {
                        Iterator it = metaDataArr[i2].getAllAttributes().iterator();
                        while (it.hasNext()) {
                            ((AttributeMetaData) it.next()).setRegular();
                        }
                    }
                    // May deliver null when no metadata could be derived for this output.
                    outputPorts.getPortByIndex(i2).deliverMD(metaDataArr[i2]);
                }
            }
        });
    }

    /**
     * Design-time validation on top of {@link RadoopOperator#performAdditionalChecks()}.
     * <p>
     * After checking the Spark version requirement:
     * <ul>
     * <li>with Hive access disabled, a warning (with a quick fix that enables it) is added when a
     * non-comment script line contains the language's Hive context hint;</li>
     * <li>with Hive access enabled, a warning is added when {@code rm_main} declares a fixed number
     * of parameters different from the number of connected input ports.</li>
     * </ul>
     * Finally an error is added for every configured local source file that does not exist.
     * Any {@link OperatorException} while reading parameters ends the checks without an error.
     */
    @Override // eu.radoop.RadoopOperator
    public void performAdditionalChecks() {
        super.performAdditionalChecks();
        SparkTools.checkSparkVersionDesignTime(this, SparkVersion.get30AndAbove());
        try {
            SparkScriptLanguage language = getLanguage();
            String script = getScript();
            boolean hiveAccess = getParameterAsBoolean(PARAMETER_ENABLE_HIVE_ACCESS);
            if (hiveAccess) {
                long connectedInputs = this.inputExtender.getManagedPorts().stream()
                        .filter(port -> port.isConnected())
                        .count();
                int declaredArgs = getArgCount(language, script);
                // -1 means "no rm_main found", MAX_VALUE means "variadic": neither is checked.
                boolean fixedArity = declaredArgs >= 0 && declaredArgs < Integer.MAX_VALUE;
                if (fixedArity && declaredArgs != connectedInputs) {
                    addError(new SimpleProcessSetupError(ProcessSetupError.Severity.WARNING, getPortOwner(), Collections.emptyList(), true, "process.error.spark_script_main_args", new Object[]{language.getName(), Long.valueOf(connectedInputs)}));
                }
            } else {
                // Lines produced by split("\n") never contain '\n', so only '\r' needs stripping.
                for (String rawLine : script.split("\n")) {
                    String line = rawLine.replace("\r", "");
                    if (line.contains(language.getHiveContextHint()) && !line.matches("\\s*#.*")) {
                        addError(new SimpleProcessSetupError(ProcessSetupError.Severity.WARNING, getPortOwner(), Collections.singletonList(new ParameterSettingQuickFix(this, PARAMETER_ENABLE_HIVE_ACCESS, String.valueOf(true))), "hivecontext_disabled", new Object[0]));
                        break;
                    }
                }
            }
            HashSet<String> localSources = new HashSet<>();
            getAdditionalDirParams(localSources, new HashSet<>());
            for (String localSource : localSources) {
                File file = new File(localSource);
                if (!file.exists()) {
                    addError(new SimpleProcessSetupError(ProcessSetupError.Severity.ERROR, getPortOwner(), "file_not_exist", new Object[]{file.getAbsolutePath()}));
                }
            }
        } catch (OperatorException ignored) {
            // Deliberately best-effort: undefined/invalid parameters are reported elsewhere, so the
            // remaining design-time checks are simply skipped here.
        }
    }

    /**
     * Counts the parameters declared by the script's {@code rm_main} function.
     * <p>
     * The language's main search pattern is applied to the script; its first capture group is
     * assumed to hold the raw parameter list (TODO confirm against SparkScriptLanguage).
     *
     * @param sparkScriptLanguage language whose {@code getMainSearchString()} locates rm_main
     * @param str                 the script text (may be {@code null})
     * @return {@code -1} if the script is null, no declaration matches, or the parameter list
     *         was not captured; {@code 0} for an empty or whitespace-only parameter list;
     *         {@link Integer#MAX_VALUE} if the list contains {@code "..."} or {@code "*"}
     *         (variadic, accepts any count); otherwise the number of comma-separated entries
     */
    protected static int getArgCount(SparkScriptLanguage sparkScriptLanguage, String str) {
        if (str == null) {
            return -1;
        }
        Matcher matcher = Pattern.compile(sparkScriptLanguage.getMainSearchString()).matcher(str);
        if (!matcher.find()) {
            return -1;
        }
        String params = matcher.group(1);
        if (params == null) {
            // Group did not participate in the match: nothing reliable to count.
            return -1;
        }
        if (params.trim().isEmpty()) {
            // "rm_main( )" declares no parameters; splitting " " on ',' would wrongly yield 1.
            return 0;
        }
        if (params.contains("...") || params.contains("*")) {
            return Integer.MAX_VALUE;
        }
        return params.split(",").length;
    }

    /**
     * Reports the cost of this operator.
     *
     * @return always zero
     */
    @Override // eu.radoop.RadoopOperator
    public int getCost() {
        final int noCost = 0;
        return noCost;
    }

    /**
     * Runs the Spark script on the cluster and delivers one Hive-backed example set per connected
     * output port.
     * <p>
     * Inputs are materialized as Parquet. The language-specific driver script (user script plus
     * glue code) is generated and submitted. For every output, once the job has succeeded, the
     * "name;type" schema written by the driver is read, a temporary Hive table with that schema is
     * created, and the Parquet output is loaded into it. All temporary output directories are
     * closed exactly once when the method exits.
     *
     * @throws UserError if a configured local source file is missing, the Spark job failed or was
     *                   killed, an output or its schema is missing or empty, or HDFS access fails
     * @throws OperatorException for an unsupported language or if the Spark job cannot be run
     */
    public void doWork() throws OperatorException {
        SparkTools.checkSparkVersionRuntime(this, SparkVersion.get30AndAbove());
        SparkVersion sparkVersion = getHadoopContext().getConnectionEntry().getSparkVersion();
        SparkScriptLanguage language = getLanguage();
        String script = getScript();
        boolean hiveAccessEnabled = getParameterAsBoolean(PARAMETER_ENABLE_HIVE_ACCESS);
        List<HadoopExampleSet> inputSets = getHesFromInputPort(this.inputExtender);
        for (HadoopExampleSet inputSet : inputSets) {
            // Materialize as Parquet so the generated driver script can load the inputs.
            inputSet.materializeInFileFormat(this, getTempHdfsDirectory(), FileFormatHive.PARQUET);
        }
        // Index i of both lists belongs to the i-th connected output: Parquet data / schema lines.
        List<TempHDFSDirectory> dataDirs = new ArrayList<>();
        List<TempHDFSDirectory> schemaDirs = new ArrayList<>();
        try {
            HashSet<String> localSources = new HashSet<>();
            HashSet<String> hdfsSources = new HashSet<>();
            StringBuilder driverScript;
            switch (language) {
                case PYTHON:
                    driverScript = appendPython(sparkVersion, script, dataDirs, schemaDirs, inputSets, getOutputPorts());
                    break;
                case R:
                    driverScript = appendR(script, dataDirs, schemaDirs, inputSets, getOutputPorts(), sparkVersion);
                    break;
                default:
                    throw new OperatorException("Illegal language parameter!");
            }
            getAdditionalDirParams(localSources, hdfsSources);
            for (String localSource : localSources) {
                File file = new File(localSource);
                if (!file.exists()) {
                    throw new UserError(this, "file.not_exists", new Object[]{file.getAbsolutePath()});
                }
            }
            // NOTE(review): parametrize these with the element type SparkTools.startMonitoringThread expects.
            ConcurrentLinkedQueue monitorQueue1 = new ConcurrentLinkedQueue();
            ConcurrentLinkedQueue monitorQueue2 = new ConcurrentLinkedQueue();
            SparkTools.startMonitoringThread(this, monitorQueue1, monitorQueue2, true);
            LogService.getRoot().fine("(" + getName() + ") " + script);
            try {
                SparkJobResult jobResult = getSparkHandler().runSparkScript(this, monitorQueue1, monitorQueue2, language, driverScript.toString(), localSources, hdfsSources, hiveAccessEnabled);
                SparkTools.SparkFinalState finalState = jobResult.getFinalState();
                String applicationId = jobResult.getApplicationId();
                checkForStop();
                if (dataDirs.isEmpty()) {
                    // No connected outputs: only a failed job needs to be reported.
                    if (finalState.equals(SparkTools.SparkFinalState.FAILED)) {
                        LogCollectionTools.logStdOut(applicationId, LogCollectionMethod.FIRST_CONTAINER, getHadoopContext());
                        if (hiveAccessEnabled) {
                            LogService.getRoot().warning(HINT_HIVE_ACCESS);
                        }
                        throw new UserError(this, "spark.script.script_execution_failed");
                    }
                    return;
                }
                List<String> tableNames = new ArrayList<>(dataDirs.size());
                for (int i = 0; i < dataDirs.size(); i++) {
                    TempHDFSDirectory dataDir = dataDirs.get(i);
                    TempHDFSDirectory schemaDir = schemaDirs.get(i);
                    try {
                        schemaDir.cleanNonDataFiles();
                        if (!finalState.equals(SparkTools.SparkFinalState.SUCCEEDED)) {
                            if (finalState.equals(SparkTools.SparkFinalState.FAILED)) {
                                LogCollectionTools.logStdOut(applicationId, LogCollectionMethod.FIRST_CONTAINER, getHadoopContext());
                                if (hiveAccessEnabled) {
                                    LogService.getRoot().warning(HINT_HIVE_ACCESS);
                                }
                                throw new UserError(this, "spark.script.script_execution_failed");
                            }
                            if (finalState.equals(SparkTools.SparkFinalState.KILLED)) {
                                checkForOperationStop();
                                throw new UserError(this, "spark.job_killed");
                            }
                            // Any other final state: deliver nothing; the finally block cleans up.
                            return;
                        }
                        MapReduceHDFSHandler hdfsHandler = getMapReduceHDFSHandler();
                        if (!hdfsHandler.exists(schemaDir.getFullPath()) || !hdfsHandler.exists(dataDir.getFullPath())) {
                            LogCollectionTools.logStdOut(applicationId, LogCollectionMethod.FIRST_CONTAINER, getHadoopContext());
                            throw new UserError(this, "spark.script.output_missing");
                        }
                        LinkedHashMap<String, String> schema = readOutputSchema(hdfsHandler, schemaDir);
                        if (schema.isEmpty()) {
                            LogCollectionTools.logStdOut(applicationId, LogCollectionMethod.FIRST_CONTAINER, getHadoopContext());
                            throw new UserError(this, "spark.script.schema_error");
                        }
                        String tableName = getTempTableName();
                        tableNames.add(tableName);
                        HadoopExampleSetFactory.createHiveTable(getHiveHandler(), tableName, schema, "STORED AS " + FileFormatHive.PARQUET.toString());
                        dataDir.loadDataIntoHive(getHiveHandler(), tableName, true, true);
                    } catch (IOException e) {
                        throw new UserError(this, 1504, new Object[]{RadoopTools.formatOperatorExceptionMessage(null, e)});
                    }
                }
                for (int port = 0; port < getOutputPorts().getNumberOfConnectedPorts(); port++) {
                    Map<String, NominalMapping> mappings = null;
                    // Mappings come from the first input; without any input there is nothing to preserve.
                    if (getParameterAsBoolean(PARAMETER_KEEP_BINOMINALS) && !inputSets.isEmpty()) {
                        mappings = inputSets.get(0).buildNewMappings();
                    }
                    createExampleSet((OutputPort) getOutputPorts().getPortByIndex(port), tableNames.get(port), null, true, null, null, null, null, mappings, new HadoopExampleSet[0]);
                }
            } catch (IOException e) {
                throw RadoopTools.formattedOperatorException("Unable to run the Spark job.", e);
            }
        } finally {
            TempHDFSDirectory.close(dataDirs);
            TempHDFSDirectory.close(schemaDirs);
        }
    }

    /**
     * Reads the "name;type" schema lines the generated driver script wrote into {@code schemaDir}.
     * Lines that do not parse into exactly two non-empty fields are skipped; the length is checked
     * first so a line without a separator cannot cause an out-of-bounds access.
     *
     * @return column name to type string, in file order
     */
    private LinkedHashMap<String, String> readOutputSchema(MapReduceHDFSHandler hdfsHandler, TempHDFSDirectory schemaDir) throws IOException, OperatorException {
        LinkedHashMap<String, String> schema = new LinkedHashMap<>();
        // NOTE(review): the reader is not explicitly closed (as before); close it if it holds a stream.
        MapReduceHDFSHandler.HDFSDirectoryReader reader = hdfsHandler.new HDFSDirectoryReader(schemaDir.getSubDir());
        String line;
        while ((line = reader.readLineFromDirectory()) != null) {
            String[] fields = new CSVParser(CSV_SEMICOLON, CSV_QUOTE, CSV_ESCAPE).parseLine(line);
            if (fields.length == 2 && !Strings.isNullOrEmpty(fields[0]) && !Strings.isNullOrEmpty(fields[1])) {
                schema.put(fields[0], fields[1]);
            }
        }
        return schema;
    }

    /**
     * Resolves the {@link #PARAMETER_LANGUAGE} parameter to its {@link SparkScriptLanguage} constant.
     * The value is upper-cased with {@link java.util.Locale#ROOT} so the enum lookup does not depend
     * on the JVM default locale (e.g. Turkish maps 'i' to a dotted capital I).
     *
     * @return the selected language
     * @throws UndefinedParameterError as propagated from reading the parameter
     */
    private SparkScriptLanguage getLanguage() throws UndefinedParameterError {
        return SparkScriptLanguage.valueOf(getParameterAsString(PARAMETER_LANGUAGE).toUpperCase(java.util.Locale.ROOT));
    }

    /**
     * Returns the script text entered for the currently selected language.
     *
     * @return the Python or R script parameter value
     * @throws OperatorException if the language cannot be read or is neither Python nor R
     */
    private String getScript() throws OperatorException {
        SparkScriptLanguage selected = getLanguage();
        if (selected == SparkScriptLanguage.PYTHON) {
            return getParameterAsString(PARAMETER_SCRIPT_PYTHON);
        }
        if (selected == SparkScriptLanguage.R) {
            return getParameterAsString(PARAMETER_SCRIPT_R);
        }
        throw new OperatorException("Illegal language parameter!");
    }

    /**
     * Generates the SparkR driver script that wraps the user's R code.
     * <p>
     * The script loads SparkR, defines the Hive keyword vector and the column-name canonicalization
     * helpers, reads input {@code i} (1-based) into {@code df<i>} with {@code read.parquet}, appends
     * {@code str}, and calls {@code rm_main(df1, ..., dfN)}. For each connected output port it checks
     * that the result is a SparkDataFrame, canonicalizes its column names, writes it as Parquet into
     * a new temp directory (added to {@code list}), and writes its "name;type" schema into a second
     * temp directory (added to {@code list2}). With no connected output the script ends right after
     * the {@code rm_main} call.
     *
     * @param str          the user's R script
     * @param list         receives one data directory per connected output
     * @param list2        receives one schema directory per connected output
     * @param list3        input example sets in port order
     * @param outputPorts  only its connected-port count is used
     * @param sparkVersion forwarded to the schema-writing statement builder
     * @return the generated script
     */
    public StringBuilder appendR(String str, List<TempHDFSDirectory> list, List<TempHDFSDirectory> list2, List<HadoopExampleSet> list3, OutputPorts outputPorts, SparkVersion sparkVersion) throws OperatorException {
        StringBuilder script = new StringBuilder();
        script.append("library(SparkR)").append(LF);
        script.append("rm_hive_keywords <-c(\"")
                .append(Joiner.on("\", \"").join((Iterable<?>) HiveStaticUtils.getKeyWords()))
                .append("\")").append(LF).append(LF);
        script.append("sc <- sparkR.session()").append(LF);
        script.append(String.join(LF,
                "get_canonical_name <- function(name, default_prefix_underscore=\"attribute\"){",
                "\tif (is.null(name) || is.na(name) || length(name) == 0){",
                "\t\treturn(name)",
                "\t}",
                "\tif (length(grep(\"^[0-9]+$\", name)) > 0 || (toupper(name) %in% rm_hive_keywords)){",
                "\t\treturn(paste(name, \"_\", sep=\"\"))",
                "\t}",
                "\tcanonical_name = gsub(\"[^0-9a-zA-Z]\", \"_\", name)",
                "\tif (substring(canonical_name, 1, 1) == '_'){",
                "\t\tcanonical_name = paste(default_prefix_underscore, canonical_name, sep=\"\")",
                "\t}",
                "\treturn(tolower(canonical_name))",
                "}")).append(LF);
        script.append(String.join(LF,
                "canonize_field_names <- function(df){",
                "\tcorrected_df = df",
                "\tfor (field in schema(df)$fields()){",
                "\t\tfield_name = field$name()",
                "\t\tcanonical_name = get_canonical_name(field_name)",
                "\t\tif (field_name != canonical_name){",
                "\t\t\tcorrected_df = withColumnRenamed(corrected_df, field_name, canonical_name)",
                "\t\t}",
                "\t}",
                "\treturn(corrected_df)",
                "}")).append(LF);

        int inputCount = list3.size();
        for (int idx = 0; idx < inputCount; idx++) {
            script.append("df").append(idx + 1).append(" <- read.parquet(\"")
                    .append(list3.get(idx).getSparkInputPathSpecification()).append("\")").append(LF);
        }
        script.append(str).append(LF);

        script.append("results <- rm_main(");
        for (int idx = 1; idx <= inputCount; idx++) {
            if (idx > 1) {
                script.append(",");
            }
            script.append("df").append(idx);
        }
        script.append(")").append(LF);

        int outputCount = outputPorts.getNumberOfConnectedPorts();
        if (outputCount == 0) {
            return script;
        }
        script.append(String.join(LF,
                "outputs = c()",
                "if (class(results) != \"list\"){",
                "\toutputs = c(results)",
                "} else {",
                "\toutputs = results",
                "}")).append(LF);
        for (int port = 1; port <= outputCount; port++) {
            String output = "outputs[[" + port + "]]";
            script.append("if (class(").append(output).append(") != \"SparkDataFrame\"){").append(LF)
                    .append("\tstop(paste(\"Error: ").append(output)
                    .append(" is not an instance of DataFrame, but \", class(").append(output)
                    .append("), sep=\"\"))").append(LF)
                    .append("}").append(LF);
            script.append("canonized_output = canonize_field_names(").append(output).append(")").append(LF);
            TempHDFSDirectory dataDir = getTempHdfsDirectory();
            script.append("write.df(canonized_output,\"").append(dataDir.getFullPath())
                    .append("\",\"parquet\",\"overwrite\")").append(LF);
            TempHDFSDirectory schemaDir = getTempHdfsDirectory();
            script.append("schemaInfo = sapply(schema(canonized_output)$fields(), function(field) { paste(field$name(),\";\",field$dataType.simpleString(),sep = \"\") })").append(LF);
            script.append(createSaveSchemaToHDFSRCall(schemaDir, sparkVersion)).append(LF);
            list.add(dataDir);
            list2.add(schemaDir);
        }
        script.append("sparkR.stop()").append(LF);
        return script;
    }

    /**
     * Builds the SparkR statement that writes the {@code schemaInfo} vector (one "name;type" entry
     * per column) as a single-partition text file into the given directory.
     *
     * @param tempHDFSDirectory target directory
     * @param sparkVersion      currently unused
     * @return the R statement, without a trailing line separator
     */
    private String createSaveSchemaToHDFSRCall(TempHDFSDirectory tempHDFSDirectory, SparkVersion sparkVersion) {
        String target = "\"" + tempHDFSDirectory.getFullPath() + "\"";
        String frame = "repartition(createDataFrame(as.list(schemaInfo)), numPartitions=1)";
        return "write.text(" + frame + ", " + target + ")";
    }

    /**
     * Generates the PySpark driver script that wraps the user's Python code.
     * <p>
     * The script starts with the pyspark imports followed by {@code str}, then defines the Hive
     * keyword set and {@code get_canonical_name}. It creates the SparkSession (with Hive support when
     * enabled), switches to the configured Hive database, loads input {@code i} (1-based) into
     * {@code df<i>}, and calls {@code rm_main(df1, ..., dfN)}. For each connected output it
     * canonicalizes the column names, saves the DataFrame as Parquet into a new temp directory (added
     * to {@code list}), and saves its "name;type" schema into a second temp directory (added to
     * {@code list2}). With no connected output the script ends right after the {@code rm_main} call.
     *
     * @param sparkVersion currently unused
     * @param str          the user's Python script
     * @param list         receives one data directory per connected output
     * @param list2        receives one schema directory per connected output
     * @param list3        input example sets in port order
     * @param outputPorts  only its connected-port count is used
     * @return the generated script
     * @throws OperatorException if the Hive access parameter cannot be read
     */
    public StringBuilder appendPython(SparkVersion sparkVersion, String str, List<TempHDFSDirectory> list, List<TempHDFSDirectory> list2, List<HadoopExampleSet> list3, OutputPorts outputPorts) throws OperatorException {
        StringBuilder script = new StringBuilder();
        script.append(String.join(LF,
                "from pyspark.sql import SparkSession",
                "from pyspark import SparkContext, SQLContext, SparkConf",
                "from pyspark.sql.dataframe import DataFrame",
                "import re",
                "from pyspark.sql.functions import col")).append(LF);
        script.append(str).append(LF);
        script.append("__keywords = set([\"")
                .append(Joiner.on("\", \"").join((Iterable<?>) HiveStaticUtils.getKeyWords()))
                .append("\"])").append(LF).append(LF);
        script.append(String.join(LF,
                "def get_canonical_name(name, default_prefix_underscore=\"attribute\"):",
                "\tif name is None or len(name) == 0:",
                "\t\treturn name",
                "\tif re.match(\"[0-9]+$\", name) or name.upper() in __keywords:",
                "\t\treturn \"{0}_\".format(name)",
                "\tcanonical_name = re.sub('[^0-9a-zA-Z]', '_', name)",
                "\tif canonical_name[0] == '_':",
                "\t\tcanonical_name = \"{0}{1}\".format(default_prefix_underscore, canonical_name)",
                "\treturn canonical_name.lower()")).append(LF).append(LF);

        String sessionBuilder = getParameterAsBoolean(PARAMETER_ENABLE_HIVE_ACCESS)
                ? "SparkSession.builder.enableHiveSupport()"
                : "SparkSession.builder";
        script.append("sparkSession = ").append(sessionBuilder).append(".getOrCreate()").append(LF);
        script.append("sparkSession.catalog.setCurrentDatabase(\"")
                .append(getHadoopContext().getHiveConfiguration().getHiveDB()).append("\")").append(LF);
        script.append(String.join(LF,
                "conf = sparkSession.conf",
                "sc = sparkSession.sparkContext",
                "sqlContext = SQLContext(sc)")).append(LF);

        int inputCount = list3.size();
        for (int idx = 0; idx < inputCount; idx++) {
            script.append("df").append(idx + 1).append(" = sparkSession.read.load(\"")
                    .append(list3.get(idx).getSparkInputPathSpecification()).append("\")").append(LF);
        }
        script.append("results = rm_main(");
        for (int idx = 1; idx <= inputCount; idx++) {
            if (idx > 1) {
                script.append(",");
            }
            script.append("df").append(idx);
        }
        script.append(")").append(LF);

        int outputCount = outputPorts.getNumberOfConnectedPorts();
        if (outputCount == 0) {
            return script;
        }
        script.append("if (isinstance(results,list) or isinstance(results,tuple)):").append(LF);
        for (int k = 0; k < outputCount; k++) {
            script.append("\tresult").append(k + 1).append(" = results[").append(k).append("]").append(LF);
        }
        script.append(String.join(LF,
                "elif (isinstance(results, DataFrame)):",
                "\tresult1 = results",
                "else:",
                "\traise TypeError('Returned value is not a DataFrame or an Array of DataFrames.')")).append(LF);
        for (int port = 1; port <= outputCount; port++) {
            String result = "result" + port;
            TempHDFSDirectory dataDir = getTempHdfsDirectory();
            script.append("columnNames = [f.name for f in ").append(result).append(".schema.fields]").append(LF)
                    .append("columnsWithCanonizedAliases = [col(c).alias(get_canonical_name(c)) for c in columnNames]").append(LF)
                    .append(result).append(" = ").append(result).append(".select(columnsWithCanonizedAliases)").append(LF)
                    .append(result).append(".write.save(\"").append(dataDir.getFullPath())
                    .append("\", format=\"parquet\")").append(LF);
            TempHDFSDirectory schemaDir = getTempHdfsDirectory();
            script.append("cols = [f.name + \";\" + f.dataType.simpleString() for f in ").append(result)
                    .append(".schema.fields]").append(LF)
                    .append("sc.parallelize(cols).coalesce(1).saveAsTextFile(\"").append(schemaDir.getFullPath())
                    .append("\")").append(LF);
            list.add(dataDir);
            list2.add(schemaDir);
        }
        script.append("sparkSession.stop()").append(LF);
        return script;
    }

    /**
     * Loads a text file or classpath resource and returns its content, trimmed, with lines joined by
     * the platform line separator.
     * <p>
     * {@code str} is first tried as a filesystem path. If no file exists there, it is resolved as a
     * classpath resource relative to {@code cls} via {@link Class#getResourceAsStream(String)}. Both
     * sources are decoded with the JVM default charset.
     *
     * @param cls class used to resolve classpath resources
     * @param str filesystem path or classpath resource name
     * @return the trimmed content
     * @throws IOException if the content cannot be read, or the resource exists in neither location
     *                     (previously a missing resource surfaced as a NullPointerException)
     */
    public static String getResource(Class<?> cls, String str) throws IOException {
        if (Files.exists(Paths.get(str))) {
            return String.join(LF, Files.readAllLines(Paths.get(str), Charset.defaultCharset())).trim();
        }
        InputStream resourceAsStream = cls.getResourceAsStream(str);
        if (resourceAsStream == null) {
            throw new IOException("Resource not found: " + str);
        }
        // Closing the reader also closes the underlying resource stream.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceAsStream, Charset.defaultCharset()))) {
            return reader.lines().collect(Collectors.joining(LF)).trim();
        } catch (java.io.UncheckedIOException e) {
            // BufferedReader.lines() wraps read errors; surface them as the declared IOException.
            throw e.getCause();
        }
    }

    /**
     * Declares the parameters of this operator on top of those inherited from the superclass.
     *
     * <p>The dependency-list parameters exist in two generations:
     * <ul>
     * <li>The legacy keys ({@code *_OLD}) are registered with an {@link EqualOperatorVersionCondition} on
     * {@code VERSION_SAME_PARAM_NAME}.</li>
     * <li>The current keys are registered with an {@link AboveOperatorVersionCondition} on the same version.</li>
     * </ul>
     * Every script editor and dependency list is also tied to its script language through an
     * {@link EqualTypeCondition} on {@code PARAMETER_LANGUAGE}. Parameters are appended in a fixed order.
     *
     * @return the list obtained from {@code super.getParameterTypes()}, extended with this operator's parameters
     */
    @Override
    public List<ParameterType> getParameterTypes() {
        List<ParameterType> parameterTypes = super.getParameterTypes();
        parameterTypes.add(new ParameterTypeCategory(PARAMETER_LANGUAGE, "Selects the language for the Spark script.", LANGUAGES, SparkScriptLanguage.PYTHON.ordinal(), false));

        // Python script editor, pre-filled with a bundled example template when it can be loaded.
        ParameterTypeText pythonScript = new ParameterTypeText(PARAMETER_SCRIPT_PYTHON, HELP_TEXT_PY_SCRIPT, TextType.PYTHON);
        pythonScript.setExpert(false);
        try {
            pythonScript.setTemplateText(getResource(getClass(), DEFAULT_SPARK_PYTHON_SCRIPT_LOCATION));
        } catch (IOException e) {
            // The template is only a convenience default; the parameter is still usable without it.
            logError("Error with getting example spark script python");
        }
        pythonScript.registerDependencyCondition(new EqualTypeCondition(this, PARAMETER_LANGUAGE, LANGUAGES, true, new int[]{SparkScriptLanguage.PYTHON.ordinal()}));
        parameterTypes.add(pythonScript);

        // R script editor, pre-filled with a bundled example template when it can be loaded.
        ParameterTypeText rScript = new ParameterTypeText(PARAMETER_SCRIPT_R, HELP_TEXT_R_SCRIPT, TextType.R);
        rScript.setExpert(false);
        try {
            rScript.setTemplateText(getResource(getClass(), DEFAULT_SPARK_R_SCRIPT_LOCATION));
        } catch (IOException e) {
            // The template is only a convenience default; the parameter is still usable without it.
            logError("Error with getting example spark script R");
        }
        rScript.registerDependencyCondition(new EqualTypeCondition(this, PARAMETER_LANGUAGE, LANGUAGES, true, new int[]{SparkScriptLanguage.R.ordinal()}));
        parameterTypes.add(rScript);

        parameterTypes.add(new ParameterTypeBoolean(PARAMETER_ENABLE_HIVE_ACCESS, "Enables access to Hive via HiveContext (Spark 1.x) or SparkSession (Spark 2.x). Can be enabled in both languages.", false, false));

        // Legacy dependency lists (EqualOperatorVersionCondition). Local lists also accept JAR files.
        parameterTypes.add(createLocalSourcesParameter(PARAMETER_ADDITIONAL_LOCAL_SOURCES_PY_OLD, PARAMETER_ADDITIONAL_LOCAL_SOURCE_PY_OLD, HELP_TEXT_ADDITIONAL_LOCAL_PY, new String[]{"py", ArchiveStreamFactory.ZIP, ArchiveStreamFactory.JAR}, SparkScriptLanguage.PYTHON, true));
        parameterTypes.add(createLocalSourcesParameter(PARAMETER_ADDITIONAL_LOCAL_SOURCES_R_OLD, PARAMETER_ADDITIONAL_LOCAL_SOURCE_R_OLD, HELP_TEXT_ADDITIONAL_LOCAL_R, new String[]{"R", ArchiveStreamFactory.ZIP, ArchiveStreamFactory.JAR}, SparkScriptLanguage.R, true));
        parameterTypes.add(createHdfsSourcesParameter(PARAMETER_ADDITIONAL_HDFS_SOURCES_R_OLD, PARAMETER_ADDITIONAL_HDFS_SOURCE_R_OLD, SparkScriptLanguage.R, true));
        parameterTypes.add(createHdfsSourcesParameter(PARAMETER_ADDITIONAL_HDFS_SOURCES_PY_OLD, PARAMETER_ADDITIONAL_HDFS_SOURCE_PY_OLD, SparkScriptLanguage.PYTHON, true));

        // Current dependency lists (AboveOperatorVersionCondition). Local lists accept the script type and ZIP only.
        parameterTypes.add(createLocalSourcesParameter(PARAMETER_ADDITIONAL_LOCAL_SOURCES_PY, PARAMETER_ADDITIONAL_LOCAL_SOURCE_PY, HELP_TEXT_ADDITIONAL_LOCAL_PY, new String[]{"py", ArchiveStreamFactory.ZIP}, SparkScriptLanguage.PYTHON, false));
        parameterTypes.add(createLocalSourcesParameter(PARAMETER_ADDITIONAL_LOCAL_SOURCES_R, PARAMETER_ADDITIONAL_LOCAL_SOURCE_R, HELP_TEXT_ADDITIONAL_LOCAL_R, new String[]{"R", ArchiveStreamFactory.ZIP}, SparkScriptLanguage.R, false));
        parameterTypes.add(createHdfsSourcesParameter(PARAMETER_ADDITIONAL_HDFS_SOURCES_R, PARAMETER_ADDITIONAL_HDFS_SOURCE_R, SparkScriptLanguage.R, false));
        parameterTypes.add(createHdfsSourcesParameter(PARAMETER_ADDITIONAL_HDFS_SOURCES_PY, PARAMETER_ADDITIONAL_HDFS_SOURCE_PY, SparkScriptLanguage.PYTHON, false));

        parameterTypes.add(new ParameterTypeBoolean("preserve_binominal_mappings", "Keep the mappings of the binominal attributes. If set to false, they will be converted to nominals. If true, you should not introduce new values to the binominal attribute other than the positive value, the negative value and missings.", false, true));
        return parameterTypes;
    }

    /**
     * Builds an optional enumeration of local dependency files for one script language.
     *
     * @param enumerationKey key of the enumeration parameter
     * @param fileKey        key of the per-entry file parameter
     * @param description    help text of the enumeration
     * @param extensions     file extensions offered by the file parameter
     * @param language       script language the enumeration depends on
     * @param legacy         {@code true} to register an {@link EqualOperatorVersionCondition}, {@code false}
     *                       for an {@link AboveOperatorVersionCondition} (both on {@code VERSION_SAME_PARAM_NAME})
     * @return the configured enumeration parameter
     */
    private ParameterTypeEnumeration createLocalSourcesParameter(String enumerationKey, String fileKey, String description, String[] extensions, SparkScriptLanguage language, boolean legacy) {
        ParameterTypeFile fileType = new ParameterTypeFile(fileKey, "A local file that is used in the script as a dependency.", true, extensions);
        registerVersionCondition(fileType, legacy);
        fileType.setAddAllFileExtensionsFilter(true);
        ParameterTypeEnumeration enumeration = new ParameterTypeEnumeration(enumerationKey, description, fileType, true);
        return configureSourcesParameter(enumeration, language, legacy);
    }

    /**
     * Builds an optional enumeration of HDFS dependency paths for one script language.
     *
     * @param enumerationKey key of the enumeration parameter
     * @param entryKey       key of the per-entry string parameter
     * @param language       script language the enumeration depends on
     * @param legacy         selects the version condition, see {@link #registerVersionCondition}
     * @return the configured enumeration parameter
     */
    private ParameterTypeEnumeration createHdfsSourcesParameter(String enumerationKey, String entryKey, SparkScriptLanguage language, boolean legacy) {
        ParameterTypeEnumeration enumeration = new ParameterTypeEnumeration(enumerationKey, HELP_TEXT_ADDITIONAL_HDFS, new ParameterTypeString(entryKey, "A file or directory on the HDFS that is used in the script as a dependency."), true);
        return configureSourcesParameter(enumeration, language, legacy);
    }

    /**
     * Prepares a dependency enumeration for use.
     * Adds a dependency on {@code language}, adds the version condition, and marks the enumeration optional.
     */
    private ParameterTypeEnumeration configureSourcesParameter(ParameterTypeEnumeration enumeration, SparkScriptLanguage language, boolean legacy) {
        enumeration.registerDependencyCondition(new EqualTypeCondition(this, PARAMETER_LANGUAGE, LANGUAGES, false, new int[]{language.ordinal()}));
        registerVersionCondition(enumeration, legacy);
        enumeration.setOptional(true);
        return enumeration;
    }

    /**
     * Registers the operator-version dependency condition on {@code VERSION_SAME_PARAM_NAME}.
     * Uses an {@link EqualOperatorVersionCondition} when {@code legacy} is true, otherwise an
     * {@link AboveOperatorVersionCondition}.
     */
    private void registerVersionCondition(ParameterType type, boolean legacy) {
        if (legacy) {
            type.registerDependencyCondition(new EqualOperatorVersionCondition(this, VERSION_SAME_PARAM_NAME));
        } else {
            type.registerDependencyCondition(new AboveOperatorVersionCondition(this, VERSION_SAME_PARAM_NAME));
        }
    }

    /**
     * Collects the dependency entries configured for the currently selected script language.
     *
     * <p>Local entries go into {@code set} and HDFS entries go into {@code set2}. The legacy ({@code *_OLD})
     * parameter keys are read when the compatibility level is at most {@code VERSION_SAME_PARAM_NAME};
     * otherwise the current keys are read. For any language other than Python or R nothing is added.
     *
     * @param set  receives the local dependency entries
     * @param set2 receives the HDFS dependency entries
     * @throws UndefinedParameterError propagated from the parameter lookups
     */
    public void getAdditionalDirParams(Set<String> set, Set<String> set2) throws UndefinedParameterError {
        String localKey;
        String hdfsKey;
        switch (getLanguage()) {
            case PYTHON: {
                boolean legacyKeys = getCompatibilityLevel().isAtMost(VERSION_SAME_PARAM_NAME);
                localKey = legacyKeys ? PARAMETER_ADDITIONAL_LOCAL_SOURCES_PY_OLD : PARAMETER_ADDITIONAL_LOCAL_SOURCES_PY;
                hdfsKey = legacyKeys ? PARAMETER_ADDITIONAL_HDFS_SOURCES_PY_OLD : PARAMETER_ADDITIONAL_HDFS_SOURCES_PY;
                break;
            }
            case R: {
                boolean legacyKeys = getCompatibilityLevel().isAtMost(VERSION_SAME_PARAM_NAME);
                localKey = legacyKeys ? PARAMETER_ADDITIONAL_LOCAL_SOURCES_R_OLD : PARAMETER_ADDITIONAL_LOCAL_SOURCES_R;
                hdfsKey = legacyKeys ? PARAMETER_ADDITIONAL_HDFS_SOURCES_R_OLD : PARAMETER_ADDITIONAL_HDFS_SOURCES_R;
                break;
            }
            default:
                return;
        }
        // The local entries are read and added before the HDFS parameter is looked up.
        String[] localEntries = ParameterTypeEnumeration.transformString2Enumeration(getParameterAsString(localKey));
        set.addAll(Arrays.asList(localEntries));
        String[] hdfsEntries = ParameterTypeEnumeration.transformString2Enumeration(getParameterAsString(hdfsKey));
        set2.addAll(Arrays.asList(hdfsEntries));
    }

    /**
     * Lists the operator versions at which this operator's behavior changed incompatibly.
     *
     * @return a new array containing only {@code VERSION_SAME_PARAM_NAME}
     */
    @Override
    public OperatorVersion[] getIncompatibleVersionChanges() {
        OperatorVersion[] changes = {VERSION_SAME_PARAM_NAME};
        return changes;
    }
}
