package org.apache.paimon.flink.lookup;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.flink.query.RemoteTableQuery;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ProjectedRow;

/* loaded from: input_file:org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.class */
public class PrimaryKeyPartialLookupTable implements LookupTable {
    /** Factory invoked by {@link #open()} to build the {@link #queryExecutor}. */
    private final QueryExecutorFactory executorFactory;
    /** Built from the table schema; {@link #get} asks it for the bucket and partition of a key row. */
    private final FixedBucketFromPkExtractor extractor;

    /**
     * Projection applied to the incoming key row before it is handed to the extractor. It is
     * {@code null} when the table's primary keys already equal the key list given to the
     * constructor.
     */
    @Nullable
    private final ProjectedRow keyRearrange;

    /**
     * Projection applied to the incoming key row before it is handed to the query executor. It is
     * {@code null} when the schema's trimmed primary keys already equal the key list given to the
     * constructor.
     */
    @Nullable
    private final ProjectedRow trimmedKeyRearrange;
    /**
     * Predicate stored by {@link #specificPartitionFilter} and forwarded to the executor factory in
     * {@link #open()}. NOTE(review): stays {@code null} if the setter is never called.
     */
    private Predicate specificPartition;

    /** Row filter stored by {@link #specifyCacheRowFilter} and forwarded to the executor factory. */
    @Nullable
    private Filter<InternalRow> cacheRowFilter;
    /** Executor created in {@link #open()}; {@code null} until then. */
    private QueryExecutor queryExecutor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable$LocalQueryExecutor.class */
    /**
     * {@link QueryExecutor} that serves lookups from a {@link LocalTableQuery} and keeps that query
     * up to date with the file changes reported by a {@link StreamTableScan}.
     */
    public static class LocalQueryExecutor implements QueryExecutor {
        private final LocalTableQuery tableQuery;
        private final StreamTableScan scan;

        /**
         * Creates the local table query and the stream scan that feeds it.
         *
         * @param fileStoreTable table that provides the local table query and the read builder
         * @param iArr value projection handed to the table query
         * @param file file whose path string is given to the {@link IOManagerImpl}
         * @param predicate filter applied to the read builder; may be {@code null}
         * @param set bucket ids accepted by the scan's bucket filter; when {@code null}, a
         *     {@code null} bucket filter is passed to the read builder
         * @param filter cache row filter installed on the table query when non-null
         */
        private LocalQueryExecutor(FileStoreTable fileStoreTable, int[] iArr, File file, @Nullable Predicate predicate, Set<Integer> set, @Nullable Filter<InternalRow> filter) {
            this.tableQuery = fileStoreTable.newLocalTableQuery().withValueProjection(iArr).withIOManager(new IOManagerImpl(file.toString()));
            if (filter != null) {
                this.tableQuery.withCacheRowFilter(filter);
            }
            // Only bind the method reference when a bucket set was actually supplied.
            Filter<Integer> bucketFilter = set == null ? null : set::contains;
            this.scan = fileStoreTable.newReadBuilder().withFilter(predicate).withBucketFilter(bucketFilter).newStreamScan();
        }

        /** Delegates the lookup to the underlying {@link LocalTableQuery}. */
        @Override
        public InternalRow lookup(BinaryRow binaryRow, int i, InternalRow internalRow) throws IOException {
            return this.tableQuery.lookup(binaryRow, i, internalRow);
        }

        /**
         * Plans the scan repeatedly and applies every returned split to the table query, stopping
         * as soon as a plan yields no splits.
         *
         * @throws IllegalArgumentException if a planned split is not a {@link DataSplit}
         */
        @Override
        public void refresh() {
            while (true) {
                List<Split> splits = this.scan.plan().splits();
                if (splits.isEmpty()) {
                    return;
                }
                for (Split split : splits) {
                    if (!(split instanceof DataSplit)) {
                        throw new IllegalArgumentException("Unsupported split: " + split.getClass());
                    }
                    DataSplit dataSplit = (DataSplit) split;
                    this.tableQuery.refreshFiles(dataSplit.partition(), dataSplit.bucket(), dataSplit.beforeFiles(), dataSplit.dataFiles());
                }
            }
        }

