package eu.radoop.connections.service.test.integration;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rapidminer.operator.OperatorException;
import com.rapidminer.tools.LogService;
import eu.radoop.client.rest.RadoopProxyAppRestClient;
import eu.radoop.connections.RadoopConnectionEntry;
import eu.radoop.connections.service.test.AbstractRadoopTest;
import eu.radoop.connections.service.test.KillableIntegrationTest;
import eu.radoop.connections.service.test.RadoopTest;
import eu.radoop.connections.service.test.RadoopTestContext;
import eu.radoop.connections.service.test.RadoopTestType;
import eu.radoop.datahandler.HadoopContext;
import eu.radoop.datahandler.mapreducehdfs.MapReduceHDFSHandler;
import eu.radoop.exception.ConnectionException;
import eu.radoop.operator.spark.SparkScript;
import eu.radoop.operator.spark.SparkTools;
import eu.radoop.spark.ScriptLibServiceHolder;
import eu.radoop.spark.SparkHandler;
import eu.radoop.spark.SparkJobResult;
import eu.radoop.spark.SparkOperation;
import eu.radoop.spark.script.SparkScriptLanguage;
import eu.radoop.spark.script.SparkScriptLibService;
import eu.radoop.transfer.parameter.ParameterTransferObject;
import eu.radoop.transfer.parameter.PySparkTestParameter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

/* loaded from: input_file:eu/radoop/connections/service/test/integration/TestPySpark.class */
/**
 * Integration test for PySpark support on the cluster.
 *
 * <p>It first runs the {@code SparkSystemConfigurationDetection} Spark operation and inspects its
 * logs, then runs the PySpark Pi test script. If the Pi script does not succeed, it re-runs the script
 * on a copy of the connection whose custom PySpark location points at archives detected from the logs
 * (or at the Spark version's generic Python dependencies) and logs which setting the user should change.
 */
public class TestPySpark extends AbstractRadoopTest {
    // The following three keys are not referenced within this class.
    private static final String SYS_VERSION_KEY = "sys.version = ";
    private static final String SYS_PATH_KEY = "sys.path = ";
    private static final String OS_ENVIRON_KEY = "os.environ = ";
    /** Log-line prefix whose remainder (up to end of line) is parsed as JSON describing the PySpark archive. */
    private static final String PYSPARK_KEY = "radoop-pysp-json = ";
    /** Log-line prefix whose remainder (up to end of line) is parsed as JSON describing the Py4J archive. */
    private static final String PY4J_KEY = "radoop-py4j-json = ";
    /** JSON key holding the archive path in the {@link #PYSPARK_KEY} / {@link #PY4J_KEY} lines. */
    private static final String FILE_LOCATION_JSON_KEY = "fullPath";
    /** Logged in place of the detection job's logs when they come back empty. */
    private static final String LOG_EMPTY_MESSAGE = "<Logs empty - check if log aggregation for cluster is enabled>";
    /** Reused for every parse; Jackson documents ObjectMapper as thread-safe once configured. */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    /** Candidate PySpark / Py4J archive locations to try as a custom PySpark location. */
    public static class PyFiles {
        // Insertion-ordered so that wrapAndConcat() produces a deterministic list.
        final Set<String> pySparkPaths = new LinkedHashSet<>();

        private PyFiles(String pySparkPath, String py4jPath) {
            this.pySparkPaths.add(py4jPath);
            this.pySparkPaths.add(pySparkPath);
        }

        private PyFiles(Collection<String> paths) {
            this.pySparkPaths.addAll(paths);
        }

        /**
         * Prefixes {@code local://} to the path unless it starts with
         * {@link SparkScriptLibService#REPLACEMENT_RADOOP_PATH}.
         */
        private String localWrap(String path) {
            return path.startsWith(SparkScriptLibService.REPLACEMENT_RADOOP_PATH) ? path : String.format("local://%s", path);
        }

        /** Returns every path passed through {@link #localWrap(String)}, joined with commas. */
        private String wrapAndConcat() {
            return pySparkPaths.stream().map(this::localWrap).collect(Collectors.joining(","));
        }
    }

