package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;

/* loaded from: input_file:org/apache/paimon/flink/sink/StoreMultiCommitter.class */
public class StoreMultiCommitter implements Committer<MultiTableCommittable, WrappedManifestCommittable> {
    private final Catalog catalog;
    private final Committer.Context context;
    private final Map<Identifier, StoreCommitter> tableCommitters;
    private final boolean ignoreEmptyCommit;
    private final Map<String, String> dynamicOptions;

    public StoreMultiCommitter(Catalog.Loader loader, Committer.Context context) {
        this(loader, context, false, Collections.emptyMap());
    }

    public StoreMultiCommitter(Catalog.Loader loader, Committer.Context context, boolean z, Map<String, String> map) {
        this.catalog = loader.load();
        this.context = context;
        this.ignoreEmptyCommit = z;
        this.dynamicOptions = map;
        this.tableCommitters = new HashMap();
    }

    @Override // org.apache.paimon.flink.sink.Committer
    public boolean forceCreatingSnapshot() {
        return true;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.paimon.flink.sink.Committer
    public WrappedManifestCommittable combine(long j, long j2, List<MultiTableCommittable> list) {
        return combine(j, j2, new WrappedManifestCommittable(j, j2), list);
    }

    @Override // org.apache.paimon.flink.sink.Committer
    public WrappedManifestCommittable combine(long j, long j2, WrappedManifestCommittable wrappedManifestCommittable, List<MultiTableCommittable> list) {
        for (MultiTableCommittable multiTableCommittable : list) {
            ManifestCommittable computeCommittableIfAbsent = wrappedManifestCommittable.computeCommittableIfAbsent(Identifier.create(multiTableCommittable.getDatabase(), multiTableCommittable.getTable()), j, j2);
            switch (multiTableCommittable.kind()) {
                case FILE:
                    computeCommittableIfAbsent.addFileCommittable((CommitMessage) multiTableCommittable.wrappedCommittable());
                    break;
                case LOG_OFFSET:
                    LogOffsetCommittable logOffsetCommittable = (LogOffsetCommittable) multiTableCommittable.wrappedCommittable();
                    computeCommittableIfAbsent.addLogOffset(logOffsetCommittable.bucket(), logOffsetCommittable.offset());
                    break;
            }
        }
        return wrappedManifestCommittable;
    }

    @Override // org.apache.paimon.flink.sink.Committer
    public void commit(List<WrappedManifestCommittable> list) throws IOException, InterruptedException {
        if (list.isEmpty()) {
            return;
        }
        Map<Identifier, List<ManifestCommittable>> groupByTable = groupByTable(list);
        groupByTable.keySet().forEach(this::getStoreCommitter);
        long checkpointId = list.get(0).checkpointId();
        long watermark = list.get(0).watermark();
        for (Map.Entry<Identifier, StoreCommitter> entry : this.tableCommitters.entrySet()) {
            List<ManifestCommittable> list2 = groupByTable.get(entry.getKey());
            StoreCommitter value = entry.getValue();
            if (list2 != null) {
                value.commit(list2);
            } else if (value.forceCreatingSnapshot()) {
                value.commit(Collections.singletonList(value.combine(checkpointId, watermark, Collections.emptyList())));
            }
        }
    }

    @Override // org.apache.paimon.flink.sink.Committer
    public int filterAndCommit(List<WrappedManifestCommittable> list, boolean z) throws IOException {
        int i = 0;
        for (Map.Entry<Identifier, List<ManifestCommittable>> entry : groupByTable(list).entrySet()) {
            i += getStoreCommitter(entry.getKey()).filterAndCommit(entry.getValue(), z);
        }
        return i;
    }

    private Map<Identifier, List<ManifestCommittable>> groupByTable(List<WrappedManifestCommittable> list) {
        return (Map) list.stream().flatMap(wrappedManifestCommittable -> {
            return wrappedManifestCommittable.manifestCommittables().entrySet().stream().map(entry -> {
                return Tuple2.of(entry.getKey(), entry.getValue());
            });
        }).collect(Collectors.groupingBy(tuple2 -> {
            return (Identifier) tuple2.f0;
        }, Collectors.mapping(tuple22 -> {
            return (ManifestCommittable) tuple22.f1;
        }, Collectors.toList())));
    }

    @Override // org.apache.paimon.flink.sink.Committer
    public Map<Long, List<MultiTableCommittable>> groupByCheckpoint(Collection<MultiTableCommittable> collection) {
        HashMap hashMap = new HashMap();
        for (MultiTableCommittable multiTableCommittable : collection) {
            ((List) hashMap.computeIfAbsent(Long.valueOf(multiTableCommittable.checkpointId()), l -> {
                return new ArrayList();
            })).add(multiTableCommittable);
        }
        return hashMap;
    }

    private StoreCommitter getStoreCommitter(Identifier identifier) {
        StoreCommitter storeCommitter = this.tableCommitters.get(identifier);
        if (storeCommitter == null) {
            try {
                FileStoreTable fileStoreTable = (FileStoreTable) this.catalog.getTable(identifier).copy(this.dynamicOptions);
                storeCommitter = new StoreCommitter(fileStoreTable, fileStoreTable.newCommit(this.context.commitUser()).ignoreEmptyCommit(this.ignoreEmptyCommit), this.context);
                this.tableCommitters.put(identifier, storeCommitter);
            } catch (Catalog.TableNotExistException e) {
                throw new RuntimeException(String.format("Failed to get committer for table %s", identifier.getFullName()), e);
            }
        }
        return storeCommitter;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        Iterator<StoreCommitter> it = this.tableCommitters.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        if (this.catalog != null) {
            this.catalog.close();
        }
    }
}
