package com.altair.ai.pel.python.script;

import com.altair.ai.pel.distribution.PythonDistributionReference;
import com.altair.ai.pel.python.exception.PythonScriptRunnerException;
import com.altair.ai.pel.python.util.PythonRunnerTools;
import com.altair.ai.pel.server.local.ServerResource;
import com.altair.ai.pel.server.local.ServerService;
import com.rapidminer.tools.I18N;
import com.rapidminer.tools.LogService;
import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.logging.Level;

/* loaded from: input_file:com/altair/ai/pel/python/script/ScriptRunnerWebSocketWithLocalFileTransfer.class */
class ScriptRunnerWebSocketWithLocalFileTransfer extends AbstractScriptRunnerWithLocalFileTransfer {
    private final AtomicReference<Callable<Void>> terminator = new AtomicReference<>(() -> {
        return null;
    });
    private PythonScriptTask task;

    @Override // com.altair.ai.pel.python.script.PythonScriptRunner
    public Callable<Void> createRunner(PythonDistributionReference pythonDistributionReference, PythonBaseScript pythonBaseScript) {
        this.task = new PythonScriptTaskLocalFiles(pythonBaseScript);
        this.task.setCleanupRunnable(this::cleanupRun);
        return () -> {
            long currentTimeMillis = System.currentTimeMillis();
            ServerResource openServerResource = ServerService.INSTANCE.openServerResource(pythonDistributionReference, pythonBaseScript);
            try {
                List<PythonStreamProvider> createInputProviders = pythonBaseScript.createInputProviders(this.inputsDir);
                PythonRunnerTools.writeContentToDisk(createInputProviders, this.inputsDir);
                WebSocketListenerImpl webSocketListenerImpl = new WebSocketListenerImpl(this.task, pythonBaseScript, pythonDistributionReference, openServerResource, createStartMessage(createInputProviders), this::setResultOutputs, currentTimeMillis);
                CompletableFuture<WebSocket> initWebSocket = initWebSocket(openServerResource, webSocketListenerImpl);
                this.terminator.set(() -> {
                    Objects.requireNonNull(webSocketListenerImpl);
                    initWebSocket.thenAccept(webSocketListenerImpl::stop);
                    return null;
                });
                return null;
            } catch (Exception e) {
                openServerResource.close();
                this.task.markException(e);
                LogService.getRoot().log(Level.WARNING, "script runner failed with error : " + e.getMessage());
                this.task.completeScriptTaskAsync();
                return null;
            }
        };
    }

    @Override // com.altair.ai.pel.python.script.PythonScriptRunner
    public Supplier<Callable<Void>> createStopCallback() {
        AtomicReference<Callable<Void>> atomicReference = this.terminator;
        Objects.requireNonNull(atomicReference);
        return atomicReference::get;
    }

    @Override // com.altair.ai.pel.python.script.PythonScriptRunner
    public PythonScriptTask getTask() {
        return this.task;
    }

    private CompletableFuture<WebSocket> initWebSocket(ServerResource serverResource, WebSocketListenerImpl webSocketListenerImpl) {
        return HttpClient.newBuilder().build().newWebSocketBuilder().header("Authorization", new String(serverResource.getCredentials())).buildAsync(URI.create("ws://localhost:" + serverResource.getPort()), webSocketListenerImpl).whenCompleteAsync((webSocket, th) -> {
            if (th != null) {
                LogService.getRoot().log(Level.WARNING, "failed to open web socket : " + th.getMessage());
                serverResource.close();
                this.task.markException(PythonRunnerTools.throwableToException(th));
                this.task.completeScriptTaskAsync();
            }
        });
    }

    private static String createStartMessage(List<PythonStreamProvider> list) throws Exception {
        InputStream call = list.stream().filter(pythonStreamProvider -> {
            return "args.json".equals(pythonStreamProvider.getContentName());
        }).findFirst().orElseThrow(() -> {
            return new PythonScriptRunnerException(I18N.getErrorMessage("error.pel.script_runner.missing_args", new Object[0]));
        }).getContentSupplier().call();
        try {
            String str = "{\"type\": \"START\",\"data\": " + new String(call.readAllBytes(), StandardCharsets.UTF_8) + "}";
            if (call != null) {
                call.close();
            }
            return str;
        } catch (Throwable th) {
            if (call != null) {
                try {
                    call.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