    /** Python scripts this test can run, with their Spark application name and classpath resource. */
    public enum TestScript {
        PYSPARK_PI_TEST("Radoop-PySpark-PI-Test", "/eu/radoop/connections/service/test/integration/PySparkPiTest.py");

        protected final String resourceLocation;
        protected final String appName;

        TestScript(String appName, String resourceLocation) {
            this.appName = appName;
            this.resourceLocation = resourceLocation;
        }
    }

    /**
     * Runs the given Python test script as a Spark job and registers its application id with the test context.
     *
     * @param testScript script to run
     * @param mapReduceHDFSHandler handler whose class is used to load the script resource
     * @param sparkHandler handler that submits the script
     * @return the job result
     * @throws IOException if loading or submitting the script fails
     * @throws OperatorException if submitting the script fails
     */
    public SparkJobResult runScript(TestScript testScript, MapReduceHDFSHandler mapReduceHDFSHandler, SparkHandler sparkHandler) throws IOException, OperatorException {
        String script = SparkScript.getResource(mapReduceHDFSHandler.getClass(), testScript.resourceLocation);
        // operationQueue is shared with the killable test operation; both queues go to the monitor and the job.
        ConcurrentLinkedQueue<String> monitorQueue = new ConcurrentLinkedQueue<>();
        ConcurrentLinkedQueue<String> operationQueue = new ConcurrentLinkedQueue<>();
        try {
            SparkTools.startMonitoringThread(createTestOperation(testScript.appName, operationQueue), monitorQueue, operationQueue, false);
            SparkJobResult result = sparkHandler.runSparkScript(testScript.appName, monitorQueue, operationQueue, SparkScriptLanguage.PYTHON, script, new HashSet<String>(), new HashSet<String>(), false);
            getTestContext().addApplicationId(result.getApplicationId(), getType());
            return result;
        } catch (Exception e) {
            LogService.getRoot().log(Level.INFO, e, () -> String.format("Exception during Pyspark test of %s, was using python libraries of [%s]", this, mapReduceHDFSHandler.getSparkScriptDependencies(false, SparkScriptLanguage.PYTHON)));
            throw e;
        }
    }

    private TestPySpark(RadoopTestContext radoopTestContext) throws ConnectionException {
        super(RadoopTestType.PYSPARK_TESTS, radoopTestContext);
    }

    public static TestPySpark create(RadoopTestContext radoopTestContext) throws ConnectionException {
        return new TestPySpark(radoopTestContext);
    }

    /**
     * Runs the PySpark checks.
     *
     * @return {@code SUCCESS} when the Pi script succeeds with the configured connection, {@code ERROR} otherwise
     * @throws Exception if the system-configuration detection job or the log inspection fails
     */
    @Override
    public RadoopTest.RadoopTestStatus call() throws Exception {
        MapReduceHDFSHandler mapReduceHDFSHandler = getTestContext().getMapReduceHDFSHandler();
        SparkHandler sparkHandler = getTestContext().getSparkHandler();
        // Ids of every Spark application started by this test; reported if no configuration works.
        Set<String> applicationIds = new LinkedHashSet<>();

        String logs = runSystemConfigurationDetection(mapReduceHDFSHandler, sparkHandler, applicationIds);
        SparkTools.checkSparkLogForCommonErrors(logs, getTestContext());
        PyFiles detectedPyFiles = getPySparkFilesFromLogs(logs, mapReduceHDFSHandler.getConnectionEntry());

        log(2, "Checking with Spark Python Pi example.");
        SparkJobResult piResult;
        try {
            piResult = runScript(TestScript.PYSPARK_PI_TEST, mapReduceHDFSHandler, sparkHandler);
        } catch (Exception e) {
            log(8, "Exception with running PySpark tests");
            logStackTrace(e);
            // No job result to inspect: the test cannot have passed.
            return RadoopTest.RadoopTestStatus.ERROR;
        }
        applicationIds.add(String.valueOf(piResult.getApplicationId()));
        if (SparkTools.SparkFinalState.SUCCEEDED == piResult.getFinalState()) {
            return RadoopTest.RadoopTestStatus.SUCCESS;
        }

        log(8, String.format("Issue with PySpark tests see logs for job %s", piResult.getApplicationId()));
        if (detectedPyFiles != null) {
            tryDetectedPyFiles(detectedPyFiles, applicationIds);
        } else {
            log(8, "Could not auto-detect alternative Python libraries on cluster, have to manually find.");
        }
        return RadoopTest.RadoopTestStatus.ERROR;
    }

