/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.postpone;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueSerializer;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.IOFunction;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SinkWriter;

public class PostponeBucketWriter
implements RecordWriter<KeyValue>,
MemoryOwner {
    private final FileIO fileIO;
    private final DataFilePathFactory pathFactory;
    private final MergeFunction<KeyValue> mergeFunction;
    private final KeyValueFileWriterFactory writerFactory;
    private final List<DataFileMeta> files;
    private final IOFunction<List<DataFileMeta>, RecordReaderIterator<KeyValue>> fileRead;
    @Nullable
    private final IOManager ioManager;
    private final CompressOptions spillCompression;
    private final MemorySize maxDiskSize;
    private SinkWriter<KeyValue> sinkWriter;
    private MemorySegmentPool memorySegmentPool;
    private boolean retractValidated = false;

    public PostponeBucketWriter(FileIO fileIO, DataFilePathFactory pathFactory, CompressOptions spillCompression, MemorySize maxDiskSize, @Nullable IOManager ioManager, MergeFunction<KeyValue> mergeFunction, KeyValueFileWriterFactory writerFactory, IOFunction<List<DataFileMeta>, RecordReaderIterator<KeyValue>> fileRead, boolean useWriteBuffer, boolean spillable, @Nullable CommitIncrement restoreIncrement) {
        this.ioManager = ioManager;
        this.mergeFunction = mergeFunction;
        this.writerFactory = writerFactory;
        this.fileRead = fileRead;
        this.fileIO = fileIO;
        this.pathFactory = pathFactory;
        this.spillCompression = spillCompression;
        this.maxDiskSize = maxDiskSize;
        this.files = new ArrayList<DataFileMeta>();
        if (restoreIncrement != null) {
            this.files.addAll(restoreIncrement.newFilesIncrement().newFiles());
        }
        this.sinkWriter = useWriteBuffer ? this.createBufferedSinkWriter(spillable) : new SinkWriter.DirectSinkWriter(this::createRollingRowWriter);
    }

    private RollingFileWriter<KeyValue, DataFileMeta> createRollingRowWriter() {
        return this.writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
    }

    @Override
    public void write(KeyValue record) throws Exception {
        this.validateRetract(record);
        boolean success = this.sinkWriter.write(record);
        if (!success) {
            this.flush();
            success = this.sinkWriter.write(record);
            if (!success) {
                throw new RuntimeException("Mem table is too small to hold a single element.");
            }
        }
    }

    private void validateRetract(KeyValue kv) {
        if (kv.valueKind().isRetract()) {
            if (this.retractValidated) {
                return;
            }
            this.mergeFunction.reset();
            this.mergeFunction.add(kv);
            this.mergeFunction.getResult();
            this.retractValidated = true;
        }
    }

    private void flush() throws Exception {
        this.files.addAll(this.sinkWriter.flush());
    }

    @Override
    public void compact(boolean fullCompaction) throws Exception {
    }

    @Override
    public void addNewFiles(List<DataFileMeta> files) {
        this.files.addAll(files);
    }

    @Override
    public Collection<DataFileMeta> dataFiles() {
        return new ArrayList<DataFileMeta>(this.files);
    }

    @Override
    public long maxSequenceNumber() {
        return 0L;
    }

    @Override
    public void setMemoryPool(MemorySegmentPool memoryPool) {
        this.memorySegmentPool = memoryPool;
        this.sinkWriter.setMemoryPool(memoryPool);
    }

    @Override
    public long memoryOccupancy() {
        return this.sinkWriter.memoryOccupancy();
    }

    @Override
    public void flushMemory() throws Exception {
        boolean success = this.sinkWriter.flushMemory();
        if (!success) {
            this.flush();
        }
    }

    private SinkWriter.BufferedSinkWriter<KeyValue> createBufferedSinkWriter(boolean spillable) {
        RowType keyType = this.writerFactory.keyType();
        RowType valueType = this.writerFactory.valueType();
        RowType kvRowType = KeyValue.schema(keyType, valueType);
        KeyValueSerializer serializer = new KeyValueSerializer(keyType, valueType);
        return new SinkWriter.BufferedSinkWriter<KeyValue>(this::createRollingRowWriter, serializer::toRow, serializer::fromRow, this.ioManager, kvRowType, spillable, this.maxDiskSize, this.spillCompression);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void toBufferedWriter() throws Exception {
        if (this.sinkWriter != null && !this.sinkWriter.bufferSpillableWriter() && this.fileRead != null) {
            List<DataFileMeta> files = this.sinkWriter.flush();
            this.sinkWriter.close();
            this.sinkWriter = this.createBufferedSinkWriter(true);
            this.sinkWriter.setMemoryPool(this.memorySegmentPool);
            try (RecordReaderIterator<KeyValue> reader = this.fileRead.apply(files);){
                while (reader.hasNext()) {
                    this.sinkWriter.write(reader.next());
                }
            }
            finally {
                for (DataFileMeta file : files) {
                    this.fileIO.deleteQuietly(this.pathFactory.toPath(file));
                }
            }
        }
    }

    @Override
    public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
        this.flush();
        ArrayList<DataFileMeta> result = new ArrayList<DataFileMeta>(this.files);
        this.files.clear();
        return new CommitIncrement(new DataIncrement(result, Collections.emptyList(), Collections.emptyList()), CompactIncrement.emptyIncrement(), null);
    }

    @VisibleForTesting
    public boolean useBufferedSinkWriter() {
        return this.sinkWriter instanceof SinkWriter.BufferedSinkWriter;
    }

    @Override
    public boolean compactNotCompleted() {
        return false;
    }

    @Override
    public void sync() throws Exception {
    }

    @Override
    public void close() throws Exception {
        this.sinkWriter.close();
    }
}

