package org.apache.paimon.flink.source;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import javaewah.RunningLengthWord;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.logical.RowType;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.OptionsUtils;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/source/FlinkTableSource.class */
public abstract class FlinkTableSource implements ScanTableSource, SupportsFilterPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkTableSource.class);
    protected static final String FLINK_INFER_SCAN_PARALLELISM = String.format("%s%s", OptionsUtils.PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());
    protected final Table table;

    @Nullable
    protected Predicate predicate;

    @Nullable
    protected int[][] projectFields;

    @Nullable
    protected Long limit;
    protected SplitStatistics splitStatistics;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/paimon/flink/source/FlinkTableSource$SplitStatistics.class */
    public static class SplitStatistics {
        private final int splitNumber;
        private final long totalRowCount;

        protected SplitStatistics(int i, long j) {
            this.splitNumber = i;
            this.totalRowCount = j;
        }

        public int splitNumber() {
            return this.splitNumber;
        }

        public long totalRowCount() {
            return this.totalRowCount;
        }
    }

    public FlinkTableSource(Table table) {
        this(table, null, (int[][]) null, null);
    }

    public FlinkTableSource(Table table, @Nullable Predicate predicate, @Nullable int[][] iArr, @Nullable Long l) {
        this.table = table;
        this.predicate = predicate;
        this.projectFields = iArr;
        this.limit = l;
    }

    public SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> list) {
        List<String> partitionKeys = this.table.partitionKeys();
        RowType logicalType = LogicalTypeConversion.toLogicalType(this.table.rowType());
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        PartitionPredicateVisitor partitionPredicateVisitor = new PartitionPredicateVisitor(partitionKeys);
        for (ResolvedExpression resolvedExpression : list) {
            Optional<Predicate> convert = PredicateConverter.convert(logicalType, resolvedExpression);
            if (convert.isPresent()) {
                Predicate predicate = convert.get();
                if (isStreaming() || !((Boolean) predicate.visit(partitionPredicateVisitor)).booleanValue()) {
                    arrayList.add(resolvedExpression);
                } else {
                    arrayList2.add(resolvedExpression);
                }
                arrayList3.add(predicate);
            } else {
                arrayList.add(resolvedExpression);
            }
        }
        this.predicate = arrayList3.isEmpty() ? null : PredicateBuilder.and(arrayList3);
        LOG.info("Consumed filters: {} of {}", arrayList2, list);
        return SupportsFilterPushDown.Result.of(list, arrayList);
    }

    public boolean supportsNestedProjection() {
        return false;
    }

    public void applyProjection(int[][] iArr) {
        this.projectFields = iArr;
    }

    public void applyLimit(long j) {
        this.limit = Long.valueOf(j);
    }

    public abstract boolean isStreaming();

    /* JADX INFO: Access modifiers changed from: protected */
    @Nullable
    public Integer inferSourceParallelism(StreamExecutionEnvironment streamExecutionEnvironment) {
        Options fromMap = Options.fromMap(this.table.options());
        Configuration configuration = streamExecutionEnvironment.getConfiguration();
        if (configuration.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
            fromMap.set((ConfigOption<ConfigOption<Boolean>>) FlinkConnectorOptions.INFER_SCAN_PARALLELISM, (ConfigOption<Boolean>) Boolean.valueOf(Boolean.parseBoolean((String) configuration.toMap().get(FLINK_INFER_SCAN_PARALLELISM))));
        }
        Integer num = (Integer) fromMap.get(FlinkConnectorOptions.SCAN_PARALLELISM);
        if (num == null && ((Boolean) fromMap.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)).booleanValue()) {
            if (isStreaming()) {
                num = Integer.valueOf(Math.max(1, ((Integer) fromMap.get(CoreOptions.BUCKET)).intValue()));
            } else {
                scanSplitsForInference();
                Integer valueOf = Integer.valueOf(this.splitStatistics.splitNumber());
                if (null != this.limit && this.limit.longValue() > 0) {
                    valueOf = Integer.valueOf(Math.min(valueOf.intValue(), this.limit.longValue() >= RunningLengthWord.largestliteralcount ? Integer.MAX_VALUE : this.limit.intValue()));
                }
                num = Integer.valueOf(Math.min(Integer.valueOf(Math.max(1, valueOf.intValue())).intValue(), ((Integer) fromMap.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM)).intValue()));
            }
        }
        return num;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void scanSplitsForInference() {
        if (this.splitStatistics == null) {
            if (!(this.table instanceof DataTable)) {
                List<Split> splits = this.table.newReadBuilder().withFilter(this.predicate).newScan().plan().splits();
                this.splitStatistics = new SplitStatistics(splits.size(), splits.stream().mapToLong((v0) -> {
                    return v0.rowCount();
                }).sum());
                return;
            }
            long j = 0;
            long j2 = 0;
            for (PartitionEntry partitionEntry : this.table.newReadBuilder().withFilter(this.predicate).newScan().listPartitionEntries()) {
                j += partitionEntry.fileSizeInBytes();
                j2 += partitionEntry.recordCount();
            }
            this.splitStatistics = new SplitStatistics((int) ((j / ((DataTable) this.table).coreOptions().splitTargetSize()) + 1), j2);
        }
    }

    public Table getTable() {
        return this.table;
    }
}