    /**
     * Runs the {@code SparkSystemConfigurationDetection} operation, records its application id, logs the
     * job's logs (or {@link #LOG_EMPTY_MESSAGE}) and returns them.
     */
    private String runSystemConfigurationDetection(MapReduceHDFSHandler mapReduceHDFSHandler, SparkHandler sparkHandler, Set<String> applicationIds) throws Exception {
        ParameterTransferObject parameters = new ParameterTransferObject();
        parameters.setParameter(PySparkTestParameter.SEARCH_PATH, ScriptLibServiceHolder.INSTANCE.getSparkScriptDefaultSearchPath().toArray(new String[0]));
        ConcurrentLinkedQueue<String> operationQueue = new ConcurrentLinkedQueue<>();
        ConcurrentLinkedQueue<String> monitorQueue = new ConcurrentLinkedQueue<>();
        KillableIntegrationTest detectOperation = createTestOperation("PySpark detect", operationQueue);
        SparkTools.startMonitoringThread(detectOperation, monitorQueue, operationQueue, false);
        SparkJobResult detectResult = sparkHandler.runSpark(detectOperation, null, SparkOperation.SparkSystemConfigurationDetection, monitorQueue, operationQueue, new HashMap<>(), parameters, null);
        getTestContext().addApplicationId(detectResult.getApplicationId(), getType());
        applicationIds.add(String.valueOf(detectResult.getApplicationId()));
        String logs = SparkTools.getLogs(mapReduceHDFSHandler, detectResult.getApplicationId(), "Ending SparkSystemConfigurationDetection");
        LogService.getRoot().info(() -> String.format("Basic SparkContext System information -----%n %s %n -----%n", StringUtils.isEmpty(logs) ? LOG_EMPTY_MESSAGE : logs));
        return logs;
    }

    /**
     * Re-runs the Pi script on a copy of the connection that uses {@code pyFiles} as custom PySpark
     * location, and logs either the setting to change or a request to locate the archives manually.
     * Failures of the re-run (including exceptions) are logged, not propagated.
     */
    private void tryDetectedPyFiles(PyFiles pyFiles, Set<String> applicationIds) throws Exception {
        String customLocation = pyFiles.wrapAndConcat();
        log(8, String.format("Investigating if auto-detected Python libraries will work, files[%s]", customLocation));
        RadoopConnectionEntry copyEntry = RadoopConnectionEntry.copyEntry(getTestContext().getConnection());
        copyEntry.setUseCustomPySparkLocation(true);
        copyEntry.setCustomPySparkLocation(customLocation);
        HadoopContext hadoopContext = new HadoopContext(copyEntry);
        MapReduceHDFSHandler customHdfsHandler = new MapReduceHDFSHandler(hadoopContext);
        SparkHandler customSparkHandler = new SparkHandler(hadoopContext, new RadoopProxyAppRestClient(hadoopContext.getConnectionEntry().getProxyAppServer(), hadoopContext.getConnectionEntry().getAdvancedRadoopProxyParameters()));
        log(4, String.format("Testing with pyspark.zip and py4j-src.zip located at [%s]%n", customLocation));

        boolean succeeded = false;
        try {
            SparkJobResult result = runScript(TestScript.PYSPARK_PI_TEST, customHdfsHandler, customSparkHandler);
            applicationIds.add(String.valueOf(result.getApplicationId()));
            succeeded = SparkTools.SparkFinalState.SUCCEEDED == result.getFinalState();
        } catch (Exception e) {
            log(8, "Exception with running PySpark tests");
            logStackTrace(e);
        }

        if (succeeded) {
            log(8, String.format("Change on Spark Tab - 'Use Custom PySpark Locations' and provide [%s] 'Custom PySpark Archive Path'", customLocation));
        } else {
            log(8, String.format("Failed to find pyspark.zip and py4j-<version>-src.zip that pass test (look at Spark logs appIds of %s for further suggestions), try to manually find them on cluster and apply to Spark Tab's 'Use Custom PySpark Locations' settings", applicationIds));
        }
    }

