package eu.radoop.connections.service.test.integration;

import com.google.common.base.Strings;
import com.rapidminer.operator.OperatorException;
import com.rapidminer.tools.LogService;
import eu.radoop.connections.RadoopConnectionEntry;
import eu.radoop.connections.service.test.AbstractRadoopTest;
import eu.radoop.connections.service.test.KillableIntegrationTest;
import eu.radoop.connections.service.test.RadoopTest;
import eu.radoop.connections.service.test.RadoopTestContext;
import eu.radoop.connections.service.test.RadoopTestType;
import eu.radoop.connections.service.test.integration.TestUpload;
import eu.radoop.datahandler.HadoopContext;
import eu.radoop.datahandler.hdfs.TempHDFSDirectory;
import eu.radoop.datahandler.hdfs.TempHDFSFile;
import eu.radoop.datahandler.mapreducehdfs.MapReduceHDFSHandler;
import eu.radoop.exception.ConnectionException;
import eu.radoop.exception.InvalidConnectionException;
import eu.radoop.exception.OperationKilledException;
import eu.radoop.operator.spark.SparkTools;
import eu.radoop.spark.SparkHandler;
import eu.radoop.spark.SparkJobResult;
import eu.radoop.spark.SparkOperation;
import eu.radoop.spark.SparkVersion;
import eu.radoop.tools.LogCollectionMethod;
import eu.radoop.transfer.parameter.CommonParameter;
import eu.radoop.transfer.parameter.ParameterTransferObject;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;

/* loaded from: input_file:eu/radoop/connections/service/test/integration/TestSpark.class */
public class TestSpark extends AbstractRadoopTest {
    // Not referenced by name in the visible code; the same value is passed as a literal to
    // getStdLog(...) in handleFailureWithAllSparkVersions (presumably inlined by the compiler).
    private static final int ATTEMPT_TO_LOG = 1;
    // Row count the test job must report; checkJobResult compares against the literal 150.
    private static final int EXPECTED_RESULT_COUNT_IRIS = 150;
    // Set in checkPreconditions() from the TestUpload.Property.IMPORT_SOURCE_FILE context property;
    // its parent directory is handed to the Spark job as the input directory.
    private TempHDFSFile tempImportSourceFile;
    // Created at the start of call(), used as the job's output directory, closed in cleanUp().
    private TempHDFSDirectory tempOutputDir;
    // Passed to SparkTools.startMonitoringThread(...) and SparkHandler.runSpark(...).
    final ConcurrentLinkedQueue<String> messageQueue;
    // Passed to createTestOperation(...), SparkTools.startMonitoringThread(...) and SparkHandler.runSpark(...).
    final ConcurrentLinkedQueue<String> commandQueue;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:eu/radoop/connections/service/test/integration/TestSpark$SparkTestCountJobResult.class */
    /**
     * Mutable holder for what was read from one output file of the Spark test count job.
     * The fields are filled in by {@code parseJobOutput} and validated by {@code checkJobResult}.
     */
    public static final class SparkTestCountJobResult {
        /** First line of the output file, exactly as read. */
        private String resultAsText;
        /** Integer parsed from the part of the line before the {@code ';'} separator. */
        private int resultCount;
        /** Spark context version string taken from the part after the {@code ';'} separator. */
        private String fullSCVersion;

