/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.spark.procedure;

import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.AppendCompactCoordinator;
import org.apache.paimon.append.AppendCompactTask;
import org.apache.paimon.append.cluster.IncrementalClusterManager;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.spark.SparkUtils;
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionUtils;
import org.apache.paimon.spark.commands.PaimonSparkWriter;
import org.apache.paimon.spark.procedure.BaseProcedure;
import org.apache.paimon.spark.procedure.ProcedureBuilder;
import org.apache.paimon.spark.procedure.ProcedureParameter;
import org.apache.paimon.spark.sort.TableSorter;
import org.apache.paimon.spark.util.ScanPlanHelper$;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.sink.AppendCompactTaskSerializer;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.ProcedureUtils;
import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.TimeUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.PaimonUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class CompactProcedure
extends BaseProcedure {
    private static final Logger LOG = LoggerFactory.getLogger(CompactProcedure.class);
    // Positional arguments of the procedure; call() reads them by index in exactly this order.
    // Only "table" is required, every other argument is optional.
    private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{ProcedureParameter.required("table", DataTypes.StringType), ProcedureParameter.optional("partitions", DataTypes.StringType), ProcedureParameter.optional("compact_strategy", DataTypes.StringType), ProcedureParameter.optional("order_strategy", DataTypes.StringType), ProcedureParameter.optional("order_by", DataTypes.StringType), ProcedureParameter.optional("where", DataTypes.StringType), ProcedureParameter.optional("options", DataTypes.StringType), ProcedureParameter.optional("partition_idle_time", DataTypes.StringType)};
    // Output schema: a single nullable boolean column named "result".
    private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{new StructField("result", DataTypes.BooleanType, true, Metadata.empty())});
    // The two accepted values of "compact_strategy"; they are compared case-insensitively.
    private static final String MINOR = "minor";
    private static final String FULL = "full";

    /**
     * Creates the procedure on top of the given catalog.
     *
     * @param tableCatalog catalog handed to the {@link BaseProcedure} constructor
     */
    protected CompactProcedure(TableCatalog tableCatalog) {
        super(tableCatalog);
    }

    /**
     * Returns the declared arguments of this procedure.
     *
     * @return the shared {@code PARAMETERS} array (not a copy)
     */
    @Override
    public ProcedureParameter[] parameters() {
        return PARAMETERS;
    }

    /**
     * Returns the schema of the rows produced by {@code call}.
     *
     * @return a struct with one boolean column named "result"
     */
    @Override
    public StructType outputType() {
        return OUTPUT_TYPE;
    }

    /**
     * Parses and validates the procedure arguments, then runs the compaction on the target table.
     *
     * <p>Arguments are read by position, in the order declared in {@code PARAMETERS}: table,
     * partitions, compact_strategy, order_strategy, order_by, where, options and
     * partition_idle_time. Blank optional arguments are treated as absent, except {@code options},
     * which is only checked for null. {@code partitions} is rewritten into a where clause through
     * {@code toWhere}, so it cannot be combined with {@code where}.
     *
     * @param args positional argument row of the procedure call
     * @return a one-element array whose row wraps the boolean returned by {@code execute}
     * @throws IllegalArgumentException if order_by is given without a sort strategy, if
     *     partition_idle_time is combined with a sort strategy, if compact_strategy is neither
     *     'full' nor 'minor', or if a sort strategy is requested on a table that has incremental
     *     clustering enabled
     */
    @Override
    public InternalRow[] call(InternalRow args) {
        Identifier tableIdent = this.toIdentifier(args.getString(0), PARAMETERS[0].name());
        String partitions = this.blank(args, 1) ? null : args.getString(1);
        String compactStrategy = this.blank(args, 2) ? null : args.getString(2);
        String sortType = this.blank(args, 3) ? CoreOptions.OrderType.NONE.name() : args.getString(3);
        List<String> sortColumns;
        if (this.blank(args, 4)) {
            sortColumns = Collections.emptyList();
        } else {
            // Trim every name so that "a, b" is accepted and is still caught by the
            // partition-column check below.
            sortColumns = Arrays.stream(args.getString(4).split(",")).map(String::trim).collect(Collectors.toList());
        }
        String where = this.blank(args, 5) ? null : args.getString(5);
        String options = args.isNullAt(6) ? null : args.getString(6);
        Duration partitionIdleTime = this.blank(args, 7) ? null : TimeUtils.parseDuration(args.getString(7));

        // Compare case-insensitively: users write "none" (as the error message below does), and the
        // value is later resolved through CoreOptions.OrderType.of(...) rather than by enum name.
        boolean sortRequested = !CoreOptions.OrderType.NONE.name().equalsIgnoreCase(sortType);
        if (!sortRequested && !sortColumns.isEmpty()) {
            throw new IllegalArgumentException("order_strategy \"none\" cannot work with order_by columns.");
        }
        if (partitionIdleTime != null && sortRequested) {
            throw new IllegalArgumentException("sort compact do not support 'partition_idle_time'.");
        }
        if (compactStrategy != null && !compactStrategy.equalsIgnoreCase(FULL) && !compactStrategy.equalsIgnoreCase(MINOR)) {
            throw new IllegalArgumentException(String.format("The compact strategy only supports 'full' or 'minor', but '%s' is configured.", compactStrategy));
        }
        Preconditions.checkArgument(partitions == null || where == null, "partitions and where cannot be used together.");
        String finalWhere = partitions != null ? CompactProcedure.toWhere(partitions) : where;

        return this.modifyPaimonTable(tableIdent, table -> {
            Preconditions.checkArgument(table instanceof FileStoreTable);
            Preconditions.checkArgument(sortColumns.stream().noneMatch(table.partitionKeys()::contains), "order_by should not contain partition cols, because it is meaningless, your order_by cols are %s, and partition cols are %s", sortColumns, table.partitionKeys());
            DataSourceV2Relation relation = this.createRelation(tableIdent);
            Expression condition = null;
            if (!StringUtils.isNullOrWhitespaceOnly(finalWhere)) {
                condition = ExpressionUtils.resolveFilter(this.spark(), relation, finalWhere);
                Preconditions.checkArgument(ExpressionUtils.isValidPredicate(this.spark(), condition, table.partitionKeys().toArray(new String[0])), "Only partition predicate is supported, your predicate is %s, but partition keys are %s", condition, table.partitionKeys());
            }
            // write-only is forced to "false" first; user supplied options are applied afterwards.
            HashMap<String, String> dynamicOptions = new HashMap<String, String>();
            ProcedureUtils.putIfNotEmpty(dynamicOptions, CoreOptions.WRITE_ONLY.key(), "false");
            ProcedureUtils.putAllOptions(dynamicOptions, options);
            FileStoreTable fileStoreTable = (FileStoreTable) table.copy(dynamicOptions);
            if (fileStoreTable.coreOptions().clusteringIncrementalEnabled() && sortRequested) {
                throw new IllegalArgumentException("The table has enabled incremental clustering, do not support sort compact.");
            }
            InternalRow resultRow = this.newInternalRow(this.execute(fileStoreTable, compactStrategy, sortType, sortColumns, relation, condition, partitionIdleTime));
            return new InternalRow[]{resultRow};
        });
    }

    /**
     * Returns the human readable description of this procedure.
     *
     * @return a fixed description string
     */
    @Override
    public String description() {
        return "This procedure execute compact action on paimon table.";
    }

    /**
     * Tells whether the argument at {@code index} carries no usable value.
     *
     * @param args argument row of the procedure call
     * @param index position of the argument to inspect
     * @return {@code true} if the slot is null or its string is null/whitespace only
     */
    private boolean blank(InternalRow args, int index) {
        if (args.isNullAt(index)) {
            return true;
        }
        String value = args.getString(index);
        return StringUtils.isNullOrWhitespaceOnly(value);
    }

    /**
     * Dispatches to the compaction implementation matching the order strategy and bucket mode.
     *
     * <p>When {@code compactStrategy} is null it defaults to minor if incremental clustering is
     * enabled on the table and to full otherwise. Without a sort strategy, HASH_FIXED and
     * HASH_DYNAMIC tables use the bucket-aware compaction, while BUCKET_UNAWARE tables use either
     * incremental clustering or the append compaction. With a sort strategy only BUCKET_UNAWARE
     * tables are supported.
     *
     * @param table table to compact
     * @param compactStrategy 'full' or 'minor' (case-insensitive), or null for the default
     * @param sortType order strategy name, resolved through {@code CoreOptions.OrderType.of}
     * @param sortColumns columns used by the sort compaction
     * @param relation relation used to convert the condition and to build scan plans
     * @param condition resolved partition filter expression, or null for no filter
     * @param partitionIdleTime only compact partitions idle for at least this long, or null
     * @return always {@code true} when the method returns normally
     * @throws UnsupportedOperationException if the bucket mode is not supported for the requested
     *     kind of compaction
     */
    private boolean execute(FileStoreTable table, String compactStrategy, String sortType, List<String> sortColumns, DataSourceV2Relation relation, @Nullable Expression condition, @Nullable Duration partitionIdleTime) {
        BucketMode bucketMode = table.bucketMode();
        CoreOptions.OrderType orderType = CoreOptions.OrderType.of(sortType);
        boolean clusterIncrementalEnabled = table.coreOptions().clusteringIncrementalEnabled();
        if (compactStrategy == null) {
            compactStrategy = clusterIncrementalEnabled ? MINOR : FULL;
        }
        boolean fullCompact = compactStrategy.equalsIgnoreCase(FULL);
        RowType partitionType = table.schema().logicalPartitionType();

        Predicate partitionFilter = null;
        if (condition != null) {
            // Option.getOrElse takes a by-name argument (a Function0 on the Java side). Passing a
            // literal null makes an empty Option fail with a NullPointerException, so supply a
            // function that yields null instead.
            partitionFilter = (Predicate) ExpressionUtils.convertConditionToPaimonPredicate(condition, (Seq<Attribute>) relation.output(), partitionType, false).getOrElse(() -> null);
            if (partitionFilter == null) {
                LOG.warn("Condition {} could not be converted to a partition predicate, no partition filter is applied.", condition);
            }
        }
        PartitionPredicate partitionPredicate = PartitionPredicate.fromPredicate(partitionType, partitionFilter);

        if (orderType.equals(CoreOptions.OrderType.NONE)) {
            JavaSparkContext javaSparkContext = new JavaSparkContext(this.spark().sparkContext());
            switch (bucketMode) {
                case HASH_FIXED:
                case HASH_DYNAMIC: {
                    this.compactAwareBucketTable(table, fullCompact, partitionPredicate, partitionIdleTime, javaSparkContext);
                    break;
                }
                case BUCKET_UNAWARE: {
                    if (clusterIncrementalEnabled) {
                        this.clusterIncrementalUnAwareBucketTable(table, fullCompact, relation);
                    } else {
                        this.compactUnAwareBucketTable(table, partitionPredicate, partitionIdleTime, javaSparkContext);
                    }
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Spark compact with " + bucketMode + " is not support yet.");
                }
            }
        } else {
            if (bucketMode != BucketMode.BUCKET_UNAWARE) {
                throw new UnsupportedOperationException("Spark compact with sort_type " + sortType + " only support unaware-bucket append-only table yet.");
            }
            this.sortCompactUnAwareBucketTable(table, orderType, sortColumns, relation, partitionFilter);
        }
        return true;
    }

    /**
     * Compacts a bucketed table by distributing its (partition, bucket) pairs over Spark tasks.
     *
     * <p>The driver lists the bucket entries, keeps those whose partition is returned by
     * {@code getHistoryPartition}, and ships them with the partition serialized to bytes. Each
     * task compacts its buckets and returns serialized commit messages, which the driver collects
     * and commits in one batch commit.
     *
     * @param table table to compact
     * @param fullCompact flag forwarded to {@code BatchTableWrite.compact}
     * @param partitionPredicate optional partition filter applied to the snapshot reader
     * @param partitionIdleTime optional idle time used to select partitions
     * @param javaSparkContext context used to parallelize the work
     * @throws RuntimeException wrapping any failure raised while collecting or committing
     */
    private void compactAwareBucketTable(FileStoreTable table, boolean fullCompact, @Nullable PartitionPredicate partitionPredicate, @Nullable Duration partitionIdleTime, JavaSparkContext javaSparkContext) {
        SnapshotReader snapshotReader = table.newSnapshotReader();
        if (partitionPredicate != null) {
            snapshotReader.withPartitionFilter(partitionPredicate);
        }
        Set<BinaryRow> partitionToBeCompacted = this.getHistoryPartition(snapshotReader, partitionIdleTime);
        List<Pair<byte[], Integer>> partitionBuckets = snapshotReader.bucketEntries().stream()
                .map(entry -> Pair.of(entry.partition(), entry.bucket()))
                .distinct()
                .filter(pair -> partitionToBeCompacted.contains(pair.getKey()))
                .map(pair -> Pair.of(SerializationUtils.serializeBinaryRow(pair.getLeft()), pair.getRight()))
                .collect(Collectors.toList());
        if (partitionBuckets.isEmpty()) {
            LOG.info("Partition bucket is empty, no compact job to execute.");
            return;
        }
        int readParallelism = this.readParallelism(partitionBuckets, this.spark());
        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
        JavaRDD<byte[]> commitMessageJavaRDD = javaSparkContext.parallelize(partitionBuckets, readParallelism).mapPartitions((FlatMapFunction<Iterator<Pair<byte[], Integer>>, byte[]>) pairIterator -> {
            IOManager ioManager = SparkUtils.createIOManager();
            // Nested try/finally so the IOManager is released even when creating or closing the
            // writer throws.
            try {
                BatchTableWrite write = writeBuilder.newWrite();
                try {
                    write.withIOManager(ioManager);
                    while (pairIterator.hasNext()) {
                        Pair<byte[], Integer> pair = pairIterator.next();
                        write.compact(SerializationUtils.deserializeBinaryRow(pair.getLeft()), pair.getRight(), fullCompact);
                    }
                    CommitMessageSerializer serializer = new CommitMessageSerializer();
                    List<CommitMessage> messages = write.prepareCommit();
                    // Serialized eagerly, so the writer can be closed before Spark consumes the iterator.
                    List<byte[]> serializedMessages = new ArrayList<byte[]>(messages.size());
                    for (CommitMessage commitMessage : messages) {
                        serializedMessages.add(serializer.serialize(commitMessage));
                    }
                    return serializedMessages.iterator();
                } finally {
                    write.close();
                }
            } finally {
                ioManager.close();
            }
        });
        try (BatchTableCommit commit = writeBuilder.newCommit()) {
            CommitMessageSerializer serializer = new CommitMessageSerializer();
            List<byte[]> serializedMessages = commitMessageJavaRDD.collect();
            List<CommitMessage> messages = new ArrayList<CommitMessage>(serializedMessages.size());
            for (byte[] serializedMessage : serializedMessages) {
                messages.add(serializer.deserialize(serializer.getVersion(), serializedMessage));
            }
            commit.commit(messages);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Compacts a bucket-unaware table by planning append compaction tasks on the driver and
     * running them in Spark tasks.
     *
     * <p>Tasks come from {@code AppendCompactCoordinator.run()}; an {@code EndOfScanException}
     * is treated as an empty plan. When {@code partitionIdleTime} is set, only tasks whose
     * partition has a last file creation time at or before "now minus idle time" are kept. Tasks
     * and commit messages travel between driver and executors as serialized bytes, and the driver
     * commits all messages with a single commit user.
     *
     * @param table table to compact
     * @param partitionPredicate optional partition filter handed to the coordinator
     * @param partitionIdleTime optional idle time used to filter the planned tasks
     * @param javaSparkContext context used to parallelize the work
     * @throws RuntimeException if task serialization fails, or wrapping any failure raised while
     *     collecting or committing
     */
    private void compactUnAwareBucketTable(FileStoreTable table, @Nullable PartitionPredicate partitionPredicate, @Nullable Duration partitionIdleTime, JavaSparkContext javaSparkContext) {
        List<AppendCompactTask> compactionTasks;
        try {
            compactionTasks = new AppendCompactCoordinator(table, false, partitionPredicate).run();
        } catch (EndOfScanException ignored) {
            // Treated as "nothing to compact"; handled by the empty check below.
            compactionTasks = new ArrayList<AppendCompactTask>();
        }
        if (partitionIdleTime != null) {
            Map<BinaryRow, Long> partitionInfo = table.newSnapshotReader().partitionEntries().stream().collect(Collectors.toMap(PartitionEntry::partition, PartitionEntry::lastFileCreationTime));
            long historyMilli = LocalDateTime.now().minus(partitionIdleTime).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
            compactionTasks = compactionTasks.stream().filter(task -> {
                // The partition listing is read separately from the task plan; a partition that is
                // missing from it cannot be proven idle, so skip it rather than fail on unboxing null.
                Long lastFileCreationTime = partitionInfo.get(task.partition());
                return lastFileCreationTime != null && lastFileCreationTime <= historyMilli;
            }).collect(Collectors.toList());
        }
        if (compactionTasks.isEmpty()) {
            LOG.info("Task plan is empty, no compact job to execute.");
            return;
        }
        AppendCompactTaskSerializer serializer = new AppendCompactTaskSerializer();
        List<byte[]> serializedTasks = new ArrayList<byte[]>(compactionTasks.size());
        try {
            for (AppendCompactTask compactionTask : compactionTasks) {
                serializedTasks.add(serializer.serialize(compactionTask));
            }
        } catch (IOException e) {
            // Keep the cause so the real serialization failure is visible in the stack trace.
            throw new RuntimeException("serialize compaction task failed", e);
        }
        int readParallelism = this.readParallelism(serializedTasks, this.spark());
        String commitUser = CoreOptions.createCommitUser(table.coreOptions().toConfiguration());
        JavaRDD<byte[]> commitMessageJavaRDD = javaSparkContext.parallelize(serializedTasks, readParallelism).mapPartitions((FlatMapFunction<Iterator<byte[]>, byte[]>) taskIterator -> {
            BaseAppendFileStoreWrite write = (BaseAppendFileStoreWrite) table.store().newWrite(commitUser);
            // Everything after the writer is created runs inside try/finally so it is always closed.
            try {
                CoreOptions coreOptions = table.coreOptions();
                if (coreOptions.rowTrackingEnabled()) {
                    write.withWriteType(SpecialFields.rowTypeWithRowTracking(table.rowType()));
                }
                AppendCompactTaskSerializer taskSerializer = new AppendCompactTaskSerializer();
                CommitMessageSerializer messageSerializer = new CommitMessageSerializer();
                List<byte[]> messages = new ArrayList<byte[]>();
                while (taskIterator.hasNext()) {
                    AppendCompactTask task = taskSerializer.deserialize(taskSerializer.getVersion(), taskIterator.next());
                    messages.add(messageSerializer.serialize(task.doCompact(table, write)));
                }
                return messages.iterator();
            } finally {
                write.close();
            }
        });
        try (TableCommitImpl commit = table.newCommit(commitUser)) {
            CommitMessageSerializer messageSerializer = new CommitMessageSerializer();
            List<byte[]> serializedMessages = commitMessageJavaRDD.collect();
            List<CommitMessage> messages = new ArrayList<CommitMessage>(serializedMessages.size());
            for (byte[] serializedMessage : serializedMessages) {
                messages.add(messageSerializer.deserialize(messageSerializer.getVersion(), serializedMessage));
            }
            commit.commit(messages);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Collects the partitions visible to the reader, optionally restricted to idle ones.
     *
     * @param snapshotReader reader whose partition entries are listed
     * @param partitionIdleTime when non-null, only partitions whose last file creation time is at
     *     or before "now minus this duration" (system default zone) are kept
     * @return the set of selected partitions
     */
    private Set<BinaryRow> getHistoryPartition(SnapshotReader snapshotReader, @Nullable Duration partitionIdleTime) {
        List<PartitionEntry> entries = snapshotReader.partitionEntries();
        if (partitionIdleTime == null) {
            // No idle-time restriction: every listed partition qualifies.
            return entries.stream().map(PartitionEntry::partition).collect(Collectors.toSet());
        }
        long historyMilli = LocalDateTime.now()
                .minus(partitionIdleTime)
                .atZone(ZoneId.systemDefault())
                .toInstant()
                .toEpochMilli();
        return entries.stream()
                .filter(entry -> entry.lastFileCreationTime() <= historyMilli)
                .map(PartitionEntry::partition)
                .collect(Collectors.toSet());
    }

    /**
     * Rewrites a bucket-unaware table with its rows sorted per partition.
     *
     * <p>The data splits of the (optionally filtered) snapshot are grouped by partition, each
     * group is read and sorted on its own, and the sorted datasets are unioned and written back
     * with overwrite enabled on the write builder. Nothing is written when there are no splits.
     *
     * @param table table to rewrite
     * @param orderType sort strategy handed to {@code TableSorter.getSorter}
     * @param sortColumns columns handed to {@code TableSorter.getSorter}
     * @param relation relation used to build the per-partition scan plans
     * @param partitionFilter optional partition filter applied to the snapshot reader
     */
    private void sortCompactUnAwareBucketTable(FileStoreTable table, CoreOptions.OrderType orderType, List<String> sortColumns, DataSourceV2Relation relation, @Nullable Predicate partitionFilter) {
        SnapshotReader reader = table.newSnapshotReader();
        if (partitionFilter != null) {
            reader.withPartitionFilter(partitionFilter);
        }
        Map<BinaryRow, DataSplit[]> splitsByPartition = this.packForSort(reader.read().dataSplits());
        TableSorter sorter = TableSorter.getSorter(table, orderType, sortColumns);

        // Left-fold the sorted per-partition datasets into a single union.
        Dataset<Row> combined = null;
        for (DataSplit[] partitionSplits : splitsByPartition.values()) {
            Dataset<Row> unsorted = PaimonUtils.createDataset(this.spark(), ScanPlanHelper$.MODULE$.createNewScanPlan(partitionSplits, relation));
            Dataset<Row> sorted = sorter.sort(unsorted);
            combined = combined == null ? sorted : combined.union(sorted);
        }
        if (combined == null) {
            return;
        }
        PaimonSparkWriter writer = PaimonSparkWriter.apply(table);
        writer.writeBuilder().withOverwrite();
        writer.commit(writer.write(combined));
    }

    /**
     * Runs incremental clustering on a bucket-unaware table.
     *
     * <p>The {@code IncrementalClusterManager} picks one compact unit per partition. The files of
     * each unit are read, sorted with the manager's cluster curve and keys, and written with a
     * write-only writer. The newly written files are then grouped by partition, upgraded to the
     * unit's output level and committed as compact increments that replace the unit's original
     * files. Nothing is written or committed when no unit was selected.
     *
     * @param table table to cluster
     * @param fullCompaction flag forwarded to {@code prepareForCluster}
     * @param relation relation used to build the per-partition scan plans
     */
    private void clusterIncrementalUnAwareBucketTable(FileStoreTable table, boolean fullCompaction, DataSourceV2Relation relation) {
        IncrementalClusterManager clusterManager = new IncrementalClusterManager(table);
        Map<BinaryRow, CompactUnit> compactUnits = clusterManager.prepareForCluster(fullCompaction);

        Map<BinaryRow, DataSplit[]> partitionSplits = new HashMap<BinaryRow, DataSplit[]>();
        for (Map.Entry<BinaryRow, CompactUnit> unitEntry : compactUnits.entrySet()) {
            DataSplit[] splits = clusterManager.toSplits(unitEntry.getKey(), unitEntry.getValue().files()).toArray(new DataSplit[0]);
            partitionSplits.put(unitEntry.getKey(), splits);
        }

        TableSorter sorter = TableSorter.getSorter(table, clusterManager.clusterCurve(), clusterManager.clusterKeys());
        LOG.info("Start to sort in partition, cluster curve is {}, cluster keys is {}", (Object) clusterManager.clusterCurve(), clusterManager.clusterKeys());

        // Left-fold the sorted per-partition datasets into a single union.
        Dataset<Row> datasetForWrite = null;
        for (DataSplit[] splits : partitionSplits.values()) {
            Dataset<Row> sorted = sorter.sort(PaimonUtils.createDataset(this.spark(), ScanPlanHelper$.MODULE$.createNewScanPlan(splits, relation)));
            datasetForWrite = datasetForWrite == null ? sorted : datasetForWrite.union(sorted);
        }
        if (datasetForWrite == null) {
            return;
        }

        PaimonSparkWriter writer = PaimonSparkWriter.apply(table).writeOnly();
        Seq<CommitMessage> commitMessages = writer.write(datasetForWrite);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Commit messages after writing:{}", commitMessages);
        }

        // Gather the freshly written files of every partition.
        Map<BinaryRow, List<DataFileMeta>> partitionClustered = new HashMap<BinaryRow, List<DataFileMeta>>();
        for (CommitMessage written : JavaConverters.seqAsJavaList(commitMessages)) {
            Preconditions.checkArgument(written.bucket() == 0);
            List<DataFileMeta> newFiles = ((CommitMessageImpl) written).newFilesIncrement().newFiles();
            partitionClustered.computeIfAbsent(written.partition(), key -> new ArrayList<DataFileMeta>()).addAll(newFiles);
        }

        // Turn "new files" into "compaction result": old unit files out, upgraded files in.
        List<CommitMessage> clusterMessages = new ArrayList<CommitMessage>();
        for (Map.Entry<BinaryRow, List<DataFileMeta>> clustered : partitionClustered.entrySet()) {
            BinaryRow partition = clustered.getKey();
            CompactUnit unit = compactUnits.get(partition);
            List<DataFileMeta> clusterBefore = unit.files();
            List<DataFileMeta> clusterAfter = clusterManager.upgrade(clustered.getValue(), unit.outputLevel());
            LOG.info("Partition {}: upgrade file level to {}", (Object) partition, (Object) unit.outputLevel());
            CompactIncrement compactIncrement = new CompactIncrement(clusterBefore, clusterAfter, Collections.emptyList());
            clusterMessages.add(new CommitMessageImpl(partition, 0, table.coreOptions().bucket(), DataIncrement.emptyIncrement(), compactIncrement));
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Commit messages after reorganizing:{}", clusterMessages);
        }
        writer.commit((Seq<CommitMessage>) JavaConverters.asScalaBuffer(clusterMessages).toSeq());
    }

    /**
     * Groups data splits by their partition.
     *
     * @param dataSplits splits to group
     * @return a map from partition to the array of splits belonging to it
     */
    private Map<BinaryRow, DataSplit[]> packForSort(List<DataSplit> dataSplits) {
        return dataSplits.stream()
                .collect(
                        Collectors.groupingBy(
                                DataSplit::partition,
                                Collectors.collectingAndThen(Collectors.toList(), CompactProcedure::toSplitArray)));
    }

    /** Copies the given splits into a new array. */
    private static DataSplit[] toSplitArray(List<DataSplit> splits) {
        return splits.toArray(new DataSplit[0]);
    }

    /**
     * Chooses the number of slices used to parallelize the given work items.
     *
     * <p>The result is the smaller of the number of items and the Spark parallelism, where the
     * Spark parallelism is the larger of the context's default parallelism and the configured
     * number of shuffle partitions. A warning is logged when the item count is the limiting side.
     *
     * @param groupedTasks work items to distribute
     * @param spark session providing the parallelism settings
     * @return the parallelism to use; 0 when {@code groupedTasks} is empty
     */
    private int readParallelism(List<?> groupedTasks, SparkSession spark) {
        int defaultParallelism = spark.sparkContext().defaultParallelism();
        int shufflePartitions = spark.sessionState().conf().numShufflePartitions();
        int sparkParallelism = Math.max(defaultParallelism, shufflePartitions);
        int readParallelism = Math.min(groupedTasks.size(), sparkParallelism);
        if (readParallelism < sparkParallelism) {
            LOG.warn(String.format("Spark default parallelism (%s) is greater than bucket or task parallelism (%s),we use %s as the final read parallelism", sparkParallelism, readParallelism, readParallelism));
        }
        return readParallelism;
    }

    /**
     * Converts a {@code partitions} argument into a where clause.
     *
     * <p>The argument is split on {@code ';'} and parsed by {@code ParameterUtils.getPartitions}
     * into one key/value map per partition spec. Each non-empty map becomes a parenthesised
     * {@code key=value AND key=value} group, and the groups are joined with {@code OR}. Values are
     * emitted as given, without any quoting.
     *
     * @param partitions raw value of the partitions argument
     * @return the where clause, or {@code null} if no group was produced
     */
    @VisibleForTesting
    static String toWhere(String partitions) {
        List<String> groups = new ArrayList<String>();
        for (Map<String, String> spec : ParameterUtils.getPartitions(partitions.split(";"))) {
            if (spec.isEmpty()) {
                // An empty spec contributes no group.
                continue;
            }
            String conjunction = spec.entrySet().stream()
                    .map(entry -> entry.getKey() + "=" + entry.getValue())
                    .collect(Collectors.joining(" AND "));
            groups.add("(" + conjunction + ")");
        }
        return groups.isEmpty() ? null : String.join(" OR ", groups);
    }

    /**
     * Creates a builder for this procedure.
     *
     * @return a builder whose {@code doBuild} creates a {@code CompactProcedure} from the
     *     builder's table catalog
     */
    public static ProcedureBuilder builder() {
        return new BaseProcedure.Builder<CompactProcedure>(){

            @Override
            public CompactProcedure doBuild() {
                return new CompactProcedure(this.tableCatalog());
            }
        };
    }
}

