/*
 * Decompiled with CFR 0.152.
 */
package com.gc.iotools.stream.writer;

import com.gc.iotools.stream.base.ExecutionModel;
import com.gc.iotools.stream.base.ExecutorServiceFactory;
import com.gc.iotools.stream.reader.CloseShieldReader;
import com.gc.iotools.stream.utils.LogUtils;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.Reader;
import java.io.Writer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Writer} whose content is made available, through an internal
 * {@link PipedWriter}/{@link PipedReader} pair, to a {@link Reader} that is
 * consumed by the subclass-provided {@link #doRead(Reader)} method.
 *
 * <p>The call to {@code doRead} is wrapped in a task that is submitted to an
 * {@link ExecutorService} when the instance is constructed. Its return value
 * can be retrieved with {@link #getResults()} once the writer has been closed.
 *
 * <p>If {@code doRead} throws, the instance is marked as aborted. The next call
 * to {@code write} or {@code flush} then closes the pipe and reports the
 * failure as an {@link IOException}; once the writer is closed, further writes
 * on an aborted instance are silently discarded.
 *
 * @param <T> type of the result produced by {@link #doRead(Reader)}
 */
public abstract class WriterToReader<T> extends Writer {
    /** Time, in milliseconds, that {@link #close()} waits for the consumer task. */
    private static final int DEFAULT_TIMEOUT = 900000;
    /** Pipe buffer size used by the constructors that don't take an explicit size. */
    private static int defaultPipeSize = 4096;
    private static final Logger LOG = LoggerFactory.getLogger(WriterToReader.class);
    /**
     * Set by the consumer task when {@code doRead} fails and read by the
     * writing side; {@code volatile} because the two sides may run on
     * different threads.
     */
    private volatile boolean abort = false;
    /** Becomes {@code true} the first time the internal close logic runs. */
    private boolean closeCalled = false;
    /** Whether {@link #close()} waits for the consumer task to finish. */
    private final boolean joinOnClose;
    /** Writing end of the pipe; everything written to this object goes here. */
    private final PipedWriter pipedWriter;
    /** Handle on the submitted consumer task. */
    private final Future<T> writingResult;

    /**
     * Changes the pipe buffer size used by instances created afterwards
     * through a constructor that doesn't take an explicit size.
     *
     * @param defaultPipeSize the new default pipe size
     */
    public static void setDefaultPipeSize(int defaultPipeSize) {
        WriterToReader.defaultPipeSize = defaultPipeSize;
    }

    /**
     * Creates an instance that joins on close and obtains its executor from
     * {@link ExecutionModel#THREAD_PER_INSTANCE}.
     */
    public WriterToReader() {
        this(true, ExecutionModel.THREAD_PER_INSTANCE);
    }

    /**
     * Creates an instance whose executor is obtained from
     * {@link ExecutorServiceFactory#getExecutor(ExecutionModel)}.
     *
     * @param joinOnClose    whether {@link #close()} waits for the consumer task
     * @param executionModel model used to look up the executor
     */
    public WriterToReader(boolean joinOnClose, ExecutionModel executionModel) {
        this(joinOnClose, ExecutorServiceFactory.getExecutor(executionModel));
    }

    /**
     * Creates an instance using the current default pipe size.
     *
     * @param joinOnClose     whether {@link #close()} waits for the consumer task
     * @param executorService executor the consumer task is submitted to
     */
    public WriterToReader(boolean joinOnClose, ExecutorService executorService) {
        this(joinOnClose, executorService, defaultPipeSize);
    }

    /**
     * Creates the pipe, wraps {@link #doRead(Reader)} in a task and submits
     * that task to the given executor.
     *
     * @param joinOnClose     whether {@link #close()} waits for the consumer task
     * @param executorService executor the consumer task is submitted to
     * @param pipeBufferSize  buffer size handed to the {@link PipedReader}
     * @throws IllegalArgumentException if {@code executorService} is {@code null}
     * @throws IllegalStateException    if the two pipe ends can't be connected
     */
    public WriterToReader(boolean joinOnClose, ExecutorService executorService, int pipeBufferSize) {
        if (executorService == null) {
            throw new IllegalArgumentException("executor service can't be null");
        }
        String callerId = LogUtils.getCaller(this.getClass());
        this.pipedWriter = new PipedWriter();
        PipedReader pipedIS = new PipedReader(pipeBufferSize);
        try {
            pipedIS.connect(this.pipedWriter);
        } catch (IOException e) {
            throw new IllegalStateException("Error during pipe creaton", e);
        }
        DataConsumer executingProcess = new DataConsumer(pipedIS);
        this.joinOnClose = joinOnClose;
        LOG.debug("invoked by[{}] queued for start.", callerId);
        this.writingResult = executorService.submit(executingProcess);
    }

    /**
     * Closes the writing end of the pipe and, if this instance was created
     * with {@code joinOnClose == true}, waits up to the default timeout for
     * the consumer task to finish. Calls after the first one are no-ops.
     *
     * @throws IOException if closing the pipe fails, {@code doRead} threw,
     *                     the wait was interrupted, or the wait timed out
     */
    @Override
    public final void close() throws IOException {
        this.internalClose(this.joinOnClose, TimeUnit.MILLISECONDS, DEFAULT_TIMEOUT);
    }

    /**
     * Closes the writing end of the pipe and always waits, up to the given
     * timeout, for the consumer task to finish.
     *
     * @param timeout maximum time to wait
     * @param tu      unit of {@code timeout}
     * @throws IOException if closing the pipe fails, {@code doRead} threw,
     *                     the wait was interrupted, or the wait timed out
     */
    public final void close(long timeout, TimeUnit tu) throws IOException {
        this.internalClose(true, tu, timeout);
    }

    /**
     * Consumes the characters written to this writer.
     *
     * @param var1 reader giving access to the written characters; it is
     *             wrapped in a {@link CloseShieldReader} before being passed in
     * @return the result later exposed by {@link #getResults()}
     * @throws Exception any failure; it marks this writer as aborted
     */
    protected abstract T doRead(Reader var1) throws Exception;

    /**
     * Flushes the pipe, or runs the abort handling if {@code doRead} failed.
     *
     * @throws IOException if the flush fails or the abort is being reported
     */
    @Override
    public final void flush() throws IOException {
        if (this.abort) {
            this.closeAfterAbort();
        } else {
            this.pipedWriter.flush();
        }
    }

    /**
     * Returns the value produced by {@link #doRead(Reader)}, blocking without
     * timeout until the consumer task has completed.
     *
     * @return the result of {@code doRead}
     * @throws IllegalStateException if this writer has not been closed yet
     * @throws InterruptedException  if the wait is interrupted
     * @throws ExecutionException    if {@code doRead} threw
     */
    public final T getResults() throws InterruptedException, ExecutionException {
        if (!this.closeCalled) {
            throw new IllegalStateException("Method close() must be called before getResults");
        }
        return this.writingResult.get();
    }

    /**
     * Abort path shared by {@code write} and {@code flush}: closes the pipe
     * and waits at most one second for the consumer task so that its failure
     * surfaces as an {@link IOException}.
     */
    private void closeAfterAbort() throws IOException {
        this.internalClose(true, TimeUnit.SECONDS, 1L);
    }

    /**
     * Closes the pipe once and optionally waits for the consumer task.
     *
     * @param join     whether to wait for the consumer task
     * @param timeUnit unit of {@code timeout}
     * @param timeout  maximum time to wait when {@code join} is {@code true}
     * @throws IOException if closing the pipe fails or the wait ends abnormally
     */
    private void internalClose(boolean join, TimeUnit timeUnit, long timeout) throws IOException {
        if (this.closeCalled) {
            return;
        }
        this.closeCalled = true;
        this.pipedWriter.close();
        if (!join) {
            return;
        }
        try {
            this.writingResult.get(timeout, timeUnit);
        } catch (ExecutionException e) {
            throw new IOException("The doRead() threw exception. Use getCause() for details.", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new IOException("Waiting of the thread has been interrupted", e);
        } catch (TimeoutException e) {
            if (!this.writingResult.isDone()) {
                this.writingResult.cancel(true);
            }
            throw new IOException("Waiting for the internal thread to finish took more than [" + timeout + "] " + timeUnit, e);
        }
    }

    /**
     * Writes the whole array to the pipe, or runs the abort handling if
     * {@code doRead} failed.
     *
     * @param bytes characters to write
     * @throws IOException if the write fails or the abort is being reported
     */
    @Override
    public final void write(char[] bytes) throws IOException {
        if (this.abort) {
            this.closeAfterAbort();
        } else {
            this.pipedWriter.write(bytes);
        }
    }

    /**
     * Writes a portion of the array to the pipe, or runs the abort handling
     * if {@code doRead} failed.
     *
     * @param bytes  characters to write
     * @param offset index of the first character to write
     * @param length number of characters to write
     * @throws IOException if the write fails or the abort is being reported
     */
    @Override
    public final void write(char[] bytes, int offset, int length) throws IOException {
        if (this.abort) {
            this.closeAfterAbort();
        } else {
            this.pipedWriter.write(bytes, offset, length);
        }
    }

    /**
     * Writes a single character to the pipe, or runs the abort handling if
     * {@code doRead} failed.
     *
     * @param chartowrite character to write
     * @throws IOException if the write fails or the abort is being reported
     */
    @Override
    public final void write(int chartowrite) throws IOException {
        if (this.abort) {
            this.closeAfterAbort();
        } else {
            this.pipedWriter.write(chartowrite);
        }
    }

    /**
     * Task that feeds the reading end of the pipe to {@link #doRead(Reader)}.
     */
    private final class DataConsumer implements Callable<T> {
        /** Reading end of the pipe. */
        private final Reader reader;

        DataConsumer(Reader reader) {
            this.reader = reader;
        }

        /**
         * Runs {@code doRead} on a close-shielded view of the reader, then
         * drains and closes the real reader whatever the outcome.
         *
         * @return the value returned by {@code doRead}
         * @throws Exception whatever {@code doRead} threw, after flagging the abort
         */
        @Override
        public synchronized T call() throws Exception {
            T processResult;
            try {
                CloseShieldReader<Reader> shielded = new CloseShieldReader<Reader>(this.reader);
                processResult = WriterToReader.this.doRead(shielded);
            } catch (Exception e) {
                // Tell the writing side to stop feeding the pipe.
                WriterToReader.this.abort = true;
                throw e;
            } finally {
                this.emptyReader();
                this.reader.close();
            }
            return processResult;
        }

        /**
         * Reads and discards whatever is left in the pipe. Failures are only
         * logged. NOTE(review): presumably this keeps a writer from staying
         * blocked on a full pipe when doRead stops early - confirm.
         */
        private void emptyReader() {
            try {
                char[] buffer = new char[8192];
                while (this.reader.read(buffer) >= 0) {
                    // discard
                }
            } catch (IOException e) {
                String message = e.getMessage();
                if (message != null && message.contains("closed")) {
                    LOG.debug("Stream already closed");
                } else {
                    LOG.error("IOException while empty Reader a thread can be locked", e);
                }
            } catch (Throwable e) {
                LOG.error("IOException while empty Reader a thread can be locked", e);
            }
        }
    }
}