        private SparkTestCountJobResult() {
            // Instances are only created by the enclosing class.
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:eu/radoop/connections/service/test/integration/TestSpark$TestSparkHadoopContext.class */
    /**
     * Hadoop context built for an alternative connection entry (see {@code runTestSparkJob}, which
     * creates one when the Spark version under test differs from the configured one).
     * Non-static on purpose: the constructor reads the enclosing test's context.
     */
    public class TestSparkHadoopContext extends HadoopContext {
        /**
         * @param radoopConnectionEntry connection entry this context is created for
         * @throws InvalidConnectionException propagated from the {@code HadoopContext} constructor
         */
        public TestSparkHadoopContext(RadoopConnectionEntry radoopConnectionEntry) throws InvalidConnectionException {
            // Reuse the impersonated user of the enclosing test's own Hadoop context.
            super(radoopConnectionEntry, TestSpark.this.getTestContext().getHadoopContext().getImpersonatedUser());
        }
    }

    /**
     * Registers this test as {@link RadoopTestType#SPARK} and creates the two empty queues
     * that are later handed to the Spark monitoring thread and job runner.
     * Private: instances are obtained through {@link #create(RadoopTestContext)}.
     *
     * @param radoopTestContext context forwarded to the superclass
     * @throws ConnectionException propagated from the superclass constructor
     */
    private TestSpark(RadoopTestContext radoopTestContext) throws ConnectionException {
        super(RadoopTestType.SPARK, radoopTestContext);
        this.commandQueue = new ConcurrentLinkedQueue<>();
        this.messageQueue = new ConcurrentLinkedQueue<>();
    }

    /**
     * Factory for the Spark integration test.
     *
     * @param radoopTestContext context the new test runs in
     * @return a new {@code TestSpark} instance
     * @throws ConnectionException propagated from the constructor
     */
    public static RadoopTest create(RadoopTestContext radoopTestContext) throws ConnectionException {
        final TestSpark sparkTest = new TestSpark(radoopTestContext);
        return sparkTest;
    }

    /**
     * Looks up the {@code TestUpload.Property.IMPORT_SOURCE_FILE} context property and remembers it
     * as the job's import source.
     *
     * @return {@code true} if the property is present (non-null), {@code false} otherwise
     */
    @Override // eu.radoop.connections.service.test.AbstractRadoopTest, eu.radoop.connections.service.test.RadoopTest
    public boolean checkPreconditions() {
        final TempHDFSFile uploadedFile = (TempHDFSFile) getContextProperty(TestUpload.Property.IMPORT_SOURCE_FILE);
        this.tempImportSourceFile = uploadedFile;
        return uploadedFile != null;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Runs the Spark test job, trying the connection's configured Spark version first and then every
     * other {@link SparkVersion} (except {@code NONE} and {@code DUMMY_VERSION}) until one job succeeds.
     * The job output is then validated and the Spark context version it reports is compared with the
     * configured version.
     *
     * @return {@code WARNING} when the reported Spark context version maps to no known
     *         {@link SparkVersion}, {@code SUCCESS} otherwise
     * @throws OperatorException propagated from running or validating the job; this includes the
     *         {@code ConnectionException}s raised when no version succeeds, when the job output is
     *         invalid, or when the configured Spark version differs from the one that should be used
     * @throws IOException if reading the job output fails
     */
    @Override // java.util.concurrent.Callable
    public RadoopTest.RadoopTestStatus call() throws OperatorException, IOException {
        MapReduceHDFSHandler mapReduceHDFSHandler = getTestContext().getMapReduceHDFSHandler();
        RadoopConnectionEntry connection = getTestContext().getConnection();
        this.tempOutputDir = new TempHDFSDirectory(mapReduceHDFSHandler, getType().isCleaningEnabled());

        // Candidate versions in trial order: the configured version first, then all remaining ones.
        LinkedList<SparkVersion> candidateVersions = new LinkedList<>(Arrays.asList(SparkVersion.values()));
        candidateVersions.remove(SparkVersion.NONE);
        candidateVersions.remove(SparkVersion.DUMMY_VERSION);
        SparkVersion testedVersion = connection.getSparkVersion();
        candidateVersions.remove(testedVersion);
        candidateVersions.addFirst(testedVersion);

        SparkTools.SparkFinalState finalState = SparkTools.SparkFinalState.FAILED;
        // Application id of the first job that actually ran; used for log lookup on total failure.
        String firstApplicationId = null;
        while (!candidateVersions.isEmpty() && finalState != SparkTools.SparkFinalState.SUCCEEDED) {
            try {
                testedVersion = candidateVersions.removeFirst();
                log(2, "Assuming Spark version " + testedVersion + ".");
                try {
                    SparkJobResult jobResult = runJobWithVersion(testedVersion);
                    finalState = jobResult.getFinalState();
                    String applicationId = jobResult.getApplicationId();
                    if (firstApplicationId == null) {
                        firstApplicationId = applicationId;
                    }
                } catch (InvalidConnectionException e) {
                    // A version that cannot form a valid connection is skipped, not treated as fatal.
                    LogService.getRoot().fine("Connection with Spark version " + testedVersion + " is invalid, skipping that test.");
                }
            } catch (IOException e2) {
                throw new ConnectionException(getTestContext().getHadoopContext(), ConnectionException.ErrorType.SPARK_JOB, e2);
            }
        }
        if (!finalState.equals(SparkTools.SparkFinalState.SUCCEEDED)) {
            // Always throws; execution continues below only after a successful job.
            handleFailureWithAllSparkVersions(firstApplicationId);
        }

        SparkTestCountJobResult jobOutput = fetchLastJobResult();
        checkJobResult(jobOutput);
        log(2, "Spark context version: " + jobOutput.fullSCVersion);

        boolean unsupportedVersionDetected = false;
        SparkVersion detectedVersion = SparkVersion.getFromScVersionString(jobOutput.fullSCVersion);
        if (detectedVersion == null) {
            log(5, "Unsupported Spark version detected: " + jobOutput.fullSCVersion + ". Radoop may not work seamlessly with this version, you may encounter some issues especially when using the Spark Script operator.");
            unsupportedVersionDetected = true;
        } else {
            // Prefer the version reported by the Spark context, but only if it differs from the one
            // that just succeeded, has not been tried yet, and its own test job also passes.
            boolean useDetectedVersion = !testedVersion.equals(detectedVersion)
                    && candidateVersions.contains(detectedVersion)
                    && testJobWorksWithInferredVersion(detectedVersion);
            SparkVersion recommendedVersion = useDetectedVersion ? detectedVersion : testedVersion;
            if (!recommendedVersion.equals(connection.getSparkVersion())) {
                throw new ConnectionException(getTestContext().getHadoopContext(), ConnectionException.ErrorType.SPARK_VERSION, "Wrong Spark version detected in the connection entry: " + connection.getSparkVersion() + ". Please change it to \"" + recommendedVersion + "\"");
            }
        }
        log(2, "Spark test job finished.");
        return unsupportedVersionDetected ? RadoopTest.RadoopTestStatus.WARNING : RadoopTest.RadoopTestStatus.SUCCESS;
    }

    /**
     * Re-runs the test job with the Spark version inferred from the Spark context version string.
     * The output directory is deleted first so the new run writes into a clean location.
     *
     * @param sparkVersion the inferred version to verify
     * @return {@code true} if the job ran and its output passed validation with that version
     * @throws IOException if deleting the previous output fails
     */
    private boolean testJobWorksWithInferredVersion(SparkVersion sparkVersion) throws IOException {
        log(2, "Testing with Spark version inferred from Spark context version: " + sparkVersion + ".");
        this.tempOutputDir.delete();
        return versionWorksCorrectly(sparkVersion);
    }

    /**
     * Best-effort probe: runs the test job with the given version and validates its output.
     * Any failure is reported as {@code false} rather than propagated, but the reason is logged
     * so it is not lost.
     *
     * @param sparkVersion version to run the job with
     * @return {@code true} if the job ran and its output is valid, {@code false} on any exception
     */
    private boolean versionWorksCorrectly(SparkVersion sparkVersion) {
        try {
            runJobWithVersion(sparkVersion);
            checkJobResult(fetchLastJobResult());
            return true;
        } catch (Exception e) {
            // NOTE(review): this also absorbs a cancellation raised by runJobWithVersion;
            // confirm whether a cancelled probe should abort the whole test instead.
            log(2, "Spark test job did not pass with Spark version " + sparkVersion + ": " + e);
            return false;
        }
    }

    /**
     * Removes non-data files from the output directory, then parses every remaining file listed
     * under its sub-directory. Each file is parsed, but only the result of the last one is kept.
     *
     * @return the parsed result of the last listed file, or {@code null} if no file is listed
     * @throws IOException if listing or reading the output fails
     * @throws ConnectionException if a file's content cannot be parsed
     */
    private SparkTestCountJobResult fetchLastJobResult() throws IOException, ConnectionException {
        this.tempOutputDir.cleanNonDataFiles();
        final String[] outputFiles = getTestContext().getMapReduceHDFSHandler().getFileListAsStringArray(this.tempOutputDir.getSubDir());
        SparkTestCountJobResult lastParsed = null;
        for (int i = 0; i < outputFiles.length; i++) {
            lastParsed = parseJobOutput(outputFiles[i]);
        }
        return lastParsed;
    }

    /**
     * Validates a parsed job result: it must exist and carry text, report the expected row count,
     * and contain a non-empty Spark context version.
     *
     * @param sparkTestCountJobResult parsed job output, may be {@code null}
     * @throws ConnectionException with {@code SPARK_JOB} if the result is missing or the count is
     *         wrong, with {@code SPARK_VERSION} if no version string is present
     */
    private void checkJobResult(SparkTestCountJobResult sparkTestCountJobResult) throws ConnectionException {
        final boolean hasOutput = sparkTestCountJobResult != null && sparkTestCountJobResult.resultAsText != null;
        if (!hasOutput) {
            throw new ConnectionException(getTestContext().getHadoopContext(), ConnectionException.ErrorType.SPARK_JOB, "Spark test job produced no result.");
        }
        // 150 is the value declared as EXPECTED_RESULT_COUNT_IRIS on this class.
        if (sparkTestCountJobResult.resultCount != 150) {
            throw new ConnectionException(getTestContext().getHadoopContext(), ConnectionException.ErrorType.SPARK_JOB, "The Spark test job generated wrong result. Please check the Spark job logs for more details.");
        }
        if (!Strings.isNullOrEmpty(sparkTestCountJobResult.fullSCVersion)) {
            return;
        }
        throw new ConnectionException(getTestContext().getHadoopContext(), ConnectionException.ErrorType.SPARK_VERSION, "Could not determine Spark version.");
    }

    /**
     * Runs the test job with the given Spark version, records its application id on the test
     * context, and rejects runs whose final state is unknown or {@code KILLED}.
     *
     * @param sparkVersion Spark version to run the job with
     * @return the job result; its final state is non-null and not {@code KILLED}
     * @throws ConnectionException if the job reports no final state
     * @throws OperationKilledException (declared via the throws clause) if the job was killed
     * @throws InvalidConnectionException propagated from {@code runTestSparkJob}
     * @throws OperatorException propagated from {@code runTestSparkJob}
     * @throws IOException propagated from {@code runTestSparkJob}
     */
    private SparkJobResult runJobWithVersion(SparkVersion sparkVersion) throws OperatorException, IOException, ConnectionException, InvalidConnectionException {
        final SparkJobResult jobResult = runTestSparkJob(sparkVersion);
        getTestContext().addApplicationId(jobResult.getApplicationId(), getType());
        final SparkTools.SparkFinalState state = jobResult.getFinalState();
        if (state != null) {
            if (state.equals(SparkTools.SparkFinalState.KILLED)) {
                throw new OperationKilledException("Spark test cancelled.");
            }
            return jobResult;
        }
        throw new ConnectionException(getTestContext().getHadoopContext(), ConnectionException.ErrorType.SPARK_JOB, "Could not determine final application state.");
    }

    /**
     * Reads the first line of one job output file and parses it as {@code <count>;<spark version>}.
     *
     * <p>An empty file yields a result whose {@code resultAsText} is {@code null}; the fields are
     * left unparsed so that {@code checkJobResult} can report the missing output instead of this
     * method failing with a {@code NullPointerException}.
     *
     * @param str listed path of the output file; the part starting at the output sub-directory
     *            name is appended to the user directory to locate the file
     *            (assumes the path contains that sub-directory name — TODO confirm)
     * @return the parsed result, never {@code null}
     * @throws ConnectionException if the line does not split into exactly two {@code ';'}-separated
     *         parts, or the first part is not an integer
     * @throws IOException if the file cannot be opened or read
     */
    private SparkTestCountJobResult parseJobOutput(String str) throws ConnectionException, IOException {
        MapReduceHDFSHandler hdfsHandler = getTestContext().getMapReduceHDFSHandler();
        String relativePath = str.substring(str.indexOf(this.tempOutputDir.getSubDir()));
        SparkTestCountJobResult parsed = new SparkTestCountJobResult();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                hdfsHandler.getInputStream(hdfsHandler.getUserDirectory() + relativePath), StandardCharsets.UTF_8))) {
            parsed.resultAsText = reader.readLine();
        }
        if (parsed.resultAsText == null) {
            // Empty file: nothing to parse.
            return parsed;
        }
        String[] parts = parsed.resultAsText.split(";");
        if (parts.length != 2) {
            throw new ConnectionException(getTestContext().getHadoopContext(), ConnectionException.ErrorType.SPARK_JOB, "The Spark job produced wrong result: " + parsed.resultAsText);
        }
        try {
            parsed.resultCount = Integer.parseInt(parts[0]);
            parsed.fullSCVersion = parts[1];
        } catch (NumberFormatException e) {
            throw new ConnectionException(getTestContext().getHadoopContext(), ConnectionException.ErrorType.SPARK_JOB, e);
        }
        return parsed;
    }

