package org.apache.paimon.flink.sink;

import java.util.Optional;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;

/* loaded from: input_file:org/apache/paimon/flink/sink/CompactorSinkBuilder.class */
/**
 * Builder that wires a {@link DataStream} of {@link RowData} into a {@link CompactorSink} for a
 * given {@link FileStoreTable}.
 *
 * <p>Typical usage: construct with the target table, supply the input stream through
 * {@link #withInput(DataStream)}, then call {@link #build()}.
 */
public class CompactorSinkBuilder {
    /** Table whose bucket mode and options drive how the sink is assembled. */
    private final FileStoreTable table;

    /** Stream fed into the sink; stays {@code null} until {@link #withInput(DataStream)} is called. */
    private DataStream<RowData> input;

    /**
     * Creates a builder for the given table.
     *
     * @param fileStoreTable table the resulting sink is created for
     */
    public CompactorSinkBuilder(FileStoreTable fileStoreTable) {
        this.table = fileStoreTable;
    }

    /**
     * Sets the stream that will be partitioned and handed to the sink.
     *
     * @param dataStream input stream of rows
     * @return this builder, for call chaining
     */
    public CompactorSinkBuilder withInput(DataStream<RowData> dataStream) {
        this.input = dataStream;
        return this;
    }

    /**
     * Builds the sink according to the table's bucket mode.
     *
     * <p>{@code HASH_FIXED} and {@code HASH_DYNAMIC} tables take the bucket-aware path; every
     * other mode (including {@code BUCKET_UNAWARE}) is rejected.
     *
     * @return the sink produced by {@link CompactorSink}
     * @throws UnsupportedOperationException if the table's bucket mode is not supported
     * @throws NullPointerException if the bucket mode is supported but no input stream was set
     */
    public DataStreamSink<?> build() {
        BucketMode bucketMode = this.table.bucketMode();
        switch (bucketMode) {
            case HASH_FIXED:
            case HASH_DYNAMIC:
                return buildForBucketAware();
            case BUCKET_UNAWARE:
            default:
                throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
        }
    }

    /**
     * Partitions the input with a {@link BucketsRowChannelComputer} and attaches the compactor
     * sink to the partitioned stream.
     *
     * <p>The parallelism is read from the table options under the key of
     * {@link FlinkConnectorOptions#SINK_PARALLELISM}; when the option is absent, {@code null} is
     * passed to the partitioner (presumably leaving the parallelism at its default; confirm
     * against {@code FlinkStreamPartitioner.partition}).
     *
     * @throws NullPointerException if {@link #withInput(DataStream)} was never called
     * @throws NumberFormatException if the configured parallelism is not a valid integer
     */
    private DataStreamSink<?> buildForBucketAware() {
        // Fail fast with a clear message instead of a bare NPE from deeper in the pipeline.
        // The check lives here (not in build()) so unsupported bucket modes are still
        // reported first, exactly as before.
        java.util.Objects.requireNonNull(
                this.input, "Input stream must be set via withInput(...) before calling build()");

        Integer parallelism =
                Optional.ofNullable(
                                this.table
                                        .options()
                                        .get(FlinkConnectorOptions.SINK_PARALLELISM.key()))
                        .map(Integer::valueOf)
                        .orElse(null);

        DataStream<RowData> partitioned =
                FlinkStreamPartitioner.partition(
                        this.input, new BucketsRowChannelComputer(), parallelism);

        return new CompactorSink(this.table).sinkFrom(partitioned);
    }
}
