package org.apache.paimon.table.source.snapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.PlanImpl;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.class */
/**
 * {@link StartingScanner} that plans a one-shot read of the files added by the snapshots in the
 * half-open id range {@code (startingSnapshotId, endingSnapshotId]}.
 *
 * <p>Which snapshots contribute depends on the {@link ScanMode}:
 *
 * <ul>
 *   <li>{@code DELTA}: only snapshots whose commit kind is {@code APPEND};
 *   <li>{@code CHANGELOG}: every snapshot except those whose commit kind is {@code OVERWRITE}.
 * </ul>
 *
 * <p>Any other scan mode makes {@link #scan(SnapshotReader)} fail with
 * {@link UnsupportedOperationException} as soon as a snapshot in the range is processed.
 */
public class IncrementalStartingScanner extends AbstractStartingScanner {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalStartingScanner.class);
    private final long endingSnapshotId;
    private final ScanMode scanMode;

    /**
     * Creates a scanner over the snapshot range {@code (j, j2]}.
     *
     * @param snapshotManager used to resolve snapshots and the available snapshot id range
     * @param j starting snapshot id (exclusive)
     * @param j2 ending snapshot id (inclusive)
     * @param scanMode which manifests of each snapshot are read
     */
    public IncrementalStartingScanner(SnapshotManager snapshotManager, long j, long j2, ScanMode scanMode) {
        super(snapshotManager);
        this.startingSnapshotId = j;
        this.endingSnapshotId = j2;
        this.scanMode = scanMode;
    }

    /**
     * Builds the plan containing one or more {@link DataSplit}s per (partition, bucket) touched by
     * the snapshots in range. Every split is tagged with the ending snapshot id.
     *
     * @param snapshotReader supplies manifest reading, split generation and bucket paths
     * @return {@link StartingScanner.NoSnapshot} when the table has no snapshot yet, otherwise the
     *     result wrapping the generated plan
     * @throws IllegalArgumentException if the requested range is invalid, or (presumably, given
     *     the use of {@code Preconditions.checkArgument}) if a non-ADD manifest entry is read
     * @throws UnsupportedOperationException if the scan mode is neither DELTA nor CHANGELOG
     */
    @Override // org.apache.paimon.table.source.snapshot.StartingScanner
    public StartingScanner.Result scan(SnapshotReader snapshotReader) {
        Optional<StartingScanner.Result> earlyResult = checkScanSnapshotIdValidity();
        if (earlyResult.isPresent()) {
            return earlyResult.get();
        }

        ManifestsReader manifestsReader = snapshotReader.manifestsReader();

        // Ids in (startingSnapshotId, endingSnapshotId]; empty when both ids are equal.
        List<Long> snapshotIds =
                LongStream.rangeClosed(this.startingSnapshotId + 1, this.endingSnapshotId)
                        .boxed()
                        .collect(Collectors.toList());

        // Stage 1: resolve the manifest files of every relevant snapshot.
        Iterator<ManifestFileMeta> manifests =
                ManifestReadThreadPool.randomlyExecute(
                        id -> manifestsOf(id, manifestsReader),
                        snapshotIds,
                        snapshotReader.parallelism());

        // Stage 2: expand every manifest file into its entries.
        Iterator<ManifestEntry> entries =
                ManifestReadThreadPool.randomlyExecute(
                        snapshotReader::readManifest,
                        Lists.newArrayList(manifests),
                        snapshotReader.parallelism());

        // Group the added data files by (partition, bucket).
        Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> filesByBucket = new ConcurrentHashMap<>();
        while (entries.hasNext()) {
            ManifestEntry entry = entries.next();
            Preconditions.checkArgument(entry.kind() == FileKind.ADD, "Delta or changelog should only have ADD files.");
            filesByBucket
                    .computeIfAbsent(Pair.of(entry.partition(), entry.bucket()), ignored -> new ArrayList<>())
                    .add(entry.file());
        }

        List<Split> splits = new ArrayList<>();
        for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> bucketFiles : filesByBucket.entrySet()) {
            BinaryRow partition = bucketFiles.getKey().getLeft();
            int bucket = bucketFiles.getKey().getRight();
            String bucketPath = snapshotReader.pathFactory().bucketPath(partition, bucket).toString();
            for (SplitGenerator.SplitGroup group : snapshotReader.splitGenerator().splitForBatch(bucketFiles.getValue())) {
                splits.add(
                        DataSplit.builder()
                                .withSnapshot(this.endingSnapshotId)
                                .withPartition(partition)
                                .withBucket(bucket)
                                .withDataFiles(group.files)
                                .rawConvertible(group.rawConvertible)
                                .withBucketPath(bucketPath)
                                .build());
            }
        }
        return StartingScanner.fromPlan(new PlanImpl(null, this.endingSnapshotId, splits));
    }

    /**
     * Returns the filtered manifests of one snapshot for the configured scan mode, or an empty
     * list when the snapshot's commit kind is skipped by that mode.
     */
    private List<ManifestFileMeta> manifestsOf(long snapshotId, ManifestsReader manifestsReader) {
        Snapshot snapshot = this.snapshotManager.snapshot(snapshotId);
        switch (this.scanMode) {
            case DELTA:
                // Only APPEND commits contribute to the delta.
                if (snapshot.commitKind() != Snapshot.CommitKind.APPEND) {
                    return Collections.emptyList();
                }
                break;
            case CHANGELOG:
                // OVERWRITE commits are skipped for changelog reads.
                if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
                    return Collections.emptyList();
                }
                break;
            default:
                throw new UnsupportedOperationException("Unsupported scan mode: " + this.scanMode);
        }
        return manifestsReader.read(snapshot, this.scanMode).filteredManifests;
    }

    /**
     * Validates the requested snapshot id range against the snapshots currently available.
     *
     * @return a {@link StartingScanner.NoSnapshot} result when the snapshot manager reports no
     *     earliest or latest snapshot; an empty optional when the range is valid
     * @throws IllegalArgumentException presumably raised by {@code Preconditions.checkArgument}
     *     when the starting id is greater than the ending id, when the starting id is below
     *     {@code earliest - 1}, or when the ending id is above the latest snapshot id
     */
    public Optional<StartingScanner.Result> checkScanSnapshotIdValidity() {
        Long earliestSnapshotId = this.snapshotManager.earliestSnapshotId();
        Long latestSnapshotId = this.snapshotManager.latestSnapshotId();
        if (earliestSnapshotId == null || latestSnapshotId == null) {
            LOG.warn("There is currently no snapshot. Waiting for snapshot generation.");
            return Optional.of(new StartingScanner.NoSnapshot());
        }
        // Equal ids are accepted here; they simply describe an empty range.
        Preconditions.checkArgument(
                this.startingSnapshotId <= this.endingSnapshotId,
                "Starting snapshotId %s must less than ending snapshotId %s.",
                this.startingSnapshotId,
                this.endingSnapshotId);
        // The start is exclusive, so it may point one before the earliest retained snapshot.
        Preconditions.checkArgument(
                this.startingSnapshotId >= earliestSnapshotId - 1 && this.endingSnapshotId <= latestSnapshotId,
                "The specified scan snapshotId range [%s, %s] is out of available snapshotId range [%s, %s].",
                this.startingSnapshotId,
                this.endingSnapshotId,
                earliestSnapshotId,
                latestSnapshotId);
        return Optional.empty();
    }
}
