/*
 * Decompiled with CFR 0.152.
 */
package com.owc.operator.io.batching;

import com.owc.license.ProductInformation;
import com.owc.operator.io.batching.BatchEnabledOriginalReadOperator;
import com.owc.operator.io.batching.LimitingDataResultSet;
import com.owc.operator.loops.ParallelLoopingOperatorChain;
import com.owc.process.ports.OneToOneExtender;
import com.owc.tools.ConcurrencyTools;
import com.rapidminer.example.ExampleSet;
import com.rapidminer.extension.PluginInitJackhammerExtension;
import com.rapidminer.operator.IOObject;
import com.rapidminer.operator.Operator;
import com.rapidminer.operator.OperatorDescription;
import com.rapidminer.operator.OperatorException;
import com.rapidminer.operator.UserError;
import com.rapidminer.operator.io.AbstractExampleSource;
import com.rapidminer.operator.nio.file.FileObject;
import com.rapidminer.operator.nio.model.DataResultSetFactory;
import com.rapidminer.operator.ports.InputPort;
import com.rapidminer.operator.ports.OutputPort;
import com.rapidminer.operator.ports.metadata.MetaData;
import com.rapidminer.operator.ports.metadata.Precondition;
import com.rapidminer.operator.ports.metadata.SimplePrecondition;
import com.rapidminer.parameter.ParameterType;
import com.rapidminer.parameter.ParameterTypeInt;
import com.rapidminer.parameter.conditions.ParameterCondition;
import com.rapidminer.studio.concurrency.internal.ConcurrencyExecutionServiceProvider;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * Base class for reader operators that read their source batch by batch and execute the
 * first subprocess once per batch.
 *
 * <p>The operator wraps an existing {@link AbstractExampleSource} (the "parameter source"):
 * its parameter types are re-exposed by {@link #getParameterTypes()} together with the
 * additional {@value #PARAMETER_BATCH_SIZE} parameter. Each batch is delivered to the
 * "batch set" inner source of subprocess 0. Depending on
 * {@code checkParallelizability()} the batches are processed either on this operator
 * itself or on clones of it that are run through the concurrency execution service.
 */
public abstract class AbstractReadBatchwiseOperator
extends ParallelLoopingOperatorChain {
    public static final String PARAMETER_BATCH_SIZE = "batch_size";

    /** Key of the meta data parameter list that must be filled before reading can start. */
    private static final String PARAMETER_META_DATA_INFORMATION = "data_set_meta_data_information";

    /** Reader whose parameter types are re-exposed by this operator. */
    protected AbstractExampleSource parameterSource;
    /** Optional input port accepting a {@link FileObject}. */
    protected InputPort fileInputPort = this.getInputPorts().createPort("file");
    /** Inner source of subprocess 0 that receives the example set of the current batch. */
    protected OutputPort innerBatchPort = this.getSubprocess(0).getInnerSources().createPort("batch set");

    /**
     * Creates the operator and registers it as parent of the wrapped reader.
     *
     * @param description     the operator description handed to the super class
     * @param parameterSource the wrapped reader; it is cast to
     *                        {@link BatchEnabledOriginalReadOperator}, so it must implement
     *                        that type or a {@link ClassCastException} is thrown
     */
    public AbstractReadBatchwiseOperator(OperatorDescription description, AbstractExampleSource parameterSource) {
        super(description, "Batch Processing");
        this.parameterSource = parameterSource;
        ((BatchEnabledOriginalReadOperator) parameterSource).setParent(this);
        // The file port only declares the expected meta data; it is not mandatory.
        this.fileInputPort.addPrecondition(new SimplePrecondition(this.fileInputPort, new MetaData(FileObject.class)) {

            @Override
            protected boolean isMandatory() {
                return false;
            }
        });
    }

    /**
     * Opens the data result set of the wrapped reader and processes it batch by batch.
     *
     * @param isLicensed      whether parallel execution is covered by the license; only
     *                        consulted when {@code checkParallelizability()} returns true
     * @param isParallizable  not consulted by this implementation
     * @throws UserError         with code 217 if the meta data parameter list is empty, or with
     *                           id {@code toolkit.license_exceeded_parallel_execution} if parallel
     *                           execution is required but not licensed
     * @throws OperatorException if reading or executing the subprocess fails
     */
    @Override
    public void doWork(boolean isLicensed, boolean isParallizable) throws OperatorException {
        List<IOObject> inputData = this.inputExtender.getDataOrNull(IOObject.class);
        if (this.getParameterList(PARAMETER_META_DATA_INFORMATION).isEmpty()) {
            throw new UserError(this, 217, new Object[]{PARAMETER_META_DATA_INFORMATION, this.getName()});
        }
        BatchEnabledOriginalReadOperator batchedOriginalOperator = this.getBatchedOriginalOperator();
        try (DataResultSetFactory dataResultSetFactory = batchedOriginalOperator.getDataResultSetFactoryPublic()) {
            // Read the batch size before opening the result set so that a parameter error
            // cannot leave an opened result set behind.
            int batchSize = this.getParameterAsInt(PARAMETER_BATCH_SIZE);
            LimitingDataResultSet dataResultSet = new LimitingDataResultSet(dataResultSetFactory.makeDataResultSet(this), batchSize);
            try {
                if (this.checkParallelizability()) {
                    if (!isLicensed) {
                        throw new UserError(this, "toolkit.license_exceeded_parallel_execution");
                    }
                    this.doLoopAsynchronously(dataResultSet, inputData, batchedOriginalOperator);
                } else {
                    this.doLoopSynchronously(dataResultSet, inputData, batchedOriginalOperator);
                }
            } finally {
                // Inner finally: the result set is closed before try-with-resources closes
                // the factory that created it (reverse order of acquisition).
                dataResultSet.closeFinally();
            }
        }
    }

    /**
     * @return the wrapped reader in its batch-enabled form; it supplies the data result set
     *         factory and converts each batch into an {@link ExampleSet}
     */
    protected abstract BatchEnabledOriginalReadOperator getBatchedOriginalOperator();

    /**
     * Processes the batches on clones of this operator, one clone per task, and afterwards
     * pushes every collected batch result through the output extender of this operator.
     *
     * <p>The number of tasks is {@code ConcurrencyTools.getUnusedThreads()}, but at least one.
     * Results are collected task by task, so their order is not necessarily the order in
     * which the batches were read.
     */
    private void doLoopAsynchronously(final LimitingDataResultSet resultSet, final List<IOObject> inputData, final BatchEnabledOriginalReadOperator operator) throws OperatorException {
        List<Callable<List<List<IOObject>>>> tasks = new LinkedList<>();
        int workerCount = Math.max(1, ConcurrencyTools.getUnusedThreads());
        for (int workerIndex = 0; workerIndex < workerCount; ++workerIndex) {
            final AbstractReadBatchwiseOperator copy = (AbstractReadBatchwiseOperator) this.cloneOperator(this.getName(), true);
            Callable<List<List<IOObject>>> worker = new Callable<List<List<IOObject>>>() {

                @Override
                public List<List<IOObject>> call() throws Exception {
                    return AbstractReadBatchwiseOperator.this.processBatchesOnCopy(copy, resultSet, inputData, operator);
                }
            };
            tasks.add(ConcurrencyExecutionServiceProvider.INSTANCE.getService().prepareOperatorTask(this.getProcess(), copy, workerIndex, false, worker));
        }
        List<List<List<IOObject>>> results = ConcurrencyExecutionServiceProvider.INSTANCE.getService().executeOperatorTasks(this, tasks);
        List<OneToOneExtender.PortPair> managedPairs = this.outputExtender.getManagedPairs();
        for (List<List<IOObject>> workerResults : results) {
            for (List<IOObject> batchResult : workerResults) {
                // The i-th result of a batch belongs to the i-th managed port pair.
                int portIndex = 0;
                for (IOObject result : batchResult) {
                    managedPairs.get(portIndex++).getInputPort().receive(result);
                }
                this.outputExtender.collect();
            }
        }
    }

    /**
     * Worker loop of one parallel task: repeatedly fetches the next batch from the shared
     * result set and executes it on the given clone until no batch is left.
     *
     * @return the subprocess results of all batches handled by this task, in processing order
     */
    private List<List<IOObject>> processBatchesOnCopy(AbstractReadBatchwiseOperator copy, LimitingDataResultSet resultSet, List<IOObject> inputData, BatchEnabledOriginalReadOperator operator) throws OperatorException {
        List<List<IOObject>> batchResults = new LinkedList<>();
        while (true) {
            ExampleSet batchSet;
            // All tasks share one result set; advancing to a batch and materializing it
            // must therefore happen under the same lock.
            synchronized (resultSet) {
                if (!resultSet.hasNextBatch()) {
                    return batchResults;
                }
                resultSet.nextBatch();
                batchSet = operator.transformDataResultSetPublic(resultSet);
            }
            copy.inputExtender.deliver(this.getDataCopy(inputData));
            batchResults.add(copy.performBatch(batchSet));
        }
    }

    /**
     * Processes the batches one after another on this operator and collects the outputs
     * after every batch.
     */
    private void doLoopSynchronously(LimitingDataResultSet resultSet, List<IOObject> inputData, BatchEnabledOriginalReadOperator operator) throws OperatorException {
        // Initial delivery: both the regular inputs and the loop ports start with the input data.
        this.inputExtender.deliver(this.getDataCopy(inputData));
        this.loopExtender.deliver(this.getDataCopy(inputData));
        while (resultSet.hasNext() || resultSet.hasNextBatch()) {
            resultSet.nextBatch();
            ExampleSet batchSet = operator.transformDataResultSetPublic(resultSet);
            this.inputExtender.deliver(this.getDataCopy(inputData));
            this.performBatch(batchSet);
            this.outputExtender.collect();
        }
    }

    /**
     * Delivers the batch to the inner batch port and executes subprocess 0. If the loop
     * extender is connected, its current data is delivered again for the next execution.
     *
     * @return the data currently held by the output extender (as returned by
     *         {@code getDataOrNull})
     */
    private List<IOObject> performBatch(ExampleSet batchSet) throws OperatorException {
        this.innerBatchPort.deliver(batchSet);
        this.getSubprocess(0).execute();
        if (this.loopExtender.isConnected()) {
            this.loopExtender.deliver(this.loopExtender.getDataOrNull(IOObject.class));
        }
        return this.outputExtender.getDataOrNull(IOObject.class);
    }

    /**
     * Returns the batch size parameter, followed by the parameter types of the wrapped
     * reader, followed by the parameter types of the super class.
     *
     * <p>All parameter conditions of the wrapped reader are re-bound to this operator, and
     * the meta data parameter is marked as non-expert.
     */
    @Override
    public List<ParameterType> getParameterTypes() {
        // Work on a copy: the list is owned by the wrapped reader and must not be altered
        // here in case that reader hands out the same list instance more than once.
        List<ParameterType> types = new LinkedList<>(this.parameterSource.getParameterTypes());
        for (ParameterType type : types) {
            for (ParameterCondition condition : type.getConditions()) {
                condition.setOperator(this);
            }
            if (PARAMETER_META_DATA_INFORMATION.equals(type.getKey())) {
                type.setExpert(false);
            }
        }
        types.add(0, new ParameterTypeInt(PARAMETER_BATCH_SIZE, "The size of a batch that is read before being processed by the subprocess.", 1, Integer.MAX_VALUE));
        types.addAll(super.getParameterTypes());
        return types;
    }

    /**
     * @return the optional "file" input port of this operator
     */
    public InputPort getFileInputPort() {
        return this.fileInputPort;
    }

    /**
     * @return the product information of the Jackhammer extension
     */
    @Override
    public ProductInformation getProductInformation() {
        return PluginInitJackhammerExtension.PRODUCT_INFORMATION;
    }
}

