/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.append.cluster;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.cluster.IncrementalClusterStrategy;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Plans incremental clustering for a bucket-unaware append table.
 *
 * <p>The manager reads the data files of the current snapshot, groups them per partition into
 * {@link LevelSortedRun}s, and asks an {@link IncrementalClusterStrategy} which files of each
 * partition should be clustered.
 *
 * <p>NOTE(review): {@link #constructLevels()} mutates {@code maxLevel}; no synchronization is
 * visible here, so instances are presumably meant to be used from a single thread — confirm.
 */
public class IncrementalClusterManager {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalClusterManager.class);

    private final SnapshotReader snapshotReader;
    private final IncrementalClusterStrategy incrementalClusterStrategy;
    private final CoreOptions.OrderType clusterCurve;
    private final List<String> clusterKeys;

    // Not final: constructLevels() raises it when existing files sit on a higher level than the
    // configured number of levels allows.
    private int maxLevel;

    /**
     * Creates a manager for the given table.
     *
     * @param table the table to cluster; its bucket mode must be {@link
     *     BucketMode#BUCKET_UNAWARE} and incremental clustering must be enabled in its options
     * @throws IllegalArgumentException presumably thrown by {@link Preconditions#checkArgument}
     *     when one of the two requirements above is not met — verify against Preconditions
     */
    public IncrementalClusterManager(FileStoreTable table) {
        Preconditions.checkArgument(
                table.bucketMode() == BucketMode.BUCKET_UNAWARE,
                "only append unaware-bucket table support incremental clustering.");
        this.snapshotReader = table.newSnapshotReader().dropStats();
        CoreOptions options = table.coreOptions();
        Preconditions.checkArgument(
                options.clusteringIncrementalEnabled(),
                "Only support incremental clustering when '%s' is true.",
                CoreOptions.CLUSTERING_INCREMENTAL.key());
        this.incrementalClusterStrategy =
                new IncrementalClusterStrategy(
                        table.schemaManager(),
                        options.clusteringColumns(),
                        options.maxSizeAmplificationPercent(),
                        options.sortedRunSizeRatio(),
                        options.numSortedRunCompactionTrigger());
        this.clusterCurve = options.clusteringStrategy(options.clusteringColumns().size());
        this.clusterKeys = options.clusteringColumns();
        this.maxLevel = options.numLevels();
    }

    /**
     * Picks, for every partition of the current snapshot, the files that should be clustered.
     *
     * @param fullCompaction forwarded unchanged to {@link IncrementalClusterStrategy#pick}
     * @return a map from partition to the unit picked for it; partitions for which the strategy
     *     returned an empty {@link Optional} are omitted
     */
    public Map<BinaryRow, CompactUnit> prepareForCluster(boolean fullCompaction) {
        // 1. Build the per-partition level structure from the snapshot.
        Map<BinaryRow, List<LevelSortedRun>> partitionLevels = constructLevels();
        if (LOG.isDebugEnabled()) {
            partitionLevels.forEach(
                    (partition, levelSortedRuns) -> {
                        String runsInfo =
                                levelSortedRuns.stream()
                                        .map(
                                                lsr ->
                                                        String.format(
                                                                "level-%s:%s",
                                                                lsr.level(),
                                                                lsr.run().files().size()))
                                        .collect(Collectors.joining(","));
                        LOG.debug(
                                "Partition {} has {} runs: [{}]",
                                partition,
                                levelSortedRuns.size(),
                                runsInfo);
                    });
        }

        // 2. Let the strategy pick a unit for each partition.
        Map<BinaryRow, Optional<CompactUnit>> units =
                partitionLevels.entrySet().stream()
                        .collect(
                                Collectors.toMap(
                                        Map.Entry::getKey,
                                        entry ->
                                                incrementalClusterStrategy.pick(
                                                        maxLevel,
                                                        entry.getValue(),
                                                        fullCompaction)));

        // 3. Drop partitions for which nothing was picked.
        Map<BinaryRow, CompactUnit> filteredUnits =
                units.entrySet().stream()
                        .filter(entry -> entry.getValue().isPresent())
                        .collect(
                                Collectors.toMap(
                                        Map.Entry::getKey, entry -> entry.getValue().get()));
        if (LOG.isDebugEnabled()) {
            filteredUnits.forEach(
                    (partition, compactUnit) -> {
                        String filesInfo =
                                compactUnit.files().stream()
                                        .map(
                                                file ->
                                                        String.format(
                                                                "%s,%s,%s",
                                                                file.fileName(),
                                                                file.level(),
                                                                file.fileSize()))
                                        .collect(Collectors.joining(", "));
                        LOG.debug(
                                "Partition {}, outputLevel:{}, clustered with {} files: [{}]",
                                partition,
                                compactUnit.outputLevel(),
                                compactUnit.files().size(),
                                filesInfo);
                    });
        }
        return filteredUnits;
    }

    /**
     * Reads the data splits of the snapshot and groups their files into sorted runs per
     * partition.
     *
     * <p>Side effect: {@code maxLevel} is raised to (highest file level + 1) when that exceeds
     * its current value, so that every existing file level is covered.
     *
     * @return a map from partition to its runs, as built by {@link #constructPartitionLevels}
     * @throws IllegalArgumentException presumably thrown by {@link Preconditions#checkArgument}
     *     when the resulting number of levels is below 2 — verify against Preconditions
     */
    public Map<BinaryRow, List<LevelSortedRun>> constructLevels() {
        List<DataSplit> dataSplits = snapshotReader.read().dataSplits();

        // With no files at all the stream maximum is absent; -1 + 1 = 0 leaves maxLevel as is.
        int levelsInUse =
                dataSplits.stream()
                                .flatMap(split -> split.dataFiles().stream())
                                .mapToInt(DataFileMeta::level)
                                .max()
                                .orElse(-1)
                        + 1;
        maxLevel = Math.max(maxLevel, levelsInUse);
        Preconditions.checkArgument(maxLevel > 1, "Number of levels must be at least 2.");

        // Several splits may belong to the same partition; merge their files.
        Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
        for (DataSplit dataSplit : dataSplits) {
            partitionFiles
                    .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>())
                    .addAll(dataSplit.dataFiles());
        }

        return partitionFiles.entrySet().stream()
                .collect(
                        Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> constructPartitionLevels(entry.getValue())));
    }

    /**
     * Groups the files of one partition into sorted runs.
     *
     * <p>Every level-0 file becomes its own run; all files of any other level form a single run
     * for that level.
     *
     * @param partitionFiles the data files of one partition
     * @return the runs, ordered by ascending level
     */
    public List<LevelSortedRun> constructPartitionLevels(List<DataFileMeta> partitionFiles) {
        List<LevelSortedRun> partitionLevels = new ArrayList<>();
        Map<Integer, List<DataFileMeta>> levelMap =
                partitionFiles.stream().collect(Collectors.groupingBy(DataFileMeta::level));

        for (Map.Entry<Integer, List<DataFileMeta>> entry : levelMap.entrySet()) {
            int level = entry.getKey();
            if (level == 0) {
                for (DataFileMeta level0File : entry.getValue()) {
                    partitionLevels.add(
                            new LevelSortedRun(level, SortedRun.fromSingle(level0File)));
                }
            } else {
                partitionLevels.add(
                        new LevelSortedRun(level, SortedRun.fromSorted(entry.getValue())));
            }
        }

        // List.sort is stable, so level-0 runs keep their relative order.
        partitionLevels.sort(Comparator.comparingInt(LevelSortedRun::level));
        return partitionLevels;
    }

    /**
     * Wraps the given files of one partition into batch data splits.
     *
     * <p>The files are divided by the snapshot reader's {@link SplitGenerator}; one split is
     * built per resulting group, always with bucket 0 and a total bucket count of 1.
     *
     * @param partition the partition the files belong to
     * @param files the files to split
     * @return one {@link DataSplit} per split group
     */
    public List<DataSplit> toSplits(BinaryRow partition, List<DataFileMeta> files) {
        List<DataSplit> splits = new ArrayList<>();

        // NOTE(review): the builder is reused across iterations; this relies on build()
        // returning an independent DataSplit each time — confirm against DataSplit.Builder.
        DataSplit.Builder builder =
                DataSplit.builder()
                        .withPartition(partition)
                        .withBucket(0)
                        .withTotalBuckets(1)
                        .isStreaming(false);

        SplitGenerator splitGenerator = snapshotReader.splitGenerator();
        List<SplitGenerator.SplitGroup> splitGroups = splitGenerator.splitForBatch(files);

        // The bucket path depends only on the partition, so compute it once.
        String bucketPath = snapshotReader.pathFactory().bucketPath(partition, 0).toString();

        for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
            builder.withDataFiles(splitGroup.files)
                    .rawConvertible(splitGroup.rawConvertible)
                    .withBucketPath(bucketPath);
            splits.add(builder.build());
        }
        return splits;
    }

    /**
     * Applies {@code upgrade(outputLevel)} to every given file.
     *
     * @param filesAfterCluster the files to upgrade
     * @param outputLevel the level passed to each file's {@code upgrade} call
     * @return a new list holding the results, in the same order as the input
     */
    public List<DataFileMeta> upgrade(List<DataFileMeta> filesAfterCluster, int outputLevel) {
        return filesAfterCluster.stream()
                .map(file -> file.upgrade(outputLevel))
                .collect(Collectors.toList());
    }

    /** Returns the ordering curve derived from the table options at construction time. */
    public CoreOptions.OrderType clusterCurve() {
        return clusterCurve;
    }

    /** Returns the clustering columns read from the table options at construction time. */
    public List<String> clusterKeys() {
        return clusterKeys;
    }
}

