package org.apache.paimon.flink.sink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.paimon.data.serializer.VersionedSerializer;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.utils.SerializableSupplier;

/* loaded from: input_file:org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.class */
public class RestoreAndFailCommittableStateManager<GlobalCommitT> implements CommittableStateManager<GlobalCommitT> {
    private static final long serialVersionUID = 1;
    private final SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer;
    private ListState<GlobalCommitT> streamingCommitterState;

    public RestoreAndFailCommittableStateManager(SerializableSupplier<VersionedSerializer<GlobalCommitT>> serializableSupplier) {
        this.committableSerializer = serializableSupplier;
    }

    @Override // org.apache.paimon.flink.sink.CommittableStateManager
    public void initializeState(StateInitializationContext stateInitializationContext, Committer<?, GlobalCommitT> committer) throws Exception {
        this.streamingCommitterState = new SimpleVersionedListState(stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE)), new VersionedSerializerWrapper(this.committableSerializer.get()));
        ArrayList arrayList = new ArrayList();
        Iterable iterable = (Iterable) this.streamingCommitterState.get();
        arrayList.getClass();
        iterable.forEach(arrayList::add);
        this.streamingCommitterState.clear();
        recover(arrayList, committer);
    }

    private void recover(List<GlobalCommitT> list, Committer<?, GlobalCommitT> committer) throws Exception {
        if (committer.filterAndCommit(list) > 0) {
            throw new RuntimeException("This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.");
        }
    }

    @Override // org.apache.paimon.flink.sink.CommittableStateManager
    public void snapshotState(StateSnapshotContext stateSnapshotContext, List<GlobalCommitT> list) throws Exception {
        this.streamingCommitterState.update(list);
    }
}
