package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.List;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.ProcessRecordAttributesUtil;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;

/* loaded from: input_file:org/apache/paimon/flink/sink/TableWriteOperator.class */
public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, Committable> {
    protected FileStoreTable table;
    private final StoreSinkWrite.Provider storeSinkWriteProvider;
    private final String initialCommitUser;
    private transient StoreSinkWriteState state;
    protected transient StoreSinkWrite write;

    public TableWriteOperator(FileStoreTable fileStoreTable, StoreSinkWrite.Provider provider, String str) {
        super(Options.fromMap(fileStoreTable.options()));
        this.table = fileStoreTable;
        this.storeSinkWriteProvider = provider;
        this.initialCommitUser = str;
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        boolean containLogSystem = containLogSystem();
        int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        this.state = createState(stateInitializationContext, (str, binaryRow, i) -> {
            return (containLogSystem ? ChannelComputer.select(i, numberOfParallelSubtasks) : ChannelComputer.select(binaryRow, i, numberOfParallelSubtasks)) == getRuntimeContext().getIndexOfThisSubtask();
        });
        this.write = this.storeSinkWriteProvider.provide(this.table, getCommitUser(stateInitializationContext), this.state, getContainingTask().getEnvironment().getIOManager(), this.memoryPool, getMetricGroup());
    }

    protected StoreSinkWriteState createState(StateInitializationContext stateInitializationContext, StoreSinkWriteState.StateValueFilter stateValueFilter) throws Exception {
        return new StoreSinkWriteStateImpl(stateInitializationContext, stateValueFilter);
    }

    protected String getCommitUser(StateInitializationContext stateInitializationContext) throws Exception {
        return (String) StateUtils.getSingleValueFromState(stateInitializationContext, "commit_user_state", String.class, this.initialCommitUser);
    }

    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        ProcessRecordAttributesUtil.processWithWrite(recordAttributes, this.write);
        super.processRecordAttributes(recordAttributes);
    }

    protected abstract boolean containLogSystem();

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.write.snapshotState();
        this.state.snapshotState();
    }

    @Override // org.apache.paimon.flink.sink.PrepareCommitOperator
    public void close() throws Exception {
        super.close();
        if (this.write != null) {
            this.write.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.paimon.flink.sink.PrepareCommitOperator
    public List<Committable> prepareCommit(boolean z, long j) throws IOException {
        return this.write.prepareCommit(z, j);
    }

    @VisibleForTesting
    public StoreSinkWrite getWrite() {
        return this.write;
    }
}
