/*
 * Decompiled with CFR 0.152.
 */
package com.rapidminer.extension.pythonscripting.serialization.arrow;

import com.rapidminer.belt.column.Column;
import com.rapidminer.belt.column.ColumnType;
import com.rapidminer.belt.table.Table;
import com.rapidminer.extension.pythonscripting.serialization.arrow.ColumnTypeMapper;
import com.rapidminer.extension.pythonscripting.serialization.arrow.DictionaryInitializer;
import com.rapidminer.extension.pythonscripting.serialization.arrow.convert.Converters;
import com.rapidminer.tools.LogService;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.stream.IntStream;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

public class ArrowWriter {
    private static final int BATCH_DEFAULT_ROWS = 512;
    private static final int BATCH_MAX_CELLS = 16384;
    public static final int NOMINAL_VALUE_LENGTH_THRESHOLD = 512;
    private static final int ARROW_VARCHAR_MAX_LENGTH = 32768;
    public static final String RM_TYPE = "rm_type";
    private Table table;
    private Schema schema;
    private final BufferAllocator allocator;
    private final DictionaryProvider.MapDictionaryProvider dictionaryProvider;
    private final Map<Integer, Integer> maxValueLengthForColumns = new HashMap<Integer, Integer>();
    private final ColumnTypeMapper columnTypeMapper;
    private final DictionaryInitializer dictionaryInitializer;

    public ArrowWriter() {
        this.allocator = new RootAllocator(Long.MAX_VALUE);
        this.dictionaryProvider = new DictionaryProvider.MapDictionaryProvider(new Dictionary[0]);
        this.columnTypeMapper = new ColumnTypeMapper();
        this.dictionaryInitializer = new DictionaryInitializer(this.allocator, this.dictionaryProvider);
    }

    public void setTable(Table table) {
        this.table = table;
        this.schema = this.createSchema(table);
    }

    public void writeArrowToFile(Path filePath) throws IOException {
        List<ValueVector> dictionaryVectors = this.dictionaryInitializer.initializeDictionaries(this.table, this.schema);
        try (FileChannel fileChannel = FileChannel.open(filePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
             VectorSchemaRoot root = VectorSchemaRoot.create(this.schema, this.allocator);
             ArrowFileWriter writer = new ArrowFileWriter(root, this.dictionaryProvider, fileChannel);){
            writer.start();
            this.writeArrowData(writer, root, "file: " + filePath);
            writer.end();
            LogService.getRoot().info(() -> String.format("Successfully serialized VectorSchemaRoot to Arrow file: %s", filePath));
        }
        catch (IOException e) {
            LogService.getRoot().log(Level.SEVERE, "Error serializing VectorSchemaRoot to Arrow file: " + filePath, e);
            throw e;
        }
        finally {
            this.dictionaryInitializer.closeDictionaryVectors(dictionaryVectors);
        }
    }

    public void writeArrowToStream(OutputStream stream) throws IOException {
        List<ValueVector> dictionaryVectors = this.dictionaryInitializer.initializeDictionaries(this.table, this.schema);
        LogService.getRoot().log(Level.INFO, "Starting serialization of VectorSchemaRoot to Arrow stream.");
        try (VectorSchemaRoot root = VectorSchemaRoot.create(this.schema, this.allocator);
             ArrowStreamWriter writer = new ArrowStreamWriter(root, (DictionaryProvider)this.dictionaryProvider, stream);){
            writer.start();
            this.writeArrowData(writer, root, "stream");
            writer.end();
            LogService.getRoot().log(Level.INFO, "Successfully serialized VectorSchemaRoot to Arrow stream.");
        }
        catch (IOException e) {
            LogService.getRoot().log(Level.SEVERE, "Error serializing VectorSchemaRoot to Arrow stream.", e);
            throw e;
        }
        finally {
            this.dictionaryInitializer.closeDictionaryVectors(dictionaryVectors);
        }
    }

    private Schema createSchema(Table table) {
        ArrayList<Field> fields = new ArrayList<Field>(table.width());
        for (int columnIndex = 0; columnIndex < table.width(); ++columnIndex) {
            Field field = this.columnTypeMapper.mapColumnType(table, columnIndex, this.maxValueLengthForColumns);
            fields.add(field);
        }
        return new Schema((Iterable<Field>)fields, null);
    }

    private int rowsPerBatch() {
        int maxGlobalValueLength = 1;
        for (int i = 0; i < this.table.width(); ++i) {
            int maxValueLength = this.maxValueLengthForColumns.getOrDefault(i, 1);
            if (maxGlobalValueLength >= maxValueLength) continue;
            maxGlobalValueLength = maxValueLength;
        }
        int tableWidth = this.table.width();
        if (tableWidth == 0) {
            return 1;
        }
        return Math.min(Math.min(512, 16384 / tableWidth), 32768 / maxGlobalValueLength);
    }

    private void writeArrowData(org.apache.arrow.vector.ipc.ArrowWriter writer, VectorSchemaRoot root, String logTarget) throws IOException {
        try {
            int rowsInBatch;
            int totalRows = this.table.height();
            int maxRowsPerBatch = this.rowsPerBatch();
            for (int rowsWritten = 0; rowsWritten < totalRows; rowsWritten += rowsInBatch) {
                rowsInBatch = Math.min(maxRowsPerBatch, totalRows - rowsWritten);
                root.allocateNew();
                root.setRowCount(rowsInBatch);
                this.writeToArrowVectorInBatch(root, rowsWritten, rowsInBatch);
                writer.writeBatch();
                root.clear();
            }
        }
        catch (IOException e) {
            LogService.getRoot().log(Level.SEVERE, "Error during batch writing to " + logTarget, e);
            throw e;
        }
    }

    private void writeToArrowVectorInBatch(VectorSchemaRoot root, int offset, int rowsInBatch) {
        IntStream.range(0, this.table.width()).parallel().forEach(i -> {
            Column column = (Column)this.table.select().columns().get(i);
            ColumnType columnType = column.type();
            FieldVector vector = root.getVector(this.table.label(i));
            vector.allocateNew();
            vector.setValueCount(rowsInBatch);
            try {
                Converters.INSTANCE.writeBatch(column, vector, offset, rowsInBatch);
            }
            catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("No converter found for column '" + vector.getField().getName() + "' of type '" + columnType + "'", e);
            }
            vector.setValueCount(rowsInBatch);
        });
    }

    public void close() {
        this.allocator.close();
    }
}

