package org.apache.paimon.flink.source;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.KeyGroupPruner;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsKeyGroupPrune;
import org.apache.flink.table.connector.source.abilities.SupportsLookupCustomShuffle;
import org.apache.flink.table.connector.source.abilities.SupportsScanRange;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.paimon.flink.VvrConnectorOptions;
import org.apache.paimon.flink.incremental.IncrementalProcessingUtils;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
import org.apache.paimon.flink.lookup.KeyGroupPruneFileStoreLookupFunction;
import org.apache.paimon.flink.lookup.VvrFileStoreLookupFunction;
import org.apache.paimon.flink.lookup.partitioner.BucketIdExtractor;
import org.apache.paimon.flink.lookup.partitioner.BucketShufflePartitioner;
import org.apache.paimon.flink.lookup.partitioner.BucketShuffleStrategy;
import org.apache.paimon.flink.lookup.partitioner.ReplicatedBucketShuffleStrategy;
import org.apache.paimon.flink.lookup.partitioner.ShuffleStrategy;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/source/VvrDataTableSource.class */
/**
 * {@link DataTableSource} variant that additionally implements
 * {@link SupportsLookupCustomShuffle}, {@link SupportsScanRange} and
 * {@link SupportsKeyGroupPrune}.
 *
 * <p>For lookups, {@link #getFileStoreLookupFunction} picks one of three lookup functions:
 * a bucket-shuffle aware one, a key-group pruning one, or the plain
 * {@link FileStoreLookupFunction}.
 *
 * <p>The instance is mutable: {@link #applyKeyGroupPruner} and
 * {@link #getFileStoreLookupFunction} both update internal state.
 */
