package org.apache.paimon.flink.sink;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexCommon;
import org.apache.paimon.fileindex.FileIndexFormat;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileIndexWriter;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;

/* loaded from: input_file:org/apache/paimon/flink/sink/RewriteFileIndexSink.class */
public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> {

    /* loaded from: input_file:org/apache/paimon/flink/sink/RewriteFileIndexSink$FileIndexModificationOperator.class */
    private static class FileIndexModificationOperator extends PrepareCommitOperator<ManifestEntry, Committable> {
        private static final long serialVersionUID = 1;
        private final FileStoreTable table;
        private transient FileIndexProcessor fileIndexProcessor;
        private transient List<CommitMessage> messages;

        public FileIndexModificationOperator(Options options, FileStoreTable fileStoreTable) {
            super(options);
            this.table = fileStoreTable;
        }

        @Override // org.apache.paimon.flink.sink.PrepareCommitOperator
        public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Committable>> output) {
            super.setup(streamTask, streamConfig, output);
            this.fileIndexProcessor = new FileIndexProcessor(this.table);
            this.messages = new ArrayList();
        }

        public void processElement(StreamRecord<ManifestEntry> streamRecord) throws Exception {
            ManifestEntry manifestEntry = (ManifestEntry) streamRecord.getValue();
            BinaryRow partition = manifestEntry.partition();
            int bucket = manifestEntry.bucket();
            DataFileMeta file = manifestEntry.file();
            this.messages.add(new CommitMessageImpl(partition, bucket, DataIncrement.emptyIncrement(), new CompactIncrement(Collections.singletonList(file), Collections.singletonList(this.fileIndexProcessor.process(partition, bucket, file)), Collections.emptyList())));
        }

        @Override // org.apache.paimon.flink.sink.PrepareCommitOperator
        protected List<Committable> prepareCommit(boolean z, long j) throws IOException {
            ArrayList arrayList = new ArrayList(this.messages);
            this.messages.clear();
            return (List) arrayList.stream().map(commitMessage -> {
                return new Committable(j, Committable.Kind.FILE, commitMessage);
            }).collect(Collectors.toList());
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/sink/RewriteFileIndexSink$FileIndexProcessor.class */
    public static class FileIndexProcessor {
        private final FileStoreTable table;
        private final FileIndexOptions fileIndexOptions;
        private final FileIO fileIO;
        private final FileStorePathFactory pathFactory;
        private final Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap = new HashMap();
        private final SchemaCache schemaInfoCache;
        private final long sizeInMeta;

        public FileIndexProcessor(FileStoreTable fileStoreTable) {
            this.table = fileStoreTable;
            this.fileIndexOptions = fileStoreTable.coreOptions().indexColumnsOptions();
            this.fileIO = fileStoreTable.fileIO();
            this.pathFactory = fileStoreTable.store().pathFactory();
            this.schemaInfoCache = new SchemaCache(this.fileIndexOptions, new SchemaManager(this.fileIO, fileStoreTable.location()));
            this.sizeInMeta = fileStoreTable.coreOptions().fileIndexInManifestThreshold();
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r9v0, types: [java.lang.Object, org.apache.paimon.io.DataFileMeta] */
        public DataFileMeta process(BinaryRow binaryRow, int i, DataFileMeta dataFileMeta) throws IOException {
            Map hashMap;
            Path dataFileToFileIndexPath;
            DataFilePathFactory computeIfAbsent = this.dataFilePathFactoryMap.computeIfAbsent(Pair.of(binaryRow, Integer.valueOf(i)), pair -> {
                return this.pathFactory.createDataFilePathFactory(binaryRow, i);
            });
            SchemaInfo schemaInfo = this.schemaInfoCache.schemaInfo(dataFileMeta.schemaId());
            ArrayList arrayList = new ArrayList(dataFileMeta.extraFiles());
            List list = (List) dataFileMeta.extraFiles().stream().filter(str -> {
                return str.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX);
            }).collect(Collectors.toList());
            arrayList.removeAll(list);
            if (list.isEmpty()) {
                hashMap = new HashMap();
                dataFileToFileIndexPath = DataFilePathFactory.dataFileToFileIndexPath(computeIfAbsent.toPath(dataFileMeta.fileName()));
            } else {
                String str2 = (String) list.get(0);
                FileIndexFormat.Reader createReader = FileIndexFormat.createReader(this.fileIO.newInputStream(computeIfAbsent.toPath(str2)), schemaInfo.fileSchema);
                Throwable th = null;
                try {
                    try {
                        hashMap = createReader.readAll();
                        if (createReader != null) {
                            if (0 != 0) {
                                try {
                                    createReader.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                createReader.close();
                            }
                        }
                        dataFileToFileIndexPath = DataFilePathFactory.createNewFileIndexFilePath(computeIfAbsent.toPath(str2));
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (createReader != null) {
                        if (th != null) {
                            try {
                                createReader.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            createReader.close();
                        }
                    }
                    throw th3;
                }
            }
            Iterator it = new HashSet(hashMap.entrySet()).iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                Object obj = (String) entry.getKey();
                if (schemaInfo.projectedColFullNames.contains(obj)) {
                    Map map = (Map) hashMap.get(obj);
                    for (String str3 : ((Map) entry.getValue()).keySet()) {
                        if (!map.containsKey(str3)) {
                            map.remove(str3);
                        }
                    }
                } else {
                    hashMap.remove(obj);
                }
            }
            DataFileIndexWriter create = DataFileIndexWriter.create(this.fileIO, dataFileToFileIndexPath, schemaInfo.fileSchema.project(schemaInfo.projectedIndexCols), this.fileIndexOptions, schemaInfo.colNameMapping);
            if (create != null) {
                RecordReader<InternalRow> createReader2 = this.table.newReadBuilder().withProjection(schemaInfo.projectedIndexCols).newRead().createReader(DataSplit.builder().withPartition(binaryRow).withBucket(i).withBucketPath(this.pathFactory.bucketPath(binaryRow, i).toString()).withDataFiles(Collections.singletonList(dataFileMeta)).rawConvertible(true).build());
                Throwable th5 = null;
                try {
                    try {
                        create.getClass();
                        createReader2.forEachRemaining(create::write);
                        if (createReader2 != null) {
                            if (0 != 0) {
                                try {
                                    createReader2.close();
                                } catch (Throwable th6) {
                                    th5.addSuppressed(th6);
                                }
                            } else {
                                createReader2.close();
                            }
                        }
                        Map map2 = hashMap;
                        create.serializeMaintainers().forEach((str4, map3) -> {
                            ((Map) map2.computeIfAbsent(str4, str4 -> {
                                return new HashMap();
                            })).putAll(map3);
                        });
                    } finally {
                    }
                } catch (Throwable th7) {
                    if (createReader2 != null) {
                        if (th5 != null) {
                            try {
                                createReader2.close();
                            } catch (Throwable th8) {
                                th5.addSuppressed(th8);
                            }
                        } else {
                            createReader2.close();
                        }
                    }
                    throw th7;
                }
            }
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            FileIndexFormat.Writer createWriter = FileIndexFormat.createWriter(byteArrayOutputStream);
            Throwable th9 = null;
            try {
                try {
                    if (!hashMap.isEmpty()) {
                        createWriter.writeColumnIndexes(hashMap);
                    }
                    if (createWriter != null) {
                        if (0 != 0) {
                            try {
                                createWriter.close();
                            } catch (Throwable th10) {
                                th9.addSuppressed(th10);
                            }
                        } else {
                            createWriter.close();
                        }
                    }
                    if (byteArrayOutputStream.size() <= this.sizeInMeta) {
                        return byteArrayOutputStream.size() == 0 ? dataFileMeta.copy(arrayList) : dataFileMeta.copy(byteArrayOutputStream.toByteArray());
                    }
                    PositionOutputStream newOutputStream = this.fileIO.newOutputStream(dataFileToFileIndexPath, true);
                    Throwable th11 = null;
                    try {
                        try {
                            newOutputStream.write(byteArrayOutputStream.toByteArray());
                            if (newOutputStream != null) {
                                if (0 != 0) {
                                    try {
                                        newOutputStream.close();
                                    } catch (Throwable th12) {
                                        th11.addSuppressed(th12);
                                    }
                                } else {
                                    newOutputStream.close();
                                }
                            }
                            arrayList.add(dataFileToFileIndexPath.getName());
                            return dataFileMeta.copy(arrayList);
                        } finally {
                        }
                    } catch (Throwable th13) {
                        if (newOutputStream != null) {
                            if (th11 != null) {
                                try {
                                    newOutputStream.close();
                                } catch (Throwable th14) {
                                    th11.addSuppressed(th14);
                                }
                            } else {
                                newOutputStream.close();
                            }
                        }
                        throw th13;
                    }
                } finally {
                }
            } catch (Throwable th15) {
                if (createWriter != null) {
                    if (th9 != null) {
                        try {
                            createWriter.close();
                        } catch (Throwable th16) {
                            th9.addSuppressed(th16);
                        }
                    } else {
                        createWriter.close();
                    }
                }
                throw th15;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/flink/sink/RewriteFileIndexSink$SchemaCache.class */
    public static class SchemaCache {
        private final FileIndexOptions fileIndexOptions;
        private final SchemaManager schemaManager;
        private final TableSchema currentSchema;
        private final Map<Long, SchemaInfo> schemaInfos = new HashMap();
        private final Set<Long> fileSchemaIds = new HashSet();

        public SchemaCache(FileIndexOptions fileIndexOptions, SchemaManager schemaManager) {
            this.fileIndexOptions = fileIndexOptions;
            this.schemaManager = schemaManager;
            this.currentSchema = schemaManager.latest().orElseThrow(RuntimeException::new);
        }

        public SchemaInfo schemaInfo(long j) {
            String columnName;
            if (!this.fileSchemaIds.contains(Long.valueOf(j))) {
                RowType logicalRowType = this.schemaManager.schema(j).logicalRowType();
                Map<String, String> createIndexNameMapping = j == this.currentSchema.id() ? null : createIndexNameMapping(this.currentSchema.fields(), logicalRowType.getFields());
                ArrayList arrayList = new ArrayList();
                HashSet hashSet = new HashSet();
                Iterator<Map.Entry<FileIndexOptions.Column, Map<String, Options>>> it = this.fileIndexOptions.entrySet().iterator();
                while (it.hasNext()) {
                    FileIndexOptions.Column key = it.next().getKey();
                    if (createIndexNameMapping != null) {
                        columnName = createIndexNameMapping.getOrDefault(key.getColumnName(), null);
                        if (columnName == null) {
                        }
                    } else {
                        columnName = key.getColumnName();
                    }
                    arrayList.add(columnName);
                    hashSet.add(key.isNestedColumn() ? FileIndexCommon.toMapKey(columnName, key.getNestedColumnName()) : key.getColumnName());
                }
                Map<Long, SchemaInfo> map = this.schemaInfos;
                Long valueOf = Long.valueOf(j);
                Stream stream = arrayList.stream();
                logicalRowType.getClass();
                map.put(valueOf, new SchemaInfo(logicalRowType, createIndexNameMapping, stream.mapToInt(logicalRowType::getFieldIndex).toArray(), hashSet));
                this.fileSchemaIds.add(Long.valueOf(j));
            }
            return this.schemaInfos.get(Long.valueOf(j));
        }

        private static Map<String, String> createIndexNameMapping(List<DataField> list, List<DataField> list2) {
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            for (DataField dataField : list) {
                hashMap2.put(Integer.valueOf(dataField.id()), dataField.name());
            }
            for (DataField dataField2 : list2) {
                String str = (String) hashMap2.getOrDefault(Integer.valueOf(dataField2.id()), null);
                if (str != null) {
                    hashMap.put(str, dataField2.name());
                }
            }
            return hashMap;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/flink/sink/RewriteFileIndexSink$SchemaInfo.class */
    public static class SchemaInfo {
        private final RowType fileSchema;
        private final Map<String, String> colNameMapping;
        private final int[] projectedIndexCols;
        private final Set<String> projectedColFullNames;

        private SchemaInfo(RowType rowType, Map<String, String> map, int[] iArr, Set<String> set) {
            this.fileSchema = rowType;
            this.colNameMapping = map;
            this.projectedIndexCols = iArr;
            this.projectedColFullNames = set;
        }
    }

    public RewriteFileIndexSink(FileStoreTable fileStoreTable) {
        super(fileStoreTable, null);
    }

    @Override // org.apache.paimon.flink.sink.FlinkSink
    protected OneInputStreamOperator<ManifestEntry, Committable> createWriteOperator(StoreSinkWrite.Provider provider, String str) {
        return new FileIndexModificationOperator(this.table.coreOptions().toConfiguration(), this.table);
    }
}