        /** Closes the underlying table query. */
        @Override
        public void close() throws IOException {
            this.tableQuery.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable$QueryExecutor.class */
    /** Strategy used by the enclosing lookup table to resolve a single key and to refresh its data. */
    public interface QueryExecutor extends Closeable {
        /**
         * Looks up one row. The enclosing table's {@code get} passes the partition and bucket
         * computed by its extractor, followed by the (possibly rearranged) key row.
         *
         * @param binaryRow partition of the key
         * @param i bucket of the key
         * @param internalRow key row to look up
         * @return the matching row; a {@code null} result is treated by the caller as "no match"
         * @throws IOException if the underlying query fails
         */
        InternalRow lookup(BinaryRow binaryRow, int i, InternalRow internalRow) throws IOException;

        /**
         * Brings the executor's view of the table up to date. The local implementation re-plans
         * its scan; the remote implementation does nothing.
         */
        void refresh();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable$QueryExecutorFactory.class */
    /** Creates the {@link QueryExecutor} when the enclosing lookup table is opened. */
    public interface QueryExecutorFactory {
        /**
         * Builds a new executor.
         *
         * @param predicate the partition predicate stored on the lookup table at open time
         * @param filter the cache row filter stored on the lookup table at open time, or {@code null}
         * @return the executor used for subsequent lookups and refreshes
         */
        QueryExecutor create(Predicate predicate, @Nullable Filter<InternalRow> filter);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable$RemoteQueryExecutor.class */
    /** {@link QueryExecutor} that forwards every lookup to a {@link RemoteTableQuery}. */
    public static class RemoteQueryExecutor implements QueryExecutor {
        private final RemoteTableQuery tableQuery;

        /**
         * @param fileStoreTable table the remote query is created for
         * @param iArr value projection configured on the remote query
         */
        private RemoteQueryExecutor(FileStoreTable fileStoreTable, int[] iArr) {
            RemoteTableQuery remoteQuery = new RemoteTableQuery(fileStoreTable);
            tableQuery = remoteQuery.withValueProjection(iArr);
        }

        /** Delegates the lookup to the remote table query. */
        @Override
        public InternalRow lookup(BinaryRow binaryRow, int i, InternalRow internalRow) throws IOException {
            InternalRow result = tableQuery.lookup(binaryRow, i, internalRow);
            return result;
        }

        /** Intentionally empty: this executor keeps no local state to refresh. */
        @Override
        public void refresh() {
            // nothing to do
        }

        /** Closes the remote table query. */
        @Override
        public void close() throws IOException {
            tableQuery.close();
        }
    }

    /**
     * Creates a partial lookup table over a fixed-bucket table.
     *
     * @param queryExecutorFactory factory used later to create the query executor
     * @param fileStoreTable table to look up; its bucket mode must be {@link BucketMode#HASH_FIXED}
     * @param list key field names; presumably the field order of the key rows later passed to
     *     {@code get} (verify against callers)
     * @throws UnsupportedOperationException if the table's bucket mode is not {@code HASH_FIXED}
     */
    private PrimaryKeyPartialLookupTable(QueryExecutorFactory queryExecutorFactory, FileStoreTable fileStoreTable, List<String> list) {
        this.executorFactory = queryExecutorFactory;
        if (fileStoreTable.bucketMode() != BucketMode.HASH_FIXED) {
            throw new UnsupportedOperationException("Unsupported mode for partial lookup: " + fileStoreTable.bucketMode());
        }
        this.extractor = new FixedBucketFromPkExtractor(fileStoreTable.schema());
        // One projection per key layout: full primary keys feed the bucket/partition extractor,
        // trimmed primary keys feed the query executor.
        this.keyRearrange = rearrangeFrom(fileStoreTable.primaryKeys(), list);
        this.trimmedKeyRearrange = rearrangeFrom(fileStoreTable.schema().trimmedPrimaryKeys(), list);
    }

    /**
     * Builds the projection that picks, for each name in {@code targetOrder}, its position in
     * {@code list}; returns {@code null} when both lists are already equal.
     */
    @Nullable
    private static ProjectedRow rearrangeFrom(List<String> targetOrder, List<String> list) {
        if (targetOrder.equals(list)) {
            return null;
        }
        return ProjectedRow.from(targetOrder.stream().mapToInt(list::indexOf).toArray());
    }

    /** Exposes the current query executor to tests; {@code null} before {@code open()} has run. */
    @VisibleForTesting
    QueryExecutor queryExecutor() {
        return queryExecutor;
    }

    /**
     * Stores the partition predicate that is handed to the executor factory on {@code open()}.
     *
     * @param predicate partition predicate to remember
     */
    @Override
    public void specificPartitionFilter(Predicate predicate) {
        specificPartition = predicate;
    }

    /**
     * Creates the query executor from the stored partition predicate and cache row filter, then
     * performs an initial refresh.
     */
    @Override
    public void open() throws Exception {
        QueryExecutor created = executorFactory.create(specificPartition, cacheRowFilter);
        queryExecutor = created;
        refresh();
    }

    /**
     * Looks up the row for the given key.
     *
     * @param internalRow key row to look up
     * @return a singleton list with the matching row, or an empty list when the executor finds none
     * @throws IOException if the underlying lookup fails
     */
    @Override
    public List<InternalRow> get(InternalRow internalRow) throws IOException {
        // Row handed to the extractor, reordered first when a primary-key projection exists.
        InternalRow extractorInput = keyRearrange == null ? internalRow : keyRearrange.replaceRow(internalRow);
        extractor.setRecord(extractorInput);
        int bucket = extractor.bucket();
        BinaryRow partition = extractor.partition();

        // Row handed to the executor, reordered first when a trimmed-key projection exists.
        InternalRow lookupKey = trimmedKeyRearrange == null ? internalRow : trimmedKeyRearrange.replaceRow(internalRow);
        InternalRow found = queryExecutor.lookup(partition, bucket, lookupKey);
        if (found == null) {
            return Collections.emptyList();
        }
        return Collections.singletonList(found);
    }

    /** Asks the current query executor to refresh itself. */
    @Override
    public void refresh() {
        queryExecutor.refresh();
    }

    /**
     * Stores the cache row filter that is handed to the executor factory on {@code open()}.
     *
     * @param filter row filter to remember
     */
    @Override
    public void specifyCacheRowFilter(Filter<InternalRow> filter) {
        cacheRowFilter = filter;
    }

    /** Closes the query executor if one has been created; otherwise does nothing. */
    @Override
    public void close() throws IOException {
        if (queryExecutor == null) {
            return;
        }
        queryExecutor.close();
    }

    /**
     * Creates a lookup table whose executor is a {@link LocalQueryExecutor} built over a
     * {@code LookupFileStoreTable} wrapping the given table.
     *
     * @param fileStoreTable table to look up
     * @param iArr value projection for the local query
     * @param file file passed through to the local executor
     * @param list key field names
     * @param set bucket ids passed through to the local executor; may be {@code null}
     * @return the new lookup table; the executor itself is created on {@code open()}
     */
    public static PrimaryKeyPartialLookupTable createLocalTable(FileStoreTable fileStoreTable, int[] iArr, File file, List<String> list, Set<Integer> set) {
        QueryExecutorFactory localFactory =
                (predicate, filter) ->
                        new LocalQueryExecutor(new LookupFileStoreTable(fileStoreTable, list), iArr, file, predicate, set, filter);
        return new PrimaryKeyPartialLookupTable(localFactory, fileStoreTable, list);
    }

    /**
     * Creates a lookup table whose executor is a {@link RemoteQueryExecutor}. The predicate and
     * filter supplied at open time are ignored by this factory.
     *
     * @param fileStoreTable table to look up
     * @param iArr value projection for the remote query
     * @param list key field names
     * @return the new lookup table; the executor itself is created on {@code open()}
     */
    public static PrimaryKeyPartialLookupTable createRemoteTable(FileStoreTable fileStoreTable, int[] iArr, List<String> list) {
        QueryExecutorFactory remoteFactory = (predicate, filter) -> new RemoteQueryExecutor(fileStoreTable, iArr);
        return new PrimaryKeyPartialLookupTable(remoteFactory, fileStoreTable, list);
    }
}
