/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.utils.BinPacking;
import org.apache.paimon.utils.Preconditions;

/**
 * {@link SplitGenerator} that first groups data files by their first row id and then packs
 * those groups, in order, into splits bounded by a target size.
 */
public class DataEvolutionSplitGenerator
implements SplitGenerator {
    private final long targetSplitSize;
    private final long openFileCost;

    /**
     * @param targetSplitSize target weight passed to the ordered bin packing for one split
     * @param openFileCost minimum weight charged for every row-id group
     */
    public DataEvolutionSplitGenerator(long targetSplitSize, long openFileCost) {
        this.targetSplitSize = targetSplitSize;
        this.openFileCost = openFileCost;
    }

    /** Always {@code false}: raw convertibility is decided per split group instead. */
    @Override
    public boolean alwaysRawConvertible() {
        return false;
    }

    /**
     * Groups {@code input} by first row id (see {@link #split(List)}), packs the groups in order
     * up to the target split size and turns every pack into a {@link SplitGenerator.SplitGroup}.
     *
     * <p>A pack is raw convertible only when each of its row-id groups holds exactly one file.
     *
     * @param input data files to split
     * @return one split group per pack, in packing order
     */
    @Override
    public List<SplitGenerator.SplitGroup> splitForBatch(List<DataFileMeta> input) {
        List<List<DataFileMeta>> files = DataEvolutionSplitGenerator.split(input);
        // A row-id group weighs the sum of its file sizes, but never less than the open cost.
        Function<List<DataFileMeta>, Long> weightFunc =
                group -> Math.max(group.stream().mapToLong(DataFileMeta::fileSize).sum(), this.openFileCost);
        return BinPacking.packForOrdered(files, weightFunc, this.targetSplitSize).stream()
                .map(DataEvolutionSplitGenerator::toSplitGroup)
                .collect(Collectors.toList());
    }

    /** Streaming uses exactly the same grouping and packing as {@link #splitForBatch(List)}. */
    @Override
    public List<SplitGenerator.SplitGroup> splitForStreaming(List<DataFileMeta> files) {
        return this.splitForBatch(files);
    }

    /**
     * Groups files by their first row id.
     *
     * <p>Files are ordered by first row id (files without one come first) and, within the same
     * first row id, by descending max sequence number. A file without a first row id becomes a
     * group of its own; consecutive files sharing a first row id are collected into one group.
     *
     * <p>The given list is not modified: sorting happens on a private copy, so unmodifiable
     * lists are accepted as well.
     *
     * @param files data files to group
     * @return the groups, ordered by first row id
     * @throws IllegalArgumentException presumably (via {@code Preconditions.checkArgument}) when
     *     a group starts before the row range of the previous group's first file has ended
     */
    public static List<List<DataFileMeta>> split(List<DataFileMeta> files) {
        // Sort a copy so the caller's list is neither reordered nor required to be mutable.
        List<DataFileMeta> sorted = new ArrayList<>(files);
        sorted.sort(
                Comparator.comparingLong(
                                (DataFileMeta value) ->
                                        value.firstRowId() == null ? Long.MIN_VALUE : value.firstRowId())
                        .thenComparing((f1, f2) -> Long.compare(f2.maxSequenceNumber(), f1.maxSequenceNumber())));

        List<List<DataFileMeta>> splitByRowId = new ArrayList<>();
        long lastRowId = -1L;
        long checkRowIdStart = 0L;
        List<DataFileMeta> currentSplit = new ArrayList<>();
        for (DataFileMeta file : sorted) {
            Long firstRowId = file.firstRowId();
            if (firstRowId == null) {
                splitByRowId.add(Collections.singletonList(file));
                continue;
            }
            long rowId = firstRowId;
            if (rowId != lastRowId) {
                // A new first row id closes the group collected so far.
                if (!currentSplit.isEmpty()) {
                    splitByRowId.add(currentSplit);
                }
                // Build the (potentially large) message only when the check actually fails.
                if (rowId < checkRowIdStart) {
                    Preconditions.checkArgument(false, "There are overlapping files in the split: \n %s, the wrong file is: \n %s", sorted.stream().map(Object::toString).collect(Collectors.joining(",")), file);
                }
                currentSplit = new ArrayList<>();
                lastRowId = rowId;
                // The next group must start at or after the end of this file's row range.
                checkRowIdStart = rowId + file.rowCount();
            }
            currentSplit.add(file);
        }
        if (!currentSplit.isEmpty()) {
            splitByRowId.add(currentSplit);
        }
        return splitByRowId;
    }

    /** Flattens one pack of row-id groups into a split group and flags its raw convertibility. */
    private static SplitGenerator.SplitGroup toSplitGroup(List<List<DataFileMeta>> pack) {
        boolean rawConvertible = pack.stream().allMatch(group -> group.size() == 1);
        List<DataFileMeta> groupFiles = pack.stream().flatMap(Collection::stream).collect(Collectors.toList());
        return rawConvertible
                ? SplitGenerator.SplitGroup.rawConvertibleGroup(groupFiles)
                : SplitGenerator.SplitGroup.nonRawConvertibleGroup(groupFiles);
    }
}

