/*
 * Decompiled with CFR 0.152.
 */
package com.altair.ai.pel.operator;

import com.altair.ai.pel.distribution.PythonDistribution;
import com.altair.ai.pel.loader.PythonExtension;
import com.altair.ai.pel.loader.model.ExpectedInput;
import com.altair.ai.pel.loader.model.ExpectedOutput;
import com.altair.ai.pel.operator.PythonInputPortContainer;
import com.altair.ai.pel.operator.PythonOperator;
import com.altair.ai.pel.operator.PythonOperatorChainTools;
import com.altair.ai.pel.operator.PythonOperatorConnectionEncryptor;
import com.altair.ai.pel.operator.PythonOperatorContainer;
import com.altair.ai.pel.operator.PythonOutputPortContainer;
import com.altair.ai.pel.operator.PythonResultHandler;
import com.altair.ai.pel.operator.SerializablePythonIOObject;
import com.altair.ai.pel.operator.wrapper.PythonFunctionCall;
import com.altair.ai.pel.operator.wrapper.PythonFunctionCallBuilder;
import com.altair.ai.pel.operator.wrapper.PythonFunctionDataType;
import com.altair.ai.pel.operator.wrapper.PythonFunctionSourceType;
import com.altair.ai.pel.operator.wrapper.PythonFunctionTargetType;
import com.altair.ai.pel.python.exception.PythonScriptProcessingException;
import com.altair.ai.pel.python.exception.PythonScriptRunnerException;
import com.altair.ai.pel.python.exception.PythonScriptStoppedException;
import com.altair.ai.pel.python.script.result.PythonScriptResult;
import com.altair.ai.pel.python.script.result.ScriptOutput;
import com.altair.ai.pel.python.script.result.SubmissionResult;
import com.altair.ai.pel.python.settings.PythonSDKSettings;
import com.altair.ai.pel.python.util.PythonOperatorTools;
import com.rapidminer.Process;
import com.rapidminer.ProcessStateListener;
import com.rapidminer.ProcessStoppedListener;
import com.rapidminer.adaption.belt.IOTable;
import com.rapidminer.operator.Operator;
import com.rapidminer.operator.OperatorException;
import com.rapidminer.operator.ProcessStoppedException;
import com.rapidminer.operator.UserData;
import com.rapidminer.operator.UserError;
import com.rapidminer.operator.nio.file.FileObject;
import com.rapidminer.operator.ports.InputPort;
import com.rapidminer.operator.ports.InputPorts;
import com.rapidminer.operator.ports.OutputPort;
import com.rapidminer.operator.ports.PortOwner;
import com.rapidminer.parameter.ParameterType;
import com.rapidminer.parameter.ParameterTypeAttributeSubset;
import com.rapidminer.parameter.ParameterTypeBoolean;
import com.rapidminer.parameter.ParameterTypeCategory;
import com.rapidminer.parameter.ParameterTypeDouble;
import com.rapidminer.parameter.ParameterTypeEnumeration;
import com.rapidminer.parameter.ParameterTypeInt;
import com.rapidminer.parameter.ParameterTypeList;
import com.rapidminer.parameter.ParameterTypeTupel;
import com.rapidminer.parameter.UndefinedParameterError;
import com.rapidminer.tools.ConsumerWithThrowable;
import com.rapidminer.tools.LogService;
import com.rapidminer.tools.ValidationUtilV2;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PythonOperatorChain
implements UserData<Object> {
    /** Key under which a chain registers itself as user data on every Python operator it contains. */
    public static final String PYTHON_CHAIN_KEY = "py_chain";
    /** Python operators belonging to this chain, in the order they were added. */
    private final List<PythonOperatorContainer> chain;
    /** Non-Python operators that survived chain pruning and sit between the chained Python operators. */
    private final List<Operator> orchestrators;
    /** Input ports of chained operators that are fed from inside the chain, paired with the key of their source. */
    private final List<PythonInputPortContainer> inputPortsConnectedWithinChain;
    /** Output ports of chained operators whose value is consumed inside the chain. */
    private final List<PythonOutputPortContainer> outputPortsConnectedWithinChain;
    /** Python distribution of the starting operator; operators with a different one are not chained. */
    private final PythonDistribution pythonDist;
    /** Checked before initializing and before running; presumably set by cleanUp() (not visible here). */
    private final AtomicBoolean cleanedUp;
    /** Index into {@link #chain} of the operator currently reported as running. */
    private volatile int currentOperator;

    /**
     * Builds a chain that starts at the given operator.
     *
     * <p>Clean-up is registered first; the chain is only populated if that registration did not
     * already mark this instance as cleaned up.
     *
     * @param startingOperator the first Python operator of the chain, must not be {@code null}
     * @throws OperatorException if the chain cannot be initialized
     */
    public PythonOperatorChain(PythonOperator startingOperator) throws OperatorException {
        ValidationUtilV2.requireNonNull(startingOperator, "startingOperator");
        this.pythonDist = startingOperator.getPythonOperatorDescription().getExtension().getPythonDist();
        this.chain = new ArrayList<>();
        this.orchestrators = new ArrayList<>();
        this.inputPortsConnectedWithinChain = new ArrayList<>();
        this.outputPortsConnectedWithinChain = new ArrayList<>();
        this.cleanedUp = new AtomicBoolean(false);
        this.currentOperator = 0;
        this.registerCleanUp(startingOperator);
        if (this.cleanedUp.get()) {
            // Clean-up already happened while registering; leave the chain empty.
            return;
        }
        this.initializeChain(startingOperator);
    }

    /**
     * Tells whether the given operator is part of this chain.
     *
     * @param pyOp the operator to look for
     * @return {@code true} if one of the chained operators equals {@code pyOp}
     */
    public boolean contains(PythonOperator pyOp) {
        for (PythonOperatorContainer container : this.chain) {
            if (container.getPyOp().equals(pyOp)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Finds the position of the given operator within the chain.
     *
     * @param pyOp the operator to look for
     * @return the zero-based index of the first chained operator equal to {@code pyOp}, or {@code -1}
     */
    public int getIndexOf(PythonOperator pyOp) {
        return IntStream.range(0, this.chain.size())
                .filter(position -> this.chain.get(position).getPyOp().equals(pyOp))
                .findFirst()
                .orElse(-1);
    }

    /**
     * Tells whether the given operator is the first one of this chain.
     *
     * @param pyOp the operator to test
     * @return {@code true} if the chain is non-empty and its first operator equals {@code pyOp}
     */
    public boolean isFirstInChain(PythonOperator pyOp) {
        if (this.chain.isEmpty()) {
            return false;
        }
        PythonOperator head = this.getFirstContainer().getPyOp();
        return head.equals(pyOp);
    }

    /**
     * Tells whether the given operator is the last one of this chain.
     *
     * @param pyOp the operator to test
     * @return {@code true} if the chain is non-empty and its last operator equals {@code pyOp}
     */
    public boolean isLastInChain(PythonOperator pyOp) {
        if (this.chain.isEmpty()) {
            return false;
        }
        int lastIndex = this.chain.size() - 1;
        PythonOperator tail = this.chain.get(lastIndex).getPyOp();
        return tail.equals(pyOp);
    }

    /**
     * Tells whether the given operator belongs to this chain without being its first or last element.
     *
     * @param pyOp the operator to test
     * @return {@code true} for an inner operator of the chain
     */
    public boolean isMiddleInChain(PythonOperator pyOp) {
        if (this.isFirstInChain(pyOp) || this.isLastInChain(pyOp)) {
            return false;
        }
        return this.contains(pyOp);
    }

    /**
     * Returns the operator at the chain position that is currently marked as running.
     *
     * @return the operator stored at the current index of the chain
     */
    public PythonOperator getCurrentOperator() {
        PythonOperatorContainer current = this.chain.get(this.currentOperator);
        return current.getPyOp();
    }

    /**
     * Returns the extension of the first operator in the chain.
     *
     * @return the extension taken from the first container's operator description
     */
    public PythonExtension getExtension() {
        PythonOperatorContainer head = this.getFirstContainer();
        return head.getPyOpDesc().getExtension();
    }

    /**
     * Returns the Python distribution this chain was created for.
     *
     * @return the distribution captured from the starting operator at construction time
     */
    public PythonDistribution getPythonDist() {
        return pythonDist;
    }

    /**
     * Executes the part of the chain that belongs to the given operator.
     *
     * <p>Nothing happens once the chain has been cleaned up. Any exception raised while running
     * triggers a clean-up of the chain and is then rethrown unchanged.
     *
     * @param pyOp the operator that is being executed; must be part of this chain
     * @throws OperatorException if {@code pyOp} is not part of the chain or running it fails
     */
    public void doWork(PythonOperator pyOp) throws OperatorException {
        try {
            if (this.cleanedUp.get()) {
                return;
            }
            LogService.getRoot().log(PythonOperatorChainTools.getPythonOperatorLogLevel(), () -> "Python: Running operator '" + pyOp.getName() + "'");
            if (!this.contains(pyOp)) {
                throw new OperatorException("BUG: Tried to run Python operator not part of this Python operator chain");
            }
            this.runPythonOperator(pyOp);
        }
        catch (Exception e) {
            // Release chain resources before propagating the failure.
            this.cleanUp();
            throw e;
        }
    }

    /**
     * Returns the number of Python operators in this chain.
     *
     * @return the chain length
     */
    public int getSize() {
        return chain.size();
    }

    /**
     * Returns the name of this chain, which is the name of its first operator.
     *
     * @return the first operator's name
     */
    public String getName() {
        PythonOperator head = this.getFirstContainer().getPyOp();
        return head.getName();
    }

    /**
     * Chains are never copied along with their parent.
     *
     * @param newParent ignored
     * @return always {@code null}
     */
    @Override
    public UserData<Object> copyUserData(Object newParent) {
        return null;
    }

    /**
     * Two chains are equal when they are of the same class and hold equal operator containers and
     * equal orchestrators.
     *
     * @param o the object to compare with
     * @return {@code true} if both chains are equal
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (o.getClass() != this.getClass()) {
            return false;
        }
        PythonOperatorChain other = (PythonOperatorChain)o;
        if (!Objects.equals(this.chain, other.chain)) {
            return false;
        }
        return Objects.equals(this.orchestrators, other.orchestrators);
    }

    /**
     * Hash code derived from the same two fields that {@code equals} compares.
     *
     * @return combined hash of the chained containers and the orchestrators
     */
    @Override
    public int hashCode() {
        return Objects.hash(chain, orchestrators);
    }

    /**
     * Exposes the chained operator containers without allowing modification.
     *
     * @return an unmodifiable view of the chain
     */
    List<PythonOperatorContainer> getPythonOperatorContainers() {
        List<PythonOperatorContainer> readOnlyChain = Collections.unmodifiableList(this.chain);
        return readOnlyChain;
    }

    /**
     * Exposes the orchestrator operators of this chain without allowing modification.
     *
     * @return an unmodifiable view of the orchestrators
     */
    List<Operator> getOrchestrators() {
        List<Operator> readOnlyOrchestrators = Collections.unmodifiableList(this.orchestrators);
        return readOnlyOrchestrators;
    }

    /**
     * Stops the progress animation of the current operator and, unless it is the last one or the
     * chain has been cleaned up, marks it complete and moves the animation to the next operator.
     */
    synchronized void completeCurrentOperator() {
        PythonOperatorTools.stopProgressAnimation(this.getCurrentOperator());
        boolean hasSuccessor = this.currentOperator < this.chain.size() - 1;
        if (!hasSuccessor || this.cleanedUp.get()) {
            return;
        }
        PythonOperatorTools.completeOperator(this.getCurrentOperator());
        this.currentOperator++;
        PythonOperatorTools.startProgressAnimation(this.getCurrentOperator());
    }

    /**
     * Populates the chain starting at the given operator.
     *
     * <p>With chaining disabled in the SDK settings the chain consists of the starting operator
     * only; otherwise it is grown over the operators of the starting operator's execution unit.
     *
     * @param startingOperator the first operator of the chain
     * @throws OperatorException if growing the chain fails
     */
    private void initializeChain(PythonOperator startingOperator) throws OperatorException {
        if (!PythonSDKSettings.isChainingEnabled()) {
            this.addToChain(startingOperator);
            return;
        }
        this.initializeChain(startingOperator, startingOperator.getExecutionUnit().getChildOperators());
    }

    /**
     * Grows and prunes the chain over the given operators, then sorts the survivors into Python
     * operators (added to the chain) and everything else (recorded as orchestrators).
     *
     * @param startingOperator the first operator of the chain
     * @param allOperators the candidate operators, iterated in their given order
     * @throws OperatorException if the starting operator cannot be found or is misconfigured
     */
    private void initializeChain(PythonOperator startingOperator, Collection<Operator> allOperators) throws OperatorException {
        List<Operator> prunedChain = PythonOperatorChain.pruneChain(this.growChain(startingOperator, allOperators));
        for (Operator member : prunedChain) {
            if (member instanceof PythonOperator) {
                this.addToChain((PythonOperator)member);
            } else {
                this.orchestrators.add(member);
            }
        }
    }

    /**
     * Cuts trailing orchestrators off a raw chain.
     *
     * <p>The raw chain is walked from back to front. An orchestrator ends the chain before itself
     * when it is the last remaining element or when one of its connected outputs leads to an
     * operator that is not among the elements kept behind it.
     *
     * @param rawChain the unpruned chain
     * @return a sub-list view of {@code rawChain} from its first element to the last kept element
     */
    private static List<Operator> pruneChain(List<Operator> rawChain) {
        int lastKeptIndex = rawChain.size() - 1;
        Set<Operator> keptDownstream = new HashSet<>();
        for (int index = rawChain.size() - 1; index >= 0; --index) {
            Operator candidate = rawChain.get(index);
            if (PythonOperatorChainTools.ALLOWED_ORCHESTRATORS.contains(candidate.getClass())) {
                boolean isTrailing = index == lastKeptIndex;
                boolean feedsOutsideChain = candidate.getOutputPorts().getAllPorts().stream().anyMatch(outputPort -> outputPort.isConnected() && !keptDownstream.contains(outputPort.getOpposite().getPorts().getOwner().getOperator()));
                if (isTrailing || feedsOutsideChain) {
                    // Everything from this orchestrator onwards is dropped, so start over.
                    lastKeptIndex = index - 1;
                    keptDownstream.clear();
                    continue;
                }
            }
            keptDownstream.add(candidate);
        }
        return rawChain.subList(0, lastKeptIndex + 1);
    }

    /**
     * Collects the operators that may form a chain beginning at the starting operator.
     *
     * <p>The operators are scanned in iteration order. Everything before the starting operator is
     * skipped. After it, enabled operators are appended until one of them has an incompatible
     * distribution, has breakpoint position 0 set, uses macros, or is an orchestrator with a
     * connected input that does not come from a Python operator already in the chain. An appended
     * operator with breakpoint position 1 set also ends the chain.
     * NOTE(review): positions 0/1 presumably mean "before"/"after" - confirm against the breakpoint API.
     *
     * @param startingOperator the operator the chain starts with
     * @param allOperators the candidate operators in execution order
     * @return the raw, unpruned chain
     * @throws OperatorException if the starting operator is not among {@code allOperators}
     */
    private List<Operator> growChain(PythonOperator startingOperator, Collection<Operator> allOperators) throws OperatorException {
        List<Operator> rawChain = new ArrayList<>();
        Set<Operator> pyOpsInChain = new HashSet<>();
        Iterator<Operator> remaining = allOperators.iterator();
        boolean startFound = false;
        boolean endFound = false;

        // Phase 1: advance to the starting operator.
        while (remaining.hasNext() && !startFound) {
            Operator candidate = remaining.next();
            if (candidate.equals(startingOperator)) {
                PythonOperatorChain.checkMandatoryParameters(startingOperator);
                startFound = true;
                rawChain.add(candidate);
                pyOpsInChain.add(candidate);
                endFound = candidate.hasBreakpoint(1);
            }
        }
        if (!startFound) {
            throw new OperatorException("BUG: in Python operator chain: starting operator not found in ExecutionUnit");
        }

        // Phase 2: append followers until something forces the chain to end.
        while (remaining.hasNext() && !endFound) {
            Operator candidate = remaining.next();
            if (!candidate.isEnabled()) {
                continue;
            }
            boolean isOrchestrator = PythonOperatorChainTools.ALLOWED_ORCHESTRATORS.contains(candidate.getClass());
            if (!this.compatibleDistribution(candidate) || candidate.hasBreakpoint(0) || PythonOperatorChainTools.usesMacros(candidate)) {
                endFound = true;
            } else if (isOrchestrator && candidate.getInputPorts().getAllPorts().stream().anyMatch(inputPort -> inputPort.isConnected() && !pyOpsInChain.contains(inputPort.getOpposite().getPorts().getOwner().getOperator()))) {
                endFound = true;
            } else {
                rawChain.add(candidate);
                if (candidate instanceof PythonOperator) {
                    pyOpsInChain.add(candidate);
                }
                if (candidate.hasBreakpoint(1)) {
                    endFound = true;
                }
            }
        }
        return rawChain;
    }

    /**
     * Decides whether an operator can run within this chain's Python distribution.
     *
     * @param op the operator to test
     * @return {@code true} for allowed orchestrators and for Python operators whose extension uses
     *         the same distribution as this chain
     */
    private boolean compatibleDistribution(Operator op) {
        if (PythonOperatorChainTools.ALLOWED_ORCHESTRATORS.contains(op.getClass())) {
            return true;
        }
        if (!(op instanceof PythonOperator)) {
            return false;
        }
        PythonOperator pyOp = (PythonOperator)op;
        return this.pythonDist.equals(pyOp.getPythonOperatorDescription().getExtension().getPythonDist());
    }

    /**
     * Appends the operator to the chain and attaches this chain to it as user data.
     *
     * @param pyOp the operator to add
     */
    private void addToChain(PythonOperator pyOp) {
        PythonOperatorContainer container = new PythonOperatorContainer(pyOp, pyOp.getPythonOperatorDescription());
        this.chain.add(container);
        pyOp.setUserData(PYTHON_CHAIN_KEY, this);
    }

    /**
     * Builds the Python function call for one chained operator.
     *
     * @param container the container of the operator to build the call for
     * @return the function call with all inputs and outputs registered
     * @throws OperatorException if collecting inputs or outputs fails
     */
    private PythonFunctionCall createFunctionCall(PythonOperatorContainer container) throws OperatorException {
        PythonOperator pyOp = container.getPyOp();
        PythonFunctionCallBuilder builder = new PythonFunctionCallBuilder(pyOp.getPythonOperatorDescription().getFunction(), pyOp.getPythonOperatorDescription().getModule(), pyOp.getPythonOperatorDescription().getSources());
        // One-based position of the operator within the chain.
        int positionInChain = this.chain.indexOf(container) + 1;
        this.addInputs(builder, pyOp, positionInChain);
        this.addOutputs(builder, pyOp, positionInChain);
        return builder.build();
    }

    /**
     * Creates the result handler of one chained operator.
     *
     * <p>Each script output of the operator's function call is paired with the output port at the
     * same position (shifted by one when the operator has a connection selector). Outputs consumed
     * inside the chain are skipped. For outputs targeting RapidMiner a consumer is registered that
     * loads the value from the result stream and delivers it at the port; all other combinations
     * are only logged.
     *
     * @param container the container of the operator; its function call must already be set
     * @return the configured result handler
     */
    private PythonResultHandler createResultHandler(PythonOperatorContainer container) {
        PythonOperator pyOp = container.getPyOp();
        PythonResultHandler handler = new PythonResultHandler(pyOp);
        int portIndex = pyOp.getConnectionSelector() != null ? 1 : 0;
        for (ScriptOutput scriptOutput : PythonOperatorChainTools.createScriptOutputs(container.getPyFuncCall())) {
            OutputPort outputPort = (OutputPort)pyOp.getOutputPorts().getPortByIndex(portIndex);
            ++portIndex;
            boolean consumedWithinChain = this.outputPortsConnectedWithinChain.stream().anyMatch(c -> c.getOutputPort().equals(outputPort));
            if (consumedWithinChain) {
                continue;
            }
            switch (scriptOutput.getTarget()) {
                case RAPIDMINER: {
                    switch (scriptOutput.getDataType()) {
                        case DATAFRAME: {
                            handler.addConsumerForResult(outputPort, scriptOutput, (ConsumerWithThrowable<InputStream, UserError>)((ConsumerWithThrowable)is -> outputPort.deliver(PythonOperatorChainTools.loadResultValueFromStream(pyOp, is, scriptOutput, IOTable.class))));
                            break;
                        }
                        case FILE: {
                            handler.addConsumerForResult(outputPort, scriptOutput, (ConsumerWithThrowable<InputStream, UserError>)((ConsumerWithThrowable)is -> outputPort.deliver(PythonOperatorChainTools.loadResultValueFromStream(pyOp, is, scriptOutput, FileObject.class))));
                            break;
                        }
                        case SERIALIZABLE_OBJECT: {
                            handler.addConsumerForResult(outputPort, scriptOutput, (ConsumerWithThrowable<InputStream, UserError>)((ConsumerWithThrowable)is -> outputPort.deliver(PythonOperatorChainTools.loadResultValueFromStream(pyOp, is, scriptOutput, SerializablePythonIOObject.class))));
                            break;
                        }
                        case INTERNAL_OBJECT: {
                            LogService.getRoot().log(Level.FINE, () -> String.format("Python internal data type at output port %s of operator %s", outputPort.getName(), pyOp.getName()));
                            break;
                        }
                        default: {
                            LogService.getRoot().log(Level.SEVERE, () -> String.format("BUG: Unexpected data type at port: %s", new Object[]{scriptOutput.getDataType()}));
                            break;
                        }
                    }
                    break;
                }
                case PYTHON: {
                    LogService.getRoot().log(Level.WARNING, () -> String.format("BUG: Unexpected Python input at output port %s of operator %s", outputPort.getName(), pyOp.getName()));
                    break;
                }
                default: {
                    LogService.getRoot().log(Level.WARNING, () -> String.format("Unhandled output at output port %s of operator %s", outputPort.getName(), pyOp.getName()));
                    break;
                }
            }
        }
        return handler;
    }

    /**
     * Returns the container at the head of the chain.
     *
     * @return the first container; fails with an index error if the chain is empty
     */
    private PythonOperatorContainer getFirstContainer() {
        return chain.get(0);
    }

    /**
     * Performs the chain work that is due when the given operator is executed.
     *
     * <p>The first operator of the chain runs the whole chain: it checks for cycles, collects port
     * connections, prepares function calls and result handlers for every chained operator, runs
     * the Python wrapper and, on success, delivers the outputs of all operators. Inner operators
     * only log. The last operator triggers the clean-up. For a chain of length one both the first
     * and the last branch apply.
     *
     * <p>The script result is closed in a {@code finally} block so that it is released even when
     * delivering outputs fails.
     *
     * @param pyOp the operator being executed
     * @throws OperatorException if preparing, running or delivering results fails
     */
    private void runPythonOperator(PythonOperator pyOp) throws OperatorException {
        if (this.isFirstInChain(pyOp)) {
            LogService.getRoot().log(PythonOperatorChainTools.getPythonOperatorLogLevel(), () -> "Python: First operator of chain, doing all the work");
            PythonOperatorChain.checkForCycles(this.chain);
            this.collectPortConnectionInformationAndVerifyInputs();
            for (PythonOperatorContainer container : this.chain) {
                container.setPyFuncCall(this.createFunctionCall(container));
                container.setPythonResultHandler(this.createResultHandler(container));
            }
            PythonOperatorTools.startProgressAnimation(pyOp);
            PythonScriptResult result = this.awaitResult(pyOp, PythonOperatorChainTools.runPythonWrapper(this, this.getAllExpectedExternalOutputs()));
            if (result != null) {
                try {
                    if (result.isSuccess()) {
                        for (PythonOperatorContainer container : this.chain) {
                            Optional<PythonResultHandler> resultHandler = container.getPythonResultHandler();
                            if (!resultHandler.isPresent()) {
                                continue;
                            }
                            resultHandler.get().deliverOutputs();
                        }
                    }
                }
                finally {
                    // Always release the result, also when a handler throws while delivering.
                    result.close();
                }
            }
        }
        if (this.isMiddleInChain(pyOp)) {
            LogService.getRoot().log(PythonOperatorChainTools.getPythonOperatorLogLevel(), () -> "Python: Inner operator of chain, nothing to do");
        }
        if (this.isLastInChain(pyOp)) {
            LogService.getRoot().log(PythonOperatorChainTools.getPythonOperatorLogLevel(), () -> "Python: Last operator of chain, doing the cleanup");
            this.cleanUp();
        }
    }

    /**
     * Gathers the expected outputs of every result handler in the chain.
     *
     * @return a new list with the expected outputs of all containers that have a result handler,
     *         in chain order
     */
    private List<ScriptOutput> getAllExpectedExternalOutputs() {
        List<ScriptOutput> collected = new ArrayList<>();
        for (PythonOperatorContainer container : this.chain) {
            Optional<PythonResultHandler> handler = container.getPythonResultHandler();
            if (handler.isPresent()) {
                collected.addAll(handler.get().getAllExpectedOutputs());
            }
        }
        return collected;
    }

    /**
     * Records which ports of the chained operators are wired to each other and verifies the chain.
     *
     * <p>Every connected output port of a chained operator gets a key of the form
     * {@code <portName>_<oneBasedOperatorPosition>}. If its opposite input port belongs to a chained
     * operator, both ends are recorded as connected within the chain. If the opposite port belongs
     * to an orchestrator of this chain, the orchestrator is looked through in both directions:
     * chained operators behind its connected outputs are recorded as consumers of the key, and
     * chained operators in front of its connected inputs get their output port recorded as
     * consumed within the chain.
     *
     * <p>Unconnected orchestrator ports are skipped on both sides; an unconnected input port has no
     * opposite port to inspect.
     *
     * @throws UserError if a mandatory input is missing or an internal Python object would leave
     *         the chain through an orchestrator
     */
    private void collectPortConnectionInformationAndVerifyInputs() throws UserError {
        List<PythonInputPortContainer> connectedToPyOutputPorts = new ArrayList<>();
        List<InputPort> pyOpInputPorts = new ArrayList<>();
        List<PythonOutputPortContainer> pyOpOutputPorts = new ArrayList<>();
        int counter = 1;
        for (PythonOperatorContainer pyOpContainer : this.chain) {
            for (OutputPort outputPort : pyOpContainer.getPyOp().getOutputPorts().getAllPorts()) {
                if (!outputPort.isConnected()) {
                    continue;
                }
                String key = String.format("%s_%d", outputPort.getName(), counter);
                pyOpOutputPorts.add(new PythonOutputPortContainer(outputPort, key));
                connectedToPyOutputPorts.add(new PythonInputPortContainer(outputPort.getOpposite(), key));
            }
            pyOpInputPorts.addAll(pyOpContainer.getPyOp().getInputPorts().getAllPorts());
            ++counter;
        }
        for (PythonOutputPortContainer outPortContainer : pyOpOutputPorts) {
            OutputPort outputPort = outPortContainer.getOutputPort();
            InputPort connectedInputPort = outputPort.getOpposite();
            Operator connectedInputPortOp = connectedInputPort.getPorts().getOwner().getOperator();
            if (pyOpInputPorts.contains(connectedInputPort)) {
                // Direct connection between two chained Python operators.
                this.inputPortsConnectedWithinChain.add(new PythonInputPortContainer(connectedInputPort, outPortContainer.getKey()));
                this.outputPortsConnectedWithinChain.add(outPortContainer);
                continue;
            }
            if (!PythonOperatorChainTools.ALLOWED_ORCHESTRATORS.contains(connectedInputPortOp.getClass()) || !this.orchestrators.contains(connectedInputPortOp)) {
                continue;
            }
            // Look through the orchestrator: who consumes what it forwards?
            connectedInputPortOp.getOutputPorts().getAllPorts().forEach(orchestratorOutPort -> {
                if (!orchestratorOutPort.isConnected()) {
                    return;
                }
                InputPort targetInput = orchestratorOutPort.getOpposite();
                Operator targetOpOfOrchestrator = Optional.ofNullable(targetInput.getPorts().getOwner()).map(PortOwner::getOperator).orElse(null);
                if (targetOpOfOrchestrator instanceof PythonOperator && this.contains((PythonOperator)targetOpOfOrchestrator)) {
                    PythonInputPortContainer inputPortContainer = new PythonInputPortContainer(targetInput, outPortContainer.getKey());
                    connectedToPyOutputPorts.add(inputPortContainer);
                    this.inputPortsConnectedWithinChain.add(inputPortContainer);
                }
            });
            // Look through the orchestrator: which chained outputs feed it?
            connectedInputPortOp.getInputPorts().getAllPorts().forEach(orchestratorInPort -> {
                if (!orchestratorInPort.isConnected()) {
                    // No opposite port to inspect; dereferencing it would fail.
                    return;
                }
                OutputPort targetOutput = orchestratorInPort.getOpposite();
                Operator targetOpOfOrchestrator = Optional.ofNullable(targetOutput.getPorts().getOwner()).map(PortOwner::getOperator).orElse(null);
                if (targetOpOfOrchestrator instanceof PythonOperator && this.contains((PythonOperator)targetOpOfOrchestrator)) {
                    String key = String.format("%s_%d", targetOutput.getName(), this.getIndexOf((PythonOperator)targetOpOfOrchestrator) + 1);
                    PythonOutputPortContainer outputPortContainer = new PythonOutputPortContainer(targetOutput, key);
                    this.outputPortsConnectedWithinChain.add(outputPortContainer);
                }
            });
        }
        this.verifyChain(connectedToPyOutputPorts);
        this.verifyOrchestrators(this.inputPortsConnectedWithinChain);
    }

    /**
     * Checks that every mandatory input of every chained operator will receive data.
     *
     * <p>A mandatory input is rejected when its port is not connected, when it belongs to the
     * first operator of the chain and holds no data, or when it is neither fed by a chained
     * output port nor holds data.
     *
     * @param connectedToPyOutputPorts input ports that are fed by output ports of chained operators
     * @throws UserError with error code {@code "149"} naming the first offending input port
     */
    private void verifyChain(List<PythonInputPortContainer> connectedToPyOutputPorts) throws UserError {
        for (PythonOperatorContainer pyOpContainer : this.chain) {
            for (ExpectedInput expectedInput : pyOpContainer.getPyOpDesc().getExpectedInputs()) {
                if (expectedInput.isOptional()) {
                    continue;
                }
                PythonOperator pyOp = pyOpContainer.getPyOp();
                InputPort inPort = (InputPort)pyOp.getInputPorts().getPortByName(expectedInput.getName());
                boolean inputMissing;
                if (!inPort.isConnected()) {
                    inputMissing = true;
                } else if (this.isFirstInChain(pyOp) && inPort.getRawData() == null) {
                    inputMissing = true;
                } else {
                    boolean fedByChain = connectedToPyOutputPorts.stream().anyMatch(container -> container.getInputPort().equals(inPort));
                    inputMissing = !fedByChain && inPort.getRawData() == null;
                }
                if (inputMissing) {
                    throw new UserError((Operator)pyOp, "149", new Object[]{inPort.getName()});
                }
            }
        }
    }

    /**
     * Ensures that no orchestrator forwards an internal Python object to an operator outside the
     * chain.
     *
     * <p>For each orchestrator the Python output port feeding its first input port is resolved.
     * The expected output describing that port is looked up by port name, the same way outputs are
     * matched to ports when the function call is built, so the result does not depend on extra
     * ports shifting the port index. Orchestrators whose first input is unconnected, is not fed by
     * a Python operator, or has no matching expected output are skipped.
     *
     * @param inputPortsConnectedWithinChain input ports known to be fed from inside the chain
     * @throws UserError if an internal Python object would be delivered to a port that is not
     *         connected within the chain
     */
    private void verifyOrchestrators(List<PythonInputPortContainer> inputPortsConnectedWithinChain) throws UserError {
        for (Operator orchestratorOp : this.orchestrators) {
            List<InputPort> orchestratorInputs = orchestratorOp.getInputPorts().getAllPorts();
            if (orchestratorInputs.isEmpty()) {
                continue;
            }
            OutputPort pyOpOutPort = orchestratorInputs.get(0).getOpposite();
            if (pyOpOutPort == null) {
                // First input is not connected, so nothing can be forwarded.
                continue;
            }
            Operator sourceOp = pyOpOutPort.getPorts().getOwner().getOperator();
            if (!(sourceOp instanceof PythonOperator)) {
                continue;
            }
            PythonOperator pyOP = (PythonOperator)sourceOp;
            ExpectedOutput matchedOutput = null;
            for (ExpectedOutput candidate : pyOP.getPythonOperatorDescription().getExpectedOutputs()) {
                if (candidate.getName().equals(pyOpOutPort.getName())) {
                    matchedOutput = candidate;
                    break;
                }
            }
            if (matchedOutput == null) {
                // The port is not one of the function outputs.
                continue;
            }
            final ExpectedOutput expectedOutput = matchedOutput;
            Set<String> serializableDataClasses = pyOP.getPythonOperatorDescription().getExtension().getSerializableDataClasses().keySet();
            if (expectedOutput.getDataType(serializableDataClasses) != PythonFunctionDataType.INTERNAL_OBJECT) {
                continue;
            }
            for (OutputPort outputPort : orchestratorOp.getOutputPorts().getAllPorts()) {
                if (!outputPort.isConnected()) {
                    continue;
                }
                boolean staysInChain = inputPortsConnectedWithinChain.stream().anyMatch(container -> container.getInputPort().equals(outputPort.getOpposite()));
                if (staysInChain) {
                    continue;
                }
                LogService.getRoot().log(Level.SEVERE, () -> String.format("Output port type '%s' at port %s can only be passed to other Python operators", expectedOutput.getDataClass(), outputPort.getName()));
                throw new UserError(orchestratorOp, "pel.operator.output_internal_python", new Object[]{outputPort.getName()});
            }
        }
    }

    /**
     * Registers all inputs of an operator at the function call builder.
     *
     * <p>In order: the connection (only if the operator has a connection selector), one input per
     * parameter type, and one input per expected input port. Port inputs fed from inside the chain
     * are registered as internal inputs under the key of their source; all others are registered
     * as external inputs and numbered consecutively starting at 1.
     *
     * @param funcBuilder the builder to add the inputs to
     * @param pyOp the operator whose inputs are added
     * @param operatorCounter one-based position of the operator within the chain
     * @throws OperatorException if an input cannot be added
     */
    private void addInputs(PythonFunctionCallBuilder funcBuilder, PythonOperator pyOp, int operatorCounter) throws OperatorException {
        if (pyOp.getConnectionSelector() != null) {
            funcBuilder.addInput(pyOp.getPythonOperatorDescription().getConnection().getName(), PythonFunctionSourceType.PROVIDED, PythonFunctionDataType.CONNECTION, PythonOperatorConnectionEncryptor.getConnectionParameters(pyOp), false);
        }
        for (ParameterType parameterType : pyOp.getPythonOperatorDescription().createParameterTypes(pyOp)) {
            this.addInputForParameterType(funcBuilder, pyOp, parameterType);
        }
        int externalInputNumber = 1;
        for (ExpectedInput expectedInput : pyOp.getPythonOperatorDescription().getExpectedInputs()) {
            InputPort inPort = (InputPort)pyOp.getInputPorts().getPortByName(expectedInput.getName());
            Optional<PythonInputPortContainer> internalSource = this.inputPortsConnectedWithinChain.stream().filter(c -> c.getInputPort().equals(inPort)).findFirst();
            if (!internalSource.isPresent()) {
                this.addExternalInputForInputPort(funcBuilder, pyOp, expectedInput, operatorCounter, externalInputNumber);
                externalInputNumber++;
                continue;
            }
            this.addInternalInputForInputPort(funcBuilder, expectedInput, internalSource.get().getKey());
        }
    }

    /**
     * Registers all expected outputs of an operator at the function call builder.
     *
     * <p>Outputs whose port is consumed inside the chain are registered as internal outputs under
     * the recorded key; all others are registered as external outputs and numbered consecutively
     * starting at 1.
     *
     * @param funcBuilder the builder to add the outputs to
     * @param pyOp the operator whose outputs are added
     * @param operatorCounter one-based position of the operator within the chain
     * @throws UserError if an external output cannot be added
     */
    private void addOutputs(PythonFunctionCallBuilder funcBuilder, PythonOperator pyOp, int operatorCounter) throws UserError {
        int externalOutputNumber = 1;
        for (ExpectedOutput expectedOutput : pyOp.getPythonOperatorDescription().getExpectedOutputs()) {
            OutputPort outPort = (OutputPort)pyOp.getOutputPorts().getPortByName(expectedOutput.getName());
            Optional<PythonOutputPortContainer> internalTarget = this.outputPortsConnectedWithinChain.stream().filter(c -> c.getOutputPort().equals(outPort)).findFirst();
            if (!internalTarget.isPresent()) {
                this.addExternalOutputForOutputPort(funcBuilder, pyOp, expectedOutput, operatorCounter, externalOutputNumber);
                externalOutputNumber++;
                continue;
            }
            this.addInternalOutputForOutputPort(funcBuilder, pyOp, expectedOutput, internalTarget.get().getKey());
        }
    }

    /**
     * Registers an input that is supplied by another operator of the chain.
     *
     * @param funcBuilder the builder to add the input to
     * @param expectedInput the input description
     * @param key the key under which the producing output stores its value
     */
    private void addInternalInputForInputPort(PythonFunctionCallBuilder funcBuilder, ExpectedInput expectedInput, String key) {
        String inputName = expectedInput.getName();
        funcBuilder.addInput(inputName, PythonFunctionSourceType.PYTHON, PythonFunctionDataType.INTERNAL_OBJECT, key, expectedInput.isCollection());
    }

    /**
     * Registers an input that is supplied from outside the chain.
     *
     * <p>Data frames, files and serializable objects are registered under a name of the form
     * {@code input_<operator>_<result>}; data frames additionally get a suffix that depends on
     * whether the input is a collection. An internal Python object is only accepted when its port
     * is unconnected, in which case nothing is registered. Unknown data types are logged.
     *
     * @param funcBuilder the builder to add the input to
     * @param pyOp the operator owning the input port
     * @param expectedInput the input description
     * @param operatorCounter one-based position of the operator within the chain
     * @param resultCounter one-based number of this external input within the operator
     * @throws UserError if an internal Python object arrives from outside the chain
     */
    private void addExternalInputForInputPort(PythonFunctionCallBuilder funcBuilder, PythonOperator pyOp, ExpectedInput expectedInput, int operatorCounter, int resultCounter) throws UserError {
        Set<String> serializableDataClasses = pyOp.getPythonOperatorDescription().getExtension().getSerializableDataClasses().keySet();
        PythonFunctionDataType dataType = expectedInput.getDataType(serializableDataClasses);
        switch (dataType) {
            case DATAFRAME: {
                String suffix;
                if (expectedInput.isCollection()) {
                    suffix = "collection";
                } else {
                    suffix = "rmhdf5table";
                }
                String fileName = String.format("input_%d_%d.%s", operatorCounter, resultCounter, suffix);
                funcBuilder.addInput(expectedInput.getName(), PythonFunctionSourceType.RAPIDMINER, dataType, fileName, expectedInput.isCollection());
                break;
            }
            case FILE:
            case SERIALIZABLE_OBJECT: {
                String fileName = String.format("input_%d_%d", operatorCounter, resultCounter);
                funcBuilder.addInput(expectedInput.getName(), PythonFunctionSourceType.RAPIDMINER, dataType, fileName, expectedInput.isCollection());
                break;
            }
            case INTERNAL_OBJECT: {
                InputPort inPort = (InputPort)pyOp.getInputPorts().getPortByName(expectedInput.getName());
                if (inPort.isConnected()) {
                    LogService.getRoot().log(Level.SEVERE, () -> String.format("Input port type '%s' at port %s can only be passed from other Python operators", expectedInput.getDataClass(), expectedInput.getName()));
                    throw new UserError((Operator)pyOp, "pel.operator.input_internal_python", new Object[]{expectedInput.getName()});
                }
                break;
            }
            default: {
                LogService.getRoot().log(Level.WARNING, () -> String.format("Unknown input port type '%s' for Python at port %s", expectedInput.getDataClass(), expectedInput.getName()));
            }
        }
    }

    /**
     * Registers the current value of one operator parameter as a provided input.
     *
     * <p>The Python data type is derived from the parameter type class, checked in this order:
     * enumeration or attribute subset, tuple, list, int, double, category, boolean; everything else
     * is passed as a string. A parameter that is undefined on the operator is not passed to the
     * function call; the omission is logged at {@code FINE} level rather than being dropped
     * silently.
     *
     * @param funcBuilder the builder to add the input to
     * @param pyOp the operator to read the parameter value from
     * @param parameterType the parameter to register
     */
    private void addInputForParameterType(PythonFunctionCallBuilder funcBuilder, PythonOperator pyOp, ParameterType parameterType) {
        String key = parameterType.getKey();
        try {
            PythonFunctionDataType dataType;
            if (parameterType instanceof ParameterTypeEnumeration || parameterType instanceof ParameterTypeAttributeSubset) {
                dataType = PythonFunctionDataType.LIST;
            } else if (parameterType instanceof ParameterTypeTupel) {
                dataType = PythonFunctionDataType.TUPLE;
            } else if (parameterType instanceof ParameterTypeList) {
                dataType = PythonFunctionDataType.DICT;
            } else if (parameterType instanceof ParameterTypeInt) {
                dataType = PythonFunctionDataType.INT;
            } else if (parameterType instanceof ParameterTypeDouble) {
                dataType = PythonFunctionDataType.FLOAT;
            } else if (parameterType instanceof ParameterTypeCategory) {
                dataType = PythonFunctionDataType.ENUM;
            } else if (parameterType instanceof ParameterTypeBoolean) {
                dataType = PythonFunctionDataType.BOOL;
            } else {
                dataType = PythonFunctionDataType.STR;
            }
            Object value = this.transformString2ParameterType(pyOp.getParameter(key), parameterType);
            funcBuilder.addInput(key, PythonFunctionSourceType.PROVIDED, dataType, value, false);
        }
        catch (UndefinedParameterError e) {
            // Best effort: the parameter is left out, but the omission stays traceable in the log.
            LogService.getRoot().log(Level.FINE, e, () -> String.format("Python: parameter '%s' of operator '%s' is undefined and is not passed to the function call", key, pyOp.getName()));
        }
    }

    /**
     * Converts the string form of a parameter value into an object matching the parameter type.
     *
     * <p>Enumerations become a list of converted elements, tuples a list with one converted entry
     * per tuple component, lists a map of converted keys to converted values (the last value wins
     * for duplicate keys), attribute subsets a string array split at the character {@code U+241E},
     * ints, doubles and booleans their parsed value. Any other type returns the string unchanged.
     *
     * @param value the parameter value as a string
     * @param parameterType the type that determines the conversion
     * @return the converted value
     */
    private Object transformString2ParameterType(String value, ParameterType parameterType) {
        if (parameterType instanceof ParameterTypeEnumeration) {
            ParameterType elementType = ((ParameterTypeEnumeration)parameterType).getValueType();
            return ParameterTypeEnumeration.transformString2List(value).stream()
                    .map(element -> this.transformString2ParameterType(element, elementType))
                    .collect(Collectors.toList());
        }
        if (parameterType instanceof ParameterTypeTupel) {
            ParameterType[] componentTypes = ((ParameterTypeTupel)parameterType).getParameterTypes();
            String[] components = ParameterTypeTupel.transformString2Tupel(value);
            return IntStream.range(0, componentTypes.length)
                    .mapToObj(position -> this.transformString2ParameterType(components[position], componentTypes[position]))
                    .collect(Collectors.toList());
        }
        if (parameterType instanceof ParameterTypeList) {
            ParameterTypeList listType = (ParameterTypeList)parameterType;
            ParameterType keyType = listType.getKeyType();
            ParameterType valueType = listType.getValueType();
            return ParameterTypeList.transformString2List(value).stream()
                    .collect(Collectors.toMap(
                            pair -> this.transformString2ParameterType(pair[0], keyType),
                            pair -> this.transformString2ParameterType(pair[1], valueType),
                            (previous, latest) -> latest));
        }
        if (parameterType instanceof ParameterTypeAttributeSubset) {
            String separator = String.valueOf('\u241e');
            return value.split(separator);
        }
        if (parameterType instanceof ParameterTypeInt) {
            return Integer.parseInt(value);
        }
        if (parameterType instanceof ParameterTypeDouble) {
            return Double.parseDouble(value);
        }
        if (parameterType instanceof ParameterTypeBoolean) {
            return Boolean.parseBoolean(value);
        }
        return value;
    }

    /**
     * Registers an output with target type {@code PYTHON} on the function call builder.
     * The data type is resolved from the expected output against the serializable data class
     * names of the operator's extension.
     *
     * @param funcBuilder the builder receiving the output definition
     * @param pyOp the operator whose extension supplies the serializable data classes
     * @param expectedOutput the declared output (data class and collection flag are taken from it)
     * @param name the name under which the output is registered
     */
    private void addInternalOutputForOutputPort(PythonFunctionCallBuilder funcBuilder, PythonOperator pyOp, ExpectedOutput expectedOutput, String name) {
        PythonFunctionDataType resolvedType = expectedOutput.getDataType(pyOp.getPythonOperatorDescription().getExtension().getSerializableDataClasses().keySet());
        String dataClass = expectedOutput.getDataClass();
        boolean collection = expectedOutput.isCollection();
        funcBuilder.addOutput(name, PythonFunctionTargetType.PYTHON, resolvedType, dataClass, collection);
    }

    /**
     * Registers an output of the given operator on the function call builder.
     * <ul>
     * <li>If the matching output port is not connected, the output is registered with target type
     * {@code DISCARD} under the name {@code discard_<operatorCounter>_<resultCounter>}.</li>
     * <li>Otherwise the output is registered with target type {@code RAPIDMINER}; data frames get the
     * name {@code result_<operatorCounter>_<resultCounter>.rmhdf5table}, files and serializable
     * objects get {@code result_<operatorCounter>_<resultCounter>}.</li>
     * </ul>
     *
     * @param funcBuilder the builder receiving the output definition
     * @param pyOp the operator owning the output port
     * @param expectedOutput the declared output; its name is used to look up the port
     * @param operatorCounter first number used in the generated output name
     * @param resultCounter second number used in the generated output name
     * @throws UserError if the connected output is an internal Python object or has an unknown data type
     */
    private void addExternalOutputForOutputPort(PythonFunctionCallBuilder funcBuilder, PythonOperator pyOp, ExpectedOutput expectedOutput, int operatorCounter, int resultCounter) throws UserError {
        PythonFunctionDataType dataType = expectedOutput.getDataType(pyOp.getPythonOperatorDescription().getExtension().getSerializableDataClasses().keySet());
        String dataClass = expectedOutput.getDataClass();
        boolean collection = expectedOutput.isCollection();
        OutputPort outputPort = (OutputPort)pyOp.getOutputPorts().getPortByName(expectedOutput.getName());
        if (!outputPort.isConnected()) {
            String discardName = String.format("discard_%d_%d", operatorCounter, resultCounter);
            funcBuilder.addOutput(discardName, PythonFunctionTargetType.DISCARD, dataType, dataClass, collection);
            return;
        }
        // The switch only selects the result name; unsupported types leave via an exception.
        String resultName;
        switch (dataType) {
            case DATAFRAME: {
                resultName = String.format("result_%d_%d.rmhdf5table", operatorCounter, resultCounter);
                break;
            }
            case FILE: 
            case SERIALIZABLE_OBJECT: {
                resultName = String.format("result_%d_%d", operatorCounter, resultCounter);
                break;
            }
            case INTERNAL_OBJECT: {
                LogService.getRoot().log(Level.SEVERE, () -> String.format("Output port type '%s' at port %s can only be passed to other Python operators", expectedOutput.getDataClass(), outputPort.getName()));
                throw new UserError((Operator)pyOp, "pel.operator.output_internal_python", new Object[]{outputPort.getName()});
            }
            default: {
                LogService.getRoot().log(Level.WARNING, () -> String.format("Unknown output port type '%s' for Python at port %s", expectedOutput.getDataClass(), expectedOutput.getName()));
                throw new UserError((Operator)pyOp, "pel.operator.output_unknown", new Object[]{outputPort.getName()});
            }
        }
        funcBuilder.addOutput(resultName, PythonFunctionTargetType.RAPIDMINER, dataType, dataClass, collection);
    }

    /**
     * Waits for the submitted Python script to finish and returns its result.
     * <p>
     * The submission future is polled in 50 ms steps. After each step the operator's stop check is
     * run; if it reports a stop, the terminator obtained from the submission is invoked once and
     * polling ends. The method then blocks on the future to collect the outcome.
     *
     * @param pyOp the operator whose stop state is checked while waiting
     * @param submissionResult provides the result future and the terminator supplier
     * @return the script result, or {@code null} if the script ended with a
     *         {@link PythonScriptStoppedException}
     * @throws UserError if the script exits with a non-zero exit code (the result is closed first),
     *         if waiting is interrupted or the future is cancelled, or if script execution fails
     */
    private PythonScriptResult awaitResult(PythonOperator pyOp, SubmissionResult submissionResult) throws UserError {
        CompletableFuture<PythonScriptResult> resultFuture = submissionResult.getSubmissionFuture();
        Supplier<Callable<Void>> terminatorProvider = submissionResult.getTerminatorSupplier();
        // Only touched by the waiting thread, so a plain local flag is sufficient.
        boolean processStopped = false;
        try {
            do {
                Thread.sleep(50L);
                try {
                    pyOp.checkForStop();
                }
                catch (ProcessStoppedException e) {
                    processStopped = true;
                    try {
                        terminatorProvider.get().call();
                    }
                    catch (Exception ex) {
                        throw new PythonScriptProcessingException(ex);
                    }
                }
            } while (!processStopped && !resultFuture.isDone());
            PythonScriptResult result = resultFuture.get();
            int exitCode = result.getExitCode();
            if (exitCode != 0) {
                // Release the result before reporting the failure; it is not handed to the caller.
                result.close();
                throw new UserError((Operator)this.getCurrentOperator(), "pel.operator.script_abnormal_exit", new Object[]{String.valueOf(exitCode), result.getErrorMessage()});
            }
            return result;
        }
        catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; restore it so code further up the
            // stack can still observe that this thread was interrupted.
            Thread.currentThread().interrupt();
            throw new UserError((Operator)this.getCurrentOperator(), (Throwable)e, "pel.operator.script_result_wait_error");
        }
        catch (CancellationException e) {
            throw new UserError((Operator)this.getCurrentOperator(), (Throwable)e, "pel.operator.script_result_wait_error");
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof PythonScriptProcessingException) {
                PythonScriptProcessingException processingException = (PythonScriptProcessingException)e.getCause();
                // A UserError wrapped by the processing layer is surfaced unchanged.
                if (processingException.getCause() instanceof UserError) {
                    throw (UserError)processingException.getCause();
                }
                throw new UserError((Operator)this.getCurrentOperator(), processingException.getCause(), "pel.operator.script_result_technical_error");
            }
            if (e.getCause() instanceof PythonScriptRunnerException) {
                PythonScriptRunnerException runnerException = (PythonScriptRunnerException)e.getCause();
                throw new UserError((Operator)this.getCurrentOperator(), runnerException.getCause(), "pel.operator.script_result_technical_error");
            }
            if (e.getCause() instanceof PythonScriptStoppedException) {
                return null;
            }
            throw new UserError((Operator)this.getCurrentOperator(), (Throwable)e, "pel.operator.script_result_unexpected_error");
        }
        catch (PythonScriptStoppedException e) {
            return null;
        }
        catch (PythonScriptProcessingException e) {
            throw new UserError((Operator)this.getCurrentOperator(), (Throwable)e, "pel.operator.script_result_unexpected_error");
        }
    }

    /**
     * Arranges for {@link #cleanUp()} to run when the process of the given operator stops.
     * <p>
     * A listener is attached to the process; when notified it detaches itself and triggers the
     * clean-up. If the process already reports that it should stop right after the listener was
     * attached, the listener is detached again and the clean-up runs immediately.
     *
     * @param startingOperator the operator whose process is observed
     */
    private void registerCleanUp(PythonOperator startingOperator) {
        Process process = startingOperator.getProcess();
        ProcessStoppedListener cleanUpOnStop = new ProcessStoppedListener(){

            @Override
            public void stopped(Process stoppedProcess) {
                stoppedProcess.removeProcessStateListener((ProcessStateListener)this);
                PythonOperatorChain.this.cleanUp();
            }
        };
        process.addProcessStateListener((ProcessStateListener)cleanUpOnStop);
        if (!process.shouldStop()) {
            return;
        }
        // Stop was already requested: do not wait for the listener callback.
        process.removeProcessStateListener((ProcessStateListener)cleanUpOnStop);
        this.cleanUp();
    }

    /**
     * Clears the chain marker from every chained operator and stops the progress animation of all
     * chained operators and orchestrators. Only the first call does any work; the
     * {@code cleanedUp} flag turns later calls into no-ops.
     */
    private void cleanUp() {
        if (!this.cleanedUp.compareAndSet(false, true)) {
            return;
        }
        LogService.getRoot().log(Level.FINER, "Python: Cleaning up chain.");
        this.chain.forEach(link -> {
            PythonOperator chainedOperator = link.getPyOp();
            chainedOperator.setUserData(PYTHON_CHAIN_KEY, null);
            PythonOperatorTools.stopProgressAnimation(chainedOperator);
        });
        this.orchestrators.forEach(PythonOperatorTools::stopProgressAnimation);
    }

    /**
     * Verifies that no operator in the chain feeds an operator that is itself or comes earlier in
     * the chain.
     * <p>
     * The chain is walked in order. Each operator is recorded as seen before its outputs are
     * inspected, so a connection back to the same operator is reported as well.
     *
     * @param chain the operator containers in chain order
     * @throws UserError with key {@code pel.operator.cycle}, attributed to the operator whose
     *         output points back at an already seen operator
     */
    private static void checkForCycles(List<PythonOperatorContainer> chain) throws UserError {
        HashSet<PythonOperator> seen = new HashSet<>();
        for (PythonOperatorContainer link : chain) {
            PythonOperator current = link.getPyOp();
            seen.add(current);
            for (OutputPort out : current.getOutputPorts().getAllPorts()) {
                if (!out.isConnected()) {
                    continue;
                }
                Operator receiver = out.getOpposite().getPorts().getOwner().getOperator();
                if (seen.contains(receiver)) {
                    throw new UserError((Operator)current, "pel.operator.cycle");
                }
            }
        }
    }

    /**
     * Looks up every non-optional parameter of the operator so that an unset one surfaces as an
     * {@link UndefinedParameterError}. The looked-up values themselves are discarded.
     *
     * @param operator the operator whose mandatory parameters are checked
     * @throws UndefinedParameterError raised by the parameter lookup; the operator is attached to
     *         the error before it is rethrown
     */
    private static void checkMandatoryParameters(Operator operator) throws UndefinedParameterError {
        try {
            for (ParameterType type : operator.getParameterTypes()) {
                if (!type.isOptional()) {
                    String key = type.getKey();
                    operator.getParameters().getParameter(key);
                }
            }
        }
        catch (UndefinedParameterError missing) {
            missing.setOperator(operator);
            throw missing;
        }
    }
}