    /**
     * Determines PySpark/Py4J archives to try as an alternative.
     *
     * @return the paths from the {@link #PYSPARK_KEY}/{@link #PY4J_KEY} log lines when both are present and
     *     valid; otherwise the Spark version's generic Python dependencies, or {@code null} when all of those
     *     are already listed in the connection's custom PySpark location
     */
    private PyFiles getPySparkFilesFromLogs(String logs, RadoopConnectionEntry connectionEntry) {
        PyFiles fromLogs = parsePyFilesFromLogs(logs);
        if (fromLogs != null) {
            return fromLogs;
        }
        Set<String> genericDependencies = connectionEntry.getSparkVersion().findScriptGenericDependencies(connectionEntry.getHadoopVersion().getId(), SparkScriptLanguage.PYTHON);
        if (splitToSet(connectionEntry.getCustomPySparkLocation()).containsAll(genericDependencies)) {
            return null;
        }
        return new PyFiles(genericDependencies);
    }

    /**
     * Reads the PySpark and Py4J archive paths from their JSON log lines.
     *
     * @return both paths, or {@code null} if either line is missing, empty, unparsable or lacks a path
     */
    private PyFiles parsePyFilesFromLogs(String logs) {
        String pySparkJson = findLineSubString(logs, PYSPARK_KEY, 0);
        String py4jJson = findLineSubString(logs, PY4J_KEY, 0);
        if (StringUtils.isEmpty(pySparkJson) || StringUtils.isEmpty(py4jJson)) {
            return null;
        }
        try {
            String pySparkPath = readFileLocation(pySparkJson);
            String py4jPath = readFileLocation(py4jJson);
            if (StringUtils.isNotEmpty(pySparkPath) && StringUtils.isNotEmpty(py4jPath)) {
                return new PyFiles(pySparkPath, py4jPath);
            }
        } catch (IOException e) {
            // A malformed log line should not abort the test; fall back to the generic dependencies instead.
            LogService.getRoot().log(Level.WARNING, e, () -> "Could not parse PySpark/Py4J locations from Spark logs");
        }
        return null;
    }

    /** Parses a flat JSON object of strings and returns its {@link #FILE_LOCATION_JSON_KEY} value, or {@code null}. */
    private static String readFileLocation(String json) throws IOException {
        Map<String, String> values = JSON_MAPPER.readValue(json, new TypeReference<HashMap<String, String>>() {
        });
        return values == null ? null : values.get(FILE_LOCATION_JSON_KEY);
    }

    /** Splits a comma-separated list into a set of trimmed, non-empty entries; {@code null} yields an empty set. */
    private Set<String> splitToSet(String commaSeparated) {
        if (commaSeparated == null) {
            return new HashSet<>();
        }
        return Arrays.stream(commaSeparated.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());
    }

    /**
     * Returns the trimmed text following the first occurrence of {@code key} (searching from
     * {@code fromIndex}) up to the end of that line, or {@code null} if {@code text} is null or lacks the key.
     */
    private String findLineSubString(String text, String key, int fromIndex) {
        if (text == null) {
            return null;
        }
        int keyIndex = text.indexOf(key, fromIndex);
        if (keyIndex < 0) {
            return null;
        }
        int valueStart = keyIndex + key.length();
        int lineEnd = text.indexOf('\n', valueStart);
        if (lineEnd < 0) {
            // Last line without a trailing newline: take everything up to the end (exclusive bound).
            lineEnd = text.length();
        }
        return text.substring(valueStart, lineEnd).trim();
    }
}
