package com.altair.ks_engine.bridge;

import com.altair.ks_engine.bridge.exception.KSEngineLoginException;
import com.altair.ks_engine.bridge.exception.KSEngineQueryProcessingException;
import com.altair.ks_engine.bridge.exception.KSEngineQueryQueueFullException;
import com.altair.ks_engine.bridge.exception.KSEngineShutdownException;
import com.altair.ks_engine.bridge.listener.KSEngineSocketListener;
import com.altair.ks_engine.bridge.util.KSEngineConstants;
import com.altair.ks_engine.query.KSCustomQuery;
import com.altair.ks_engine.query.KSError;
import com.altair.ks_engine.query.KSQuery;
import com.altair.ks_engine.query.KSResult;
import com.rapidminer.tools.LogService;
import com.rapidminer.tools.ValidationUtilV2;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/altair/ks_engine/bridge/KSEngineSocket.class */
public enum KSEngineSocket {
    INSTANCE;

    /** Socket to the KS engine; {@code null} while no connection is open. */
    private Socket clientSocket;
    /** Line reader over the socket's input stream; assigned when a connection is opened. */
    private BufferedReader in;
    /** Auto-flushing writer over the socket's output stream; assigned when a connection is opened. */
    private PrintWriter out;
    /** Phaser the query submitter waits on; created with a single registered party in {@link #initialize()}. */
    private Phaser submissionPhaser;
    /** Registered listeners; iteration must be guarded by synchronizing on the list itself. */
    private final List<KSEngineSocketListener> listenerList = Collections.synchronizedList(new ArrayList<>());
    /** Single-threaded pool on which all listener callbacks are executed. */
    private final ExecutorService listenerNotificationService = Executors.newFixedThreadPool(1);
    /** Single daemon thread that runs the socket reader task. */
    private final ExecutorService readerService = Executors.newFixedThreadPool(1, runnable -> {
        Thread thread = new Thread(runnable, "ks-engine-connection-reader");
        thread.setDaemon(true);
        return thread;
    });
    /** Single daemon thread that runs the query submission task. */
    private final ExecutorService queryService = Executors.newFixedThreadPool(1, runnable -> {
        Thread thread = new Thread(runnable, "ks-engine-query-submitter");
        thread.setDaemon(true);
        return thread;
    });
    /** {@code true} between a successful {@link #initialize()} and the next {@link #shutdown()}. */
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    /** Set by {@link #shutdown()}; polled by the reader and submitter loops as their exit condition. */
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    /** Number of connection failures reported to listeners since the counter was last reset. */
    private final AtomicInteger errorCounter = new AtomicInteger(0);
    /** Phase number the submitter passes to the phaser for its next wait; incremented once per submitted query. */
    private final AtomicInteger phase = new AtomicInteger(0);
    /** Bounded queue (capacity 128) of queries waiting to be sent to the engine. */
    private final LinkedBlockingQueue<KSQueryTask> queryQueue = new LinkedBlockingQueue<>(128);
    /** Task wrapping the login request while a login is in progress; {@code null} otherwise. */
    private volatile KSQueryTask loginTask = null;
    /** Task whose query was sent last and whose response is being read; {@code null} when idle. */
    private volatile KSQueryTask currentTask = null;
    /** Current connection state; changed through {@code setSocketStatus} so listeners are notified. */
    private volatile SocketStatus socketStatus = SocketStatus.CLOSED;

