package org.apache.paimon.flink.compact.changelog;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.table.FileStoreTable;

/* loaded from: input_file:org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.class */
public class ChangelogCompactWorkerOperator extends AbstractStreamOperator<Committable> implements OneInputStreamOperator<Either<Committable, ChangelogCompactTask>, Committable> {
    private final FileStoreTable table;

    public ChangelogCompactWorkerOperator(FileStoreTable fileStoreTable) {
        this.table = fileStoreTable;
    }

    public void processElement(StreamRecord<Either<Committable, ChangelogCompactTask>> streamRecord) throws Exception {
        if (((Either) streamRecord.getValue()).isLeft()) {
            this.output.collect(new StreamRecord(((Either) streamRecord.getValue()).left()));
        } else {
            ((ChangelogCompactTask) ((Either) streamRecord.getValue()).right()).doCompact(this.table).forEach(committable -> {
                this.output.collect(new StreamRecord(committable));
            });
        }
    }
}