    /**
     * Called when no Spark version produced a successful job. Tries to fetch the stderr log of the
     * given application and hands it to {@code SparkTools.checkSparkLogForCommonErrors}; whatever
     * that yields, this method always ends by throwing.
     *
     * @param str application id of a job that ran, or {@code null} if no job was started
     *            (for example when every version was rejected as an invalid connection)
     * @throws ConnectionException always — either from the log check or the generic
     *         {@code SPARK_JOB} error thrown at the end
     */
    private void handleFailureWithAllSparkVersions(String str) throws ConnectionException {
        // Without an application id there is no log directory to inspect.
        if (str != null) {
            MapReduceHDFSHandler hdfsHandler = getTestContext().getMapReduceHDFSHandler();
            String logDirectory = hdfsHandler.getLogDirectory(str);
            try {
                // Presumably gives the cluster time to make the log available — confirm.
                Thread.sleep(1000L);
                SparkTools.checkSparkLogForCommonErrors(hdfsHandler.getStdLog(logDirectory, (Integer) 1, LogCollectionMethod.FIRST_CONTAINER, "stderr", new HashSet<>()), getTestContext());
            } catch (IOException e) {
                LogService.getRoot().fine("Could not retrieve Spark job stderr log: " + e.getMessage());
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
                LogService.getRoot().fine("Interrupted while waiting for the Spark job stderr log: " + e.getMessage());
            }
        }
        throw new ConnectionException(getTestContext().getHadoopContext(), ConnectionException.ErrorType.SPARK_JOB);
    }

