/*
 * Decompiled with CFR 0.152.
 */
package com.altair.ai.pel.python.script;

import com.altair.ai.pel.distribution.PythonDistributionReference;
import com.altair.ai.pel.python.exception.PythonScriptRunnerException;
import com.altair.ai.pel.python.script.AbstractScriptRunnerWithLocalFileTransfer;
import com.altair.ai.pel.python.script.PythonBaseScript;
import com.altair.ai.pel.python.script.PythonScriptTask;
import com.altair.ai.pel.python.script.PythonScriptTaskLocalFiles;
import com.altair.ai.pel.python.script.PythonStreamProvider;
import com.altair.ai.pel.python.script.WebSocketListenerImpl;
import com.altair.ai.pel.python.util.PythonRunnerTools;
import com.altair.ai.pel.server.local.ServerResource;
import com.altair.ai.pel.server.local.ServerService;
import com.rapidminer.tools.I18N;
import com.rapidminer.tools.LogService;
import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.logging.Level;

class ScriptRunnerWebSocketWithLocalFileTransfer
extends AbstractScriptRunnerWithLocalFileTransfer {
    private final AtomicReference<Callable<Void>> terminator = new AtomicReference<Callable<Void>>(() -> null);
    private PythonScriptTask task;

    ScriptRunnerWebSocketWithLocalFileTransfer() {
    }

    @Override
    public Callable<Void> createRunner(PythonDistributionReference distRef, PythonBaseScript script) {
        this.task = new PythonScriptTaskLocalFiles(script);
        this.task.setCleanupRunnable(this::cleanupRun);
        return () -> {
            long startTime = System.currentTimeMillis();
            ServerResource serverResource = ServerService.INSTANCE.openServerResource(distRef, script);
            try {
                List<PythonStreamProvider> inputProviders = script.createInputProviders(this.inputsDir);
                PythonRunnerTools.writeContentToDisk(inputProviders, this.inputsDir);
                WebSocketListenerImpl socketListener = new WebSocketListenerImpl(this.task, script, distRef, serverResource, ScriptRunnerWebSocketWithLocalFileTransfer.createStartMessage(inputProviders), this::setResultOutputs, startTime);
                CompletableFuture<WebSocket> wsFuture = this.initWebSocket(serverResource, socketListener);
                this.terminator.set(() -> {
                    wsFuture.thenAccept(socketListener::stop);
                    return null;
                });
            }
            catch (Exception e) {
                serverResource.close();
                this.task.markException(e);
                LogService.getRoot().log(Level.WARNING, "script runner failed with error : " + e.getMessage());
                this.task.completeScriptTaskAsync();
            }
            return null;
        };
    }

    @Override
    public Supplier<Callable<Void>> createStopCallback() {
        return this.terminator::get;
    }

    @Override
    public PythonScriptTask getTask() {
        return this.task;
    }

    private CompletableFuture<WebSocket> initWebSocket(ServerResource serverResource, WebSocketListenerImpl socketListener) {
        return HttpClient.newBuilder().build().newWebSocketBuilder().header("Authorization", new String(serverResource.getCredentials())).buildAsync(URI.create("ws://localhost:" + serverResource.getPort()), socketListener).whenCompleteAsync((webSocket, error) -> {
            if (error != null) {
                LogService.getRoot().log(Level.WARNING, "failed to open web socket : " + error.getMessage());
                serverResource.close();
                this.task.markException(PythonRunnerTools.throwableToException(error));
                this.task.completeScriptTaskAsync();
            }
        });
    }

    private static String createStartMessage(List<PythonStreamProvider> inputProviders) throws Exception {
        try (InputStream args = inputProviders.stream().filter(e -> "args.json".equals(e.getContentName())).findFirst().orElseThrow(() -> new PythonScriptRunnerException(I18N.getErrorMessage((String)"error.pel.script_runner.missing_args", (Object[])new Object[0]))).getContentSupplier().call();){
            String argsString = new String(args.readAllBytes(), StandardCharsets.UTF_8);
            String string = "{\"type\": \"START\",\"data\": " + argsString + "}";
            return string;
        }
    }
}

