package org.apache.paimon.flink.source;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;

/* loaded from: input_file:org/apache/paimon/flink/source/RewriteFileIndexSource.class */
public class RewriteFileIndexSource implements Source<ManifestEntry, Split, CheckpointState> {
    private static final long serialVersionUID = 1;
    private final FileStoreTable table;

    @Nullable
    private final Predicate partitionPredicate;

    /* loaded from: input_file:org/apache/paimon/flink/source/RewriteFileIndexSource$CheckpointSerde.class */
    private static class CheckpointSerde implements SimpleVersionedSerializer<CheckpointState> {
        private final SplitSerder splitSerder;

        private CheckpointSerde() {
            this.splitSerder = new SplitSerder();
        }

        public int getVersion() {
            return 0;
        }

        public byte[] serialize(CheckpointState checkpointState) throws IOException {
            DataOutputStream dataOutputStream = new DataOutputStream(new ByteArrayOutputStream());
            List<Split> files = checkpointState.files();
            dataOutputStream.writeInt(files.size());
            Iterator<Split> it = files.iterator();
            while (it.hasNext()) {
                byte[] serialize = this.splitSerder.serialize(it.next());
                dataOutputStream.writeInt(serialize.length);
                dataOutputStream.write(serialize);
            }
            return new byte[0];
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public CheckpointState m1367deserialize(int i, byte[] bArr) throws IOException {
            DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bArr));
            int readInt = dataInputStream.readInt();
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < readInt; i2++) {
                byte[] bArr2 = new byte[dataInputStream.readInt()];
                dataInputStream.readFully(bArr2);
                arrayList.add(this.splitSerder.m1370deserialize(0, bArr2));
            }
            return new CheckpointState(arrayList);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/source/RewriteFileIndexSource$CheckpointState.class */
    public static class CheckpointState {
        private final List<Split> files;

        public CheckpointState(List<Split> list) {
            this.files = list;
        }

        public List<Split> files() {
            return this.files;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/flink/source/RewriteFileIndexSource$ManifestFileSplitEnumerator.class */
    public static class ManifestFileSplitEnumerator implements SplitEnumerator<Split, CheckpointState> {
        private final SplitEnumeratorContext<Split> splitEnumeratorContext;
        private final List<Split> files;

        public ManifestFileSplitEnumerator(SplitEnumeratorContext<Split> splitEnumeratorContext, List<Split> list) {
            this.splitEnumeratorContext = splitEnumeratorContext;
            this.files = list;
        }

        public void start() {
        }

        public void handleSplitRequest(int i, @Nullable String str) {
            if (this.files.isEmpty()) {
                this.splitEnumeratorContext.signalNoMoreSplits(i);
            } else {
                this.splitEnumeratorContext.assignSplit(this.files.remove(0), i);
            }
        }

        public void addSplitsBack(List<Split> list, int i) {
            this.files.addAll(list);
        }

        public void addReader(int i) {
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public CheckpointState m1368snapshotState(long j) throws Exception {
            return new CheckpointState(this.files);
        }

        public void close() throws IOException {
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/source/RewriteFileIndexSource$Reader.class */
    private static class Reader implements SourceReader<ManifestEntry, Split> {
        private final SourceReaderContext context;
        private final ArrayDeque<Split> splits = new ArrayDeque<>();
        private boolean noMore;

        public Reader(SourceReaderContext sourceReaderContext) {
            this.context = sourceReaderContext;
        }

        public void start() {
            this.context.sendSplitRequest();
        }

        public InputStatus pollNext(ReaderOutput<ManifestEntry> readerOutput) throws Exception {
            if (!this.splits.isEmpty()) {
                readerOutput.collect(this.splits.poll().entry());
                if (!this.noMore && this.splits.isEmpty()) {
                    this.context.sendSplitRequest();
                }
                if (!this.splits.isEmpty()) {
                    return InputStatus.MORE_AVAILABLE;
                }
            }
            return this.noMore ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE;
        }

        public List<Split> snapshotState(long j) {
            return new ArrayList(this.splits);
        }

        public CompletableFuture<Void> isAvailable() {
            return this.splits.isEmpty() ? CompletableFuture.completedFuture(null) : FutureCompletingBlockingQueue.AVAILABLE;
        }

        public void addSplits(List<Split> list) {
            this.splits.addAll(list);
        }

        public void notifyNoMoreSplits() {
            this.noMore = true;
        }

        public void close() throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/source/RewriteFileIndexSource$Split.class */
    public static class Split implements SourceSplit {
        private final ManifestEntry manifestEntry;

        public Split(ManifestEntry manifestEntry) {
            this.manifestEntry = manifestEntry;
        }

        public String splitId() {
            return "splitId";
        }

        ManifestEntry entry() {
            return this.manifestEntry;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/flink/source/RewriteFileIndexSource$SplitSerder.class */
    public static class SplitSerder implements SimpleVersionedSerializer<Split> {
        private static final ManifestEntrySerializer manifestEntrySerializer = new ManifestEntrySerializer();

        public int getVersion() {
            return 0;
        }

        public byte[] serialize(Split split) throws IOException {
            return manifestEntrySerializer.serializeToBytes(split.entry());
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Split m1370deserialize(int i, byte[] bArr) throws IOException {
            return new Split(manifestEntrySerializer.deserializeFromBytes(bArr));
        }
    }

    public RewriteFileIndexSource(FileStoreTable fileStoreTable, @Nullable Predicate predicate) {
        this.table = fileStoreTable;
        this.partitionPredicate = predicate;
    }

    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    public SplitEnumerator<Split, CheckpointState> createEnumerator(SplitEnumeratorContext<Split> splitEnumeratorContext) throws Exception {
        return new ManifestFileSplitEnumerator(splitEnumeratorContext, (List) this.table.store().newScan().withPartitionFilter(this.partitionPredicate).plan().files().stream().map(Split::new).collect(Collectors.toList()));
    }

    public SplitEnumerator<Split, CheckpointState> restoreEnumerator(SplitEnumeratorContext<Split> splitEnumeratorContext, CheckpointState checkpointState) throws Exception {
        return new ManifestFileSplitEnumerator(splitEnumeratorContext, checkpointState.files());
    }

    public SimpleVersionedSerializer<Split> getSplitSerializer() {
        return new SplitSerder();
    }

    public SimpleVersionedSerializer<CheckpointState> getEnumeratorCheckpointSerializer() {
        return new CheckpointSerde();
    }

    public SourceReader<ManifestEntry, Split> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        return new Reader(sourceReaderContext);
    }

    public /* bridge */ /* synthetic */ SplitEnumerator restoreEnumerator(SplitEnumeratorContext splitEnumeratorContext, Object obj) throws Exception {
        return restoreEnumerator((SplitEnumeratorContext<Split>) splitEnumeratorContext, (CheckpointState) obj);
    }
}