    /**
     * Submits the {@code SparkOperation.TestCountJob} job using the given Spark version.
     *
     * <p>If the version differs from the one configured on the connection, a copy of the connection
     * entry with the requested version is wrapped in a temporary {@link TestSparkHadoopContext} and
     * that context's Spark handler is used; the temporary context is closed once the job call
     * returns or fails.
     *
     * @param sparkVersion Spark version to submit the job with
     * @return the result returned by {@code SparkHandler.runSpark}
     * @throws InvalidConnectionException if the temporary context cannot be created
     * @throws OperatorException propagated from the job run
     * @throws IOException propagated from the job run
     */
    private SparkJobResult runTestSparkJob(SparkVersion sparkVersion) throws OperatorException, IOException, InvalidConnectionException {
        RadoopConnectionEntry connection = getTestContext().getConnection();
        SparkHandler sparkHandler = getTestContext().getSparkHandler();
        TestSparkHadoopContext overrideContext = null;
        try {
            if (!connection.getSparkVersion().equals(sparkVersion)) {
                RadoopConnectionEntry versionedEntry = RadoopConnectionEntry.copyEntry(connection);
                versionedEntry.setSparkVersion(sparkVersion);
                overrideContext = new TestSparkHadoopContext(versionedEntry);
                sparkHandler = overrideContext.getSparkHandler();
            }
            KillableIntegrationTest testOperation = createTestOperation(getType().getName() + " " + sparkVersion, this.commandQueue);
            SparkTools.startMonitoringThread(testOperation, this.messageQueue, this.commandQueue, false);
            ParameterTransferObject jobParameters = new ParameterTransferObject();
            jobParameters.setParameter(CommonParameter.INPUT_DIR, this.tempImportSourceFile.getParent().getFullPath());
            jobParameters.setParameter(CommonParameter.EXAMPLE_SET_OUTPUT_DIR, this.tempOutputDir.getFullPath());
            return sparkHandler.runSpark(testOperation, null, SparkOperation.TestCountJob, this.messageQueue, this.commandQueue, new HashMap(), jobParameters, null);
        } finally {
            // Close the temporary context exactly once, on success and on failure alike.
            if (overrideContext != null) {
                overrideContext.close();
            }
        }
    }

    /**
     * Closes the temporary output directory via the inherited {@code close(...)} helper.
     *
     * @return always {@code true}
     */
    @Override // eu.radoop.connections.service.test.AbstractRadoopTest, eu.radoop.connections.service.test.RadoopTest
    public boolean cleanUp() {
        final TempHDFSDirectory outputDir = this.tempOutputDir;
        close(outputDir);
        return true;
    }
}
