package org.apache.paimon.flink.sink;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.paimon.flink.source.AppendBypassCoordinateOperatorFactory;
import org.apache.paimon.table.FileStoreTable;

/* loaded from: input_file:org/apache/paimon/flink/sink/UnawareBucketSink.class */
/**
 * Abstract Flink write sink that, after the regular write stage, optionally appends a
 * "Compact Coordinator" / "Compact Worker" operator pair to the job topology.
 *
 * <p>The compaction operators are only added when the table is not configured as write-only and
 * the job runs in {@link RuntimeExecutionMode#STREAMING} mode.
 *
 * @param <T> type of the records consumed by the write stage
 */
public abstract class UnawareBucketSink<T> extends FlinkWriteSink<T> {
    /** Table this sink writes to; also handed to the compaction operators. */
    protected final FileStoreTable table;

    /** Stored for subclasses; not read by this class. */
    protected final LogSinkFunction logSinkFunction;

    /** Parallelism forwarded to the parent write stage; may be {@code null}. */
    @Nullable
    protected final Integer parallelism;

    /**
     * Creates the sink.
     *
     * @param fileStoreTable table to write to
     * @param map forwarded unchanged to the {@code FlinkWriteSink} constructor; may be {@code null}
     *     (NOTE(review): presumably the overwrite-partition spec, confirm against the parent class)
     * @param logSinkFunction kept in {@link #logSinkFunction} for subclasses
     * @param num parallelism forwarded to the parent write stage by {@link #doWrite}; may be
     *     {@code null}
     */
    public UnawareBucketSink(FileStoreTable fileStoreTable, @Nullable Map<String, String> map, LogSinkFunction logSinkFunction, @Nullable Integer num) {
        super(fileStoreTable, map);
        this.table = fileStoreTable;
        this.logSinkFunction = logSinkFunction;
        this.parallelism = num;
    }

    /**
     * Builds the write stage and, when applicable, chains the compaction operators behind it.
     *
     * @param dataStream input records
     * @param str forwarded to the parent write stage and to the compact worker operator
     *     (NOTE(review): presumably the initial commit user, confirm against the parent class)
     * @param num ignored by this implementation; the {@link #parallelism} field supplied at
     *     construction time is passed to the parent instead
     * @return the stream of committables, produced either directly by the write stage or by the
     *     compact worker when the compaction operators were added
     */
    @Override // org.apache.paimon.flink.sink.FlinkSink
    public DataStream<Committable> doWrite(DataStream<T> dataStream, String str, @Nullable Integer num) {
        DataStream<Committable> written = super.doWrite(dataStream, str, this.parallelism);

        boolean compactionEnabled = !this.table.coreOptions().writeOnly();
        boolean streamingMode =
                dataStream.getExecutionEnvironment().getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                        == RuntimeExecutionMode.STREAMING;
        if (!compactionEnabled || !streamingMode) {
            return written;
        }

        // The compact worker runs with the same parallelism as the write stage it follows.
        int writerParallelism = written.getParallelism();
        String tableName = this.table.name();
        return written
                .transform(
                        "Compact Coordinator: " + tableName,
                        new EitherTypeInfo<>(new CommittableTypeInfo(), new CompactionTaskTypeInfo()),
                        new AppendBypassCoordinateOperatorFactory<>(this.table))
                // The coordinator is pinned to a single, non-parallel instance.
                .forceNonParallel()
                .transform(
                        "Compact Worker: " + tableName,
                        new CommittableTypeInfo(),
                        new AppendBypassCompactWorkerOperator(this.table, str))
                .setParallelism(writerParallelism);
    }
}