public class VvrDataTableSource extends DataTableSource implements SupportsLookupCustomShuffle, SupportsScanRange, SupportsKeyGroupPrune<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(VvrDataTableSource.class);

    // The constructor arguments below are retained on this class so that mo1354copy() and
    // applyScanRange() can rebuild an equivalent source.
    // NOTE(review): whether the superclass keeps its own copies of these values is not
    // visible from this file.
    private final ObjectIdentifier tableIdentifier;
    private final boolean streaming;
    private final DynamicTableFactory.Context context;

    @Nullable
    private final LogStoreTableFactory logStoreTableFactory;

    @Nullable
    private final WatermarkStrategy<RowData> watermarkStrategy;

    @Nullable
    private final List<String> dynamicPartitionFilteringFields;

    /**
     * Partitioner built by {@link #getFileStoreLookupFunction} when bucket shuffle is used;
     * {@code null} until then. It is not passed on by {@link #mo1354copy()} or
     * {@link #applyScanRange(long, long)}.
     */
    @Nullable
    private BucketShufflePartitioner bucketShufflePartitioner;

    /** Pruner supplied through the constructor or {@link #applyKeyGroupPruner}; may be absent. */
    @Nullable
    private KeyGroupPruner<RowData> keyGroupPruner;

    /**
     * Creates a source with no predicate, projection, limit, watermark strategy, dynamic
     * partition filtering fields or key group pruner.
     *
     * @param objectIdentifier identifier of the table
     * @param table the table to read
     * @param z stored as the {@code streaming} flag
     * @param context factory context; its configuration is read when a replicated shuffle
     *     strategy is created
     * @param logStoreTableFactory optional log store factory
     */
    public VvrDataTableSource(ObjectIdentifier objectIdentifier, Table table, boolean z, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory) {
        this(objectIdentifier, table, z, context, logStoreTableFactory, null, (int[][]) null, null, null, null, null);
    }

    /**
     * Creates a fully specified source.
     *
     * @param objectIdentifier identifier of the table
     * @param table the table to read
     * @param z stored as the {@code streaming} flag
     * @param context factory context
     * @param logStoreTableFactory optional log store factory
     * @param predicate optional predicate, assigned to the inherited {@code predicate} field
     * @param iArr optional projection, assigned to the inherited {@code projectFields} field
     * @param l optional limit, assigned to the inherited {@code limit} field
     * @param watermarkStrategy optional watermark strategy
     * @param list optional dynamic partition filtering field names
     * @param keyGroupPruner optional key group pruner used for lookups
     */
    public VvrDataTableSource(ObjectIdentifier objectIdentifier, Table table, boolean z, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory, @Nullable Predicate predicate, @Nullable int[][] iArr, @Nullable Long l, @Nullable WatermarkStrategy<RowData> watermarkStrategy, @Nullable List<String> list, @Nullable KeyGroupPruner<RowData> keyGroupPruner) {
        super(objectIdentifier, table, z, context, logStoreTableFactory);
        this.tableIdentifier = objectIdentifier;
        this.streaming = z;
        this.context = context;
        this.logStoreTableFactory = logStoreTableFactory;
        this.predicate = predicate;
        this.projectFields = iArr;
        this.limit = l;
        this.watermarkStrategy = watermarkStrategy;
        this.dynamicPartitionFilteringFields = list;
        this.keyGroupPruner = keyGroupPruner;
    }

    /**
     * Returns the bucket shuffle partitioner, if one has been created.
     *
     * @return the partitioner built by the most recent bucket-shuffle
     *     {@link #getFileStoreLookupFunction} call on this instance, or empty if none was built
     */
    public Optional<SupportsLookupCustomShuffle.InputDataPartitioner> getPartitioner() {
        return Optional.ofNullable(this.bucketShufflePartitioner);
    }

    /**
     * Chooses the lookup function for a lookup join.
     *
     * <p>Selection order:
     * <ol>
     *   <li>bucket shuffle, when the table supports it for the given keys and the planner
     *       prefers a custom shuffle; this also stores the partitioner returned by
     *       {@link #getPartitioner()};</li>
     *   <li>key group pruning, when a pruner has been applied;</li>
     *   <li>the plain {@link FileStoreLookupFunction} otherwise.</li>
     * </ol>
     *
     * @param lookupContext planner context, queried for {@code preferCustomShuffle()}
     * @param table table to look up; cast to {@link FileStoreTable}, so any other
     *     implementation fails with {@link ClassCastException}
     * @param iArr indexes into the table's field names
     * @param iArr2 indexes into {@code iArr} selecting the lookup key fields
     * @return the lookup function to use
     */
    @Override // org.apache.paimon.flink.source.BaseDataTableSource
    protected FileStoreLookupFunction getFileStoreLookupFunction(LookupTableSource.LookupContext lookupContext, Table table, int[] iArr, int[] iArr2) {
        // Resolve each key index through iArr to the field name it refers to.
        List<String> joinKeyNames = Arrays.stream(iArr2)
                .mapToObj(i -> table.rowType().getFieldNames().get(iArr[i]))
                .collect(Collectors.toList());
        FileStoreTable fileStoreTable = (FileStoreTable) table;
        List<String> bucketKeys = fileStoreTable.schema().bucketKeys();

        if (supportBucketShufflePartitioner(joinKeyNames, bucketKeys) && lookupContext.preferCustomShuffle()) {
            int bucket = fileStoreTable.store().options().bucket();
            BucketIdExtractor bucketIdExtractor = new BucketIdExtractor(bucket, fileStoreTable.schema(), joinKeyNames, bucketKeys);
            ShuffleStrategy shuffleStrategy = createShuffleStrategy(table, bucket);
            // Remembered so that getPartitioner() can hand it out afterwards.
            this.bucketShufflePartitioner = new BucketShufflePartitioner(shuffleStrategy, bucketIdExtractor);
            LOG.info("Paimon connector is using bucket shuffle partitioning strategy.");
            return new VvrFileStoreLookupFunction(table, iArr, iArr2, this.predicate, shuffleStrategy);
        }

        if (this.keyGroupPruner != null) {
            LOG.info("Paimon connector is using key group pruner.");
            return new KeyGroupPruneFileStoreLookupFunction(table, iArr, iArr2, this.predicate, this.keyGroupPruner);
        }
        return new FileStoreLookupFunction(table, iArr, iArr2, this.predicate);
    }

    /**
     * Builds the shuffle strategy: replicated when the table option
     * {@code LOOKUP_ENABLE_REPLICATED_SHUFFLE} is true, plain bucket shuffle otherwise.
     */
    private ShuffleStrategy createShuffleStrategy(Table table, int bucket) {
        boolean replicated = (Boolean) Options.fromMap(table.options()).get(VvrConnectorOptions.LOOKUP_ENABLE_REPLICATED_SHUFFLE);
        if (!replicated) {
            LOG.info("Using bucket shuffle strategy, num buckets: {}", bucket);
            return new BucketShuffleStrategy(bucket);
        }
        // NOTE(review): unboxed below without a null check; presumably the option has a
        // default value - confirm against the option's definition.
        Integer num = (Integer) this.context.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_HASH_REPLICATE_NUM);
        LOG.info("Using replicated bucket shuffle strategy, num buckets: {}, num replicate: {}", bucket, num);
        return new ReplicatedBucketShuffleStrategy(bucket, num.intValue());
    }

    /**
     * Creates a new source from this one's table, settings, pushed-down state and key group
     * pruner. The bucket shuffle partitioner is not carried over.
     *
     * @return the new source
     */
    @Override // org.apache.paimon.flink.source.DataTableSource
    /* renamed from: copy */
    public VvrDataTableSource mo1354copy() {
        return new VvrDataTableSource(
                this.tableIdentifier,
                this.table,
                this.streaming,
                this.context,
                this.logStoreTableFactory,
                this.predicate,
                this.projectFields,
                this.limit,
                this.watermarkStrategy,
                this.dynamicPartitionFilteringFields,
                this.keyGroupPruner);
    }

    /**
     * Returns a new source whose table is produced by
     * {@code IncrementalProcessingUtils.newTableWithScanRange} for the given range; all other
     * settings are taken from this instance, which is left unmodified.
     *
     * @param j first scan range argument, passed through unchanged
     * @param j2 second scan range argument, passed through unchanged
     * @return the new, range-restricted source
     */
    @Override // org.apache.flink.table.connector.source.abilities.SupportsScanRange
    public ScanTableSource applyScanRange(long j, long j2) {
        Table rangedTable = IncrementalProcessingUtils.newTableWithScanRange(this.table, j, j2);
        return new VvrDataTableSource(
                this.tableIdentifier,
                rangedTable,
                this.streaming,
                this.context,
                this.logStoreTableFactory,
                this.predicate,
                this.projectFields,
                this.limit,
                this.watermarkStrategy,
                this.dynamicPartitionFilteringFields,
                this.keyGroupPruner);
    }

    /**
     * Stores the pruner to use for subsequent lookup function creation.
     *
     * @param keyGroupPruner the pruner to use; replaces any previously stored one
     */
    public void applyKeyGroupPruner(KeyGroupPruner<RowData> keyGroupPruner) {
        this.keyGroupPruner = keyGroupPruner;
    }

    /**
     * Checks whether bucket shuffle can be used: this source's table must be in
     * {@code HASH_FIXED} bucket mode and the join keys must contain every bucket key.
     */
    private boolean supportBucketShufflePartitioner(List<String> joinKeys, List<String> bucketKeys) {
        if (!BucketMode.HASH_FIXED.equals(((FileStoreTable) this.table).bucketMode())) {
            return false;
        }
        HashSet<String> joinKeySet = new HashSet<>(joinKeys);
        return joinKeySet.containsAll(bucketKeys);
    }
}