    /** Enum constructor for the single {@code INSTANCE}; all state is set up by the field initializers. */
    KSEngineSocket() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Prepares the socket bridge for use and starts the query submission task.
     *
     * <p>Only the first call after construction or after a {@code shutdown()} has any effect;
     * further calls while already initialized return immediately.
     */
    public synchronized void initialize() {
        boolean firstCall = this.initialized.compareAndSet(false, true);
        if (!firstCall) {
            return;
        }
        LogService.getRoot().log(Level.FINE, "Initializing KS engine socket");
        // Fresh phaser with exactly one party: the reader arrives, the submitter waits.
        this.submissionPhaser = new Phaser(1);
        this.phase.set(0);
        this.shutdownRequested.set(false);
        Runnable sender = createSubmissionSender();
        this.queryService.submit(sender);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds a listener and immediately notifies it about the current socket status.
     *
     * @param kSEngineSocketListener the listener to add; validated as non-null via {@code ValidationUtilV2}
     */
    public void registerSocketListener(KSEngineSocketListener kSEngineSocketListener) {
        KSEngineSocketListener validated =
                (KSEngineSocketListener) ValidationUtilV2.requireNonNull(kSEngineSocketListener, "listener");
        this.listenerList.add(validated);
        // Only the new listener is told the current state; existing listeners already know it.
        List<KSEngineSocketListener> onlyNewListener = Collections.singletonList(kSEngineSocketListener);
        notifyListeners(onlyNewListener, this.socketStatus);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes a previously registered listener.
     *
     * @param kSEngineSocketListener the listener to remove; validated as non-null via {@code ValidationUtilV2}
     */
    public void unregisterSocketListener(KSEngineSocketListener kSEngineSocketListener) {
        Object validated = ValidationUtilV2.requireNonNull(kSEngineSocketListener, "listener");
        this.listenerList.remove(validated);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Opens the socket connection to the KS engine and starts the reader task on it.
     *
     * <p>If a socket is already set, a warning is logged and nothing else happens. If opening the
     * socket or either of its streams fails, the partially opened socket is closed again, the
     * error counter is incremented and listeners are notified of the connection failure.
     *
     * @param str host name to connect to
     * @param i   port to connect to
     */
    public synchronized void connectToSocket(String str, int i) {
        if (this.clientSocket != null) {
            LogService.getRoot().log(Level.WARNING, "Cannot connect KS engine socket, socket already set!");
            return;
        }
        Socket socket = null;
        try {
            socket = new Socket(str, i);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
            // Publish the connection only once every stream was obtained, so a failure half-way
            // through never leaves a stale socket behind that would block later connect attempts.
            this.clientSocket = socket;
            this.in = reader;
            this.out = writer;
            this.readerService.submit(createSocketStreamListenerRunnable());
        } catch (IOException e) {
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            notifyListenersConnectionFailure(this.errorCounter.incrementAndGet(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends the login request to the KS engine.
     *
     * <p>The request is only sent while the socket status is {@code CONNECTED}; the status is
     * switched to {@code LOGGING_IN} before the request is written.
     *
     * @param str  value placed into the {@code Mining Location} part of the login string
     * @param str2 value placed into the {@code UID} part of the login string
     * @throws KSEngineLoginException if the socket status is not {@code CONNECTED}
     */
    public synchronized void login(String str, String str2) throws KSEngineLoginException {
        SocketStatus status = this.socketStatus;
        if (status != SocketStatus.CONNECTED) {
            throw new KSEngineLoginException("Socket connection was not waiting for login, but was: " + status);
        }
        KSQueryTask task = new KSQueryTask(new KSCustomQuery(String.format("Mining Location=%s;UID=%s;authenticator=NOT_SECURE_AUTHENTICATION;ENHANCED_TIME=true", str, str2)));
        // Publish the task before the status: the reader thread dereferences loginTask as soon as
        // it observes LOGGING_IN, so the task must be visible first. Building the task first also
        // keeps the status untouched if task creation fails.
        this.loginTask = task;
        setSocketStatus(SocketStatus.LOGGING_IN);
        this.out.println(task.getQuery().getProcessedQuery());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Enqueues a query for submission to the KS engine.
     *
     * @param kSQuery the query to wrap in a task and enqueue
     * @return the future obtained from the newly created query task
     * @throws KSEngineQueryQueueFullException if the bounded query queue does not accept the task
     */
    public synchronized Future<KSResult> submitQuery(KSQuery kSQuery) throws KSEngineQueryQueueFullException {
        KSQueryTask task = new KSQueryTask(kSQuery);
        boolean accepted = this.queryQueue.offer(task);
        if (accepted) {
            LogService.getRoot().log(Level.FINE, "KS engine query added to queue: " + kSQuery);
            return task.getFutureTask();
        }
        throw new KSEngineQueryQueueFullException("Could not submit query, queue was full");
    }

    /**
     * Marks the socket as {@code CLOSED} and closes the underlying socket if one is set.
     *
     * <p>A failure to close is logged as a warning; the socket reference is cleared in every case.
     */
    synchronized void closeSocket() {
        try {
            setSocketStatus(SocketStatus.CLOSED);
            Socket socket = this.clientSocket;
            if (socket == null) {
                return;
            }
            socket.close();
        } catch (IOException closeFailure) {
            LogService.getRoot().log(Level.WARNING, "Failed to close connection socket to KS engine.", closeFailure);
        } finally {
            this.clientSocket = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Shuts the bridge down: requests termination of the worker loops, closes the socket and
     * resets the internal state, including the error counter.
     *
     * <p>Has no effect unless the bridge is currently initialized.
     */
    public void shutdown() {
        boolean wasInitialized = this.initialized.compareAndSet(true, false);
        if (!wasInitialized) {
            return;
        }
        this.shutdownRequested.set(true);
        KSEngineShutdownException reason = new KSEngineShutdownException("KS engine shutting down.");
        closeSocketAndResetState(reason, true);
    }

    /**
     * Closes the socket, terminates the submission phaser and fails every query that can no
     * longer be answered.
     *
     * <p>Both the queries still waiting in the queue and the query currently in flight are
     * marked with {@code exc} and completed, so that nobody keeps waiting on a future that would
     * otherwise never finish.
     *
     * @param exc the exception to mark on all abandoned query tasks
     * @param z   {@code true} to also reset the connection error counter to zero
     */
    private void closeSocketAndResetState(Exception exc, boolean z) {
        closeSocket();
        // The phaser only exists after initialize(); the reader task may get here without it.
        Phaser phaser = this.submissionPhaser;
        if (phaser != null) {
            phaser.forceTermination();
        }
        if (z) {
            this.errorCounter.set(0);
        }
        // Read the volatile field once so a concurrent reset cannot null it between check and use.
        KSQueryTask running = this.currentTask;
        boolean runningWasQueued = false;
        List<KSQueryTask> abandoned = new ArrayList<>();
        this.queryQueue.drainTo(abandoned);
        for (KSQueryTask task : abandoned) {
            if (task == running) {
                // A query that failed to send is put back into the queue while still being the
                // current task; make sure it is only completed once.
                runningWasQueued = true;
            }
            task.markException(exc);
            completeQueryTaskAsync(task);
        }
        if (running != null) {
            this.currentTask = null;
            if (!runningWasQueued) {
                running.markException(exc);
                completeQueryTaskAsync(running);
            }
        }
    }

    /**
     * Creates the task that reads the KS engine's responses line by line until shutdown is
     * requested, the stream ends or an error occurs.
     *
     * <p>The socket is closed and the state reset exactly once, when the read loop is left, so the
     * connection stays open across the many lines that make up a login and its following queries.
     *
     * @return the reader task to run on the reader thread
     */
    private Runnable createSocketStreamListenerRunnable() {
        return () -> {
            try {
                while (!this.shutdownRequested.get()) {
                    String readLine = this.in.readLine();
                    if (KSEngineSettings.INSTANCE.getDebugMode() == KSDebugMode.ALL) {
                        System.out.println(readLine);
                    }
                    if (readLine == null) {
                        // End of stream: nothing more will ever arrive on this connection, so
                        // leave the loop instead of polling a dead stream again.
                        if (this.socketStatus == SocketStatus.LOGGING_IN) {
                            notifyListenersLoginFailure(new KSEngineLoginException("Login failed. Invalid credentials?"));
                        } else {
                            LogService.getRoot().log(Level.INFO, "Connection was closed by KS engine due to inactivity.");
                        }
                        break;
                    }
                    processEngineLine(readLine);
                }
            } catch (Exception e) {
                handleReaderFailure(e);
            } finally {
                closeSocketAndResetState(new KSEngineQueryProcessingException(new KSEngineShutdownException("KS engine bridge was shut down while processing")), false);
            }
        };
    }

    /** Dispatches one non-null line received from the engine according to its prefix and the socket status. */
    private void processEngineLine(String line) throws KSEngineLoginException {
        if (line.startsWith(KSEngineConstants.PREFIX_KS_LOGIN)) {
            this.errorCounter.set(0);
            setSocketStatus(SocketStatus.CONNECTED);
        } else if (line.startsWith(KSEngineConstants.PREFIX_KS_READY)) {
            if (this.currentTask != null) {
                LogService.getRoot().log(Level.FINE, "KS engine query completed, returning result");
                completeCurrentTask();
            } else {
                if (this.socketStatus != SocketStatus.LOGGING_IN) {
                    throw new IllegalStateException("UNEXPECTED STATE FOR KS ENGINE SOCKET: " + this.socketStatus);
                }
                // Ready prompt without a running query while logging in: the login went through.
                setSocketStatus(SocketStatus.READY);
                this.loginTask = null;
                this.submissionPhaser.arrive();
            }
        } else if (line.startsWith(KSEngineConstants.PREFIX_KS_EMPTY)) {
            KSQueryTask task = this.currentTask;
            if (task == null) {
                throw new IllegalStateException("UNEXPECTED STATE WHEN READING EMPTY RESULT: " + this.socketStatus);
            }
            LogService.getRoot().log(Level.FINE, "Reading KS engine empty response");
            task.markEmpty();
        } else if (this.socketStatus == SocketStatus.LOGGING_IN) {
            processLoginLine(line);
        } else {
            processQueryLine(line);
        }
    }

    /** Collects an unprefixed line received during login; the second such line fails the login with the collected error. */
    private void processLoginLine(String line) throws KSEngineLoginException {
        KSQueryTask login = this.loginTask;
        if (login == null) {
            throw new IllegalStateException("KS engine sent login output but no login task is pending");
        }
        if (login.isReadingError()) {
            login.addErrorLine(line);
            login.markError();
            String errorAsString = new KSError(login.getErrorLines()).getErrorAsString();
            this.loginTask = null;
            throw new KSEngineLoginException(errorAsString);
        }
        login.addErrorLine(line);
    }

    /** Appends a result or error line to the query currently in flight. */
    private void processQueryLine(String line) {
        // Read the volatile field once; it can be cleared by a concurrent shutdown.
        KSQueryTask task = this.currentTask;
        if (task == null) {
            LogService.getRoot().log(Level.SEVERE, () -> {
                return String.format("UNEXPECTED RESULT FROM KS ENGINE SOCKET WHILE NOT BUSY: %s", line);
            });
            throw new IllegalStateException("UNEXPECTED RESULT FROM KS ENGINE SOCKET WHILE NOT BUSY");
        }
        if (line.startsWith(KSEngineConstants.PREFIX_KS_RESULT)) {
            task.addResultLine(line);
        } else if (line.startsWith(KSEngineConstants.PREFIX_KS_ERROR)) {
            task.addErrorLine(line);
        } else if (task.isReadingResult()) {
            LogService.getRoot().log(Level.FINEST, "Reading KS engine query result line");
            task.addResultLine(line);
        } else if (task.isReadingError()) {
            LogService.getRoot().log(Level.FINEST, "Reading KS engine error line");
            task.addErrorLine(line);
        } else {
            LogService.getRoot().log(Level.SEVERE, () -> {
                return String.format("UNEXPECTED STATE %d OF KS ENGINE QUERY TASK WHILE READING LINE: %s", Integer.valueOf(task.getState()), line);
            });
            throw new IllegalStateException("UNEXPECTED STATE OF KS ENGINE QUERY TASK WHILE READING LINE");
        }
    }

    /** Reports an exception that ended the read loop to listeners or to the query in flight, depending on the socket status. */
    private void handleReaderFailure(Exception e) {
        if (this.socketStatus == SocketStatus.CLOSED) {
            notifyListenersConnectionFailure(this.errorCounter.incrementAndGet(), e);
        } else if (this.socketStatus == SocketStatus.LOGGING_IN) {
            notifyListenersLoginFailure(e);
        } else {
            LogService.getRoot().log(Level.SEVERE, "Error for KS socket connection: " + e.getMessage(), e);
            KSQueryTask task = this.currentTask;
            if (task != null) {
                task.markException(e);
                completeQueryTaskAsync(task);
                this.currentTask = null;
            }
        }
    }

    /**
     * Creates the task that takes queries from the queue and writes them to the KS engine, one at
     * a time, each time waiting on the submission phaser before sending.
     *
     * <p>The task runs until shutdown is requested or its thread is interrupted.
     *
     * @return the submission task to run on the query submitter thread
     */
    private Runnable createSubmissionSender() {
        return () -> {
            while (!this.shutdownRequested.get()) {
                try {
                    // Bounded wait so the shutdown flag is re-checked at least every five seconds.
                    KSQueryTask task = this.queryQueue.poll(5L, TimeUnit.SECONDS);
                    if (task == null) {
                        continue;
                    }
                    LogService.getRoot().log(Level.FINER, "KS engine query found, waiting until KS engine can accept query");
                    this.submissionPhaser.awaitAdvance(this.phase.getAndIncrement());
                    this.currentTask = task;
                    setSocketStatus(SocketStatus.BUSY);
                    LogService.getRoot().log(Level.FINE, "KS engine query submitted");
                    // Use the local reference from here on: the field may be cleared concurrently.
                    String processedQuery = task.getQuery().getProcessedQuery();
                    if (KSEngineSettings.INSTANCE.getDebugMode() == KSDebugMode.QUERY || KSEngineSettings.INSTANCE.getDebugMode() == KSDebugMode.ALL) {
                        System.out.println("Submitted query >>> " + processedQuery);
                    }
                    this.out.println(processedQuery);
                    if (this.out.checkError()) {
                        LogService.getRoot().log(Level.WARNING, "KS engine query could not be submitted, will put it back into query queue");
                        if (!this.queryQueue.offer(task)) {
                            LogService.getRoot().log(Level.SEVERE, "KS engine query could not be put back into queue because it was full, discarding!");
                        }
                        this.submissionPhaser.arriveAndDeregister();
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop: with the flag set, the next poll would throw
                    // again immediately and the loop would spin without ever blocking.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };
    }

    /**
     * Finishes the query in flight: marks it successful, failed or empty depending on what was
     * read for it, completes its future asynchronously, switches the socket to {@code READY} and
     * arrives at the submission phaser.
     *
     * <p>Does nothing if the current task was cleared in the meantime.
     */
    private void completeCurrentTask() {
        // Read the volatile field once: a concurrent shutdown may null it between the caller's
        // check and the accesses below.
        KSQueryTask task = this.currentTask;
        if (task == null) {
            return;
        }
        if (task.isReadingResult()) {
            task.markSuccessful();
        } else if (task.isReadingError()) {
            task.markError();
        } else {
            task.markEmpty();
        }
        completeQueryTaskAsync(task);
        this.currentTask = null;
        setSocketStatus(SocketStatus.READY);
        this.submissionPhaser.arrive();
    }

    /**
     * Runs the future task of the given query task on a new daemon thread named
     * {@code ks-engine-result-parser}, so the calling thread does not wait for it.
     *
     * @param kSQueryTask the query task whose future task should be run
     */
    private void completeQueryTaskAsync(KSQueryTask kSQueryTask) {
        Thread parserThread = new Thread(kSQueryTask.getFutureTask(), "ks-engine-result-parser");
        parserThread.setDaemon(true);
        parserThread.start();
    }

    /**
     * Updates the socket status and notifies all listeners, but only if the status actually changes.
     *
     * @param socketStatus the new status
     */
    private synchronized void setSocketStatus(SocketStatus socketStatus) {
        if (this.socketStatus == socketStatus) {
            return;
        }
        this.socketStatus = socketStatus;
        notifyAllListeners(socketStatus);
    }

    /**
     * Schedules a {@code socketLoginFailed} callback for every registered listener.
     *
     * @param exc the exception describing why the login failed
     */
    private void notifyListenersLoginFailure(Exception exc) {
        // Hold the list's own monitor while iterating, as required for synchronized lists.
        synchronized (this.listenerList) {
            this.listenerList.forEach(listener -> notifyListener(() -> listener.socketLoginFailed(exc)));
        }
    }

    /**
     * Schedules a {@code socketConnectionFailed} callback for every registered listener.
     *
     * @param i   the error count to pass on to the listeners
     * @param exc the exception describing the connection failure
     */
    private synchronized void notifyListenersConnectionFailure(int i, Exception exc) {
        // Hold the list's own monitor while iterating, as required for synchronized lists.
        synchronized (this.listenerList) {
            this.listenerList.forEach(listener -> notifyListener(() -> listener.socketConnectionFailed(i, exc)));
        }
    }

    /**
     * Notifies every registered listener about the given status while holding the listener list's monitor.
     *
     * @param socketStatus the status to report
     */
    private void notifyAllListeners(SocketStatus socketStatus) {
        List<KSEngineSocketListener> listeners = this.listenerList;
        synchronized (listeners) {
            notifyListeners(listeners, socketStatus);
        }
    }

    /**
     * Schedules, for each listener in the list, the callback that matches the given status.
     *
     * <p>{@code LOGGING_IN} has no callback and is skipped silently. A status without a case here
     * is logged as a warning and ends the notification.
     *
     * @param list         the listeners to notify
     * @param socketStatus the status to report
     */
    private void notifyListeners(List<KSEngineSocketListener> list, SocketStatus socketStatus) {
        for (KSEngineSocketListener listener : list) {
            Runnable callback;
            switch (socketStatus) {
                case CONNECTED:
                    callback = listener::socketConnected;
                    break;
                case READY:
                    callback = listener::socketReady;
                    break;
                case BUSY:
                    callback = listener::socketBusy;
                    break;
                case CLOSED:
                    callback = listener::socketClosed;
                    break;
                case LOGGING_IN:
                    // Listeners have no callback for this transitional state.
                    continue;
                default:
                    LogService.getRoot().log(Level.WARNING, () -> String.format("KS engine socket status %s not handled, cannot notify listeners!", socketStatus));
                    return;
            }
            notifyListener(callback);
        }
    }

    /**
     * Hands a listener callback to the notification executor.
     *
     * <p>If the executor rejects the callback, the rejection is logged as a warning and the
     * callback is dropped.
     *
     * @param runnable the callback to execute
     */
    private void notifyListener(Runnable runnable) {
        try {
            this.listenerNotificationService.submit(runnable);
        } catch (RejectedExecutionException rejected) {
            LogService.getRoot().log(Level.WARNING, "Failed to notify listener about KS engine lifecycle event", rejected);
        }
    }
}
