package org.apache.paimon.flink.sink.cdc;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StateUtils;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.flink.sink.StoreSinkWriteStateImpl;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ExecutorThreadFactory;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.class */
public class CdcRecordStoreMultiWriteOperator extends PrepareCommitOperator<CdcMultiplexRecord, MultiTableCommittable> {
    private static final long serialVersionUID = 1;
    private final StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider;
    private final String initialCommitUser;
    private final Catalog.Loader catalogLoader;
    private MemoryPoolFactory memoryPoolFactory;
    private Catalog catalog;
    private Map<Identifier, FileStoreTable> tables;
    private StoreSinkWriteState state;
    private Map<Identifier, StoreSinkWrite> writes;
    private String commitUser;
    private ExecutorService compactExecutor;

    public CdcRecordStoreMultiWriteOperator(Catalog.Loader loader, StoreSinkWrite.WithWriteBufferProvider withWriteBufferProvider, String str, Options options) {
        super(options);
        this.catalogLoader = loader;
        this.storeSinkWriteProvider = withWriteBufferProvider;
        this.initialCommitUser = str;
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.catalog = this.catalogLoader.load();
        this.commitUser = (String) StateUtils.getSingleValueFromState(stateInitializationContext, "commit_user_state", String.class, this.initialCommitUser);
        this.state = new StoreSinkWriteStateImpl(stateInitializationContext, (str, binaryRow, i) -> {
            return true;
        });
        this.tables = new HashMap();
        this.writes = new HashMap();
        this.compactExecutor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory(Thread.currentThread().getName() + "-CdcMultiWrite-Compaction"));
    }

    public void processElement(StreamRecord<CdcMultiplexRecord> streamRecord) throws Exception {
        CdcMultiplexRecord cdcMultiplexRecord = (CdcMultiplexRecord) streamRecord.getValue();
        Identifier create = Identifier.create(cdcMultiplexRecord.databaseName(), cdcMultiplexRecord.tableName());
        FileStoreTable table = getTable(create);
        if (this.memoryPoolFactory == null) {
            this.memoryPoolFactory = new MemoryPoolFactory(this.memoryPool != null ? this.memoryPool : new HeapMemorySegmentPool(table.coreOptions().writeBufferSize(), table.coreOptions().pageSize()));
        }
        StoreSinkWrite computeIfAbsent = this.writes.computeIfAbsent(create, identifier -> {
            return this.storeSinkWriteProvider.provide(table, this.commitUser, this.state, getContainingTask().getEnvironment().getIOManager(), this.memoryPoolFactory, getMetricGroup());
        });
        ((StoreSinkWriteImpl) computeIfAbsent).withCompactExecutor(this.compactExecutor);
        Optional<GenericRow> genericRow = CdcRecordUtils.toGenericRow(cdcMultiplexRecord.record(), table.schema().fields());
        if (!genericRow.isPresent()) {
            FileStoreTable fileStoreTable = table;
            while (true) {
                fileStoreTable = fileStoreTable.copyWithLatestSchema();
                this.tables.put(create, fileStoreTable);
                genericRow = CdcRecordUtils.toGenericRow(cdcMultiplexRecord.record(), fileStoreTable.schema().fields());
                if (genericRow.isPresent()) {
                    break;
                } else {
                    Thread.sleep(((Duration) fileStoreTable.coreOptions().toConfiguration().get(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME)).toMillis());
                }
            }
            computeIfAbsent.replace(fileStoreTable);
        }
        try {
            computeIfAbsent.write(genericRow.get());
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    private FileStoreTable getTable(Identifier identifier) throws InterruptedException {
        FileStoreTable fileStoreTable = this.tables.get(identifier);
        if (fileStoreTable == null) {
            while (true) {
                try {
                    fileStoreTable = (FileStoreTable) this.catalog.getTable(identifier);
                    this.tables.put(identifier, fileStoreTable);
                    break;
                } catch (Catalog.TableNotExistException e) {
                    Thread.sleep(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME.defaultValue().toMillis());
                }
            }
        }
        if (fileStoreTable.bucketMode() != BucketMode.HASH_FIXED) {
            throw new UnsupportedOperationException(String.format("Combine mode Sink only supports FIXED bucket mode, but %s is %s", fileStoreTable.name(), fileStoreTable.bucketMode()));
        }
        return fileStoreTable;
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        Iterator<StoreSinkWrite> it = this.writes.values().iterator();
        while (it.hasNext()) {
            it.next().snapshotState();
        }
        this.state.snapshotState();
    }

    @Override // org.apache.paimon.flink.sink.PrepareCommitOperator
    public void close() throws Exception {
        super.close();
        Iterator<StoreSinkWrite> it = this.writes.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        if (this.compactExecutor != null) {
            this.compactExecutor.shutdownNow();
        }
        if (this.catalog != null) {
            this.catalog.close();
            this.catalog = null;
        }
    }

    @Override // org.apache.paimon.flink.sink.PrepareCommitOperator
    protected List<MultiTableCommittable> prepareCommit(boolean z, long j) throws IOException {
        LinkedList linkedList = new LinkedList();
        for (Map.Entry<Identifier, StoreSinkWrite> entry : this.writes.entrySet()) {
            Identifier key = entry.getKey();
            linkedList.addAll((Collection) entry.getValue().prepareCommit(z, j).stream().map(committable -> {
                return MultiTableCommittable.fromCommittable(key, committable);
            }).collect(Collectors.toList()));
        }
        return linkedList;
    }

    @VisibleForTesting
    public Map<Identifier, FileStoreTable> tables() {
        return this.tables;
    }

    @VisibleForTesting
    public Map<Identifier, StoreSinkWrite> writes() {
        return this.writes;
    }

    @VisibleForTesting
    public String commitUser() {
        return this.commitUser;
    }
}
