package org.apache.paimon.flink.sink.cdc;

import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;

/**
 * Fluent builder that wires a CDC input {@link DataStream} into a Paimon table sink.
 *
 * <p>The input stream, the {@link EventParser.Factory}, the target {@link Table}, its {@link
 * Identifier} and a {@link Catalog.Loader} are all mandatory; {@link #build()} fails if any of
 * them is missing. The sink parallelism is optional.
 *
 * @param <T> type of the raw CDC events carried by the input stream
 */
@Experimental
public class CdcSinkBuilder<T> {
    private DataStream<T> input;
    private EventParser.Factory<T> parserFactory;
    private Table table;
    private Identifier identifier;
    private Catalog.Loader catalogLoader;

    /** Optional sink parallelism; {@code null} is handed to the downstream sinks unchanged. */
    @Nullable
    private Integer parallelism;

    /**
     * Sets the stream of raw CDC events to be written.
     *
     * @param dataStream the input stream; must be set before {@link #build()}
     * @return this builder
     */
    public CdcSinkBuilder<T> withInput(DataStream<T> dataStream) {
        this.input = dataStream;
        return this;
    }

    /**
     * Sets the factory used to create the parser for raw CDC events.
     *
     * @param factory the parser factory; must be set before {@link #build()}
     * @return this builder
     */
    public CdcSinkBuilder<T> withParserFactory(EventParser.Factory<T> factory) {
        this.parserFactory = factory;
        return this;
    }

    /**
     * Sets the target table.
     *
     * @param table the target table; {@link #build()} requires it to be a {@link FileStoreTable}
     * @return this builder
     */
    public CdcSinkBuilder<T> withTable(Table table) {
        this.table = table;
        return this;
    }

    /**
     * Sets the parallelism passed to the bucket-mode specific sink.
     *
     * @param num the desired parallelism, or {@code null} to leave it unspecified
     * @return this builder
     */
    public CdcSinkBuilder<T> withParallelism(@Nullable Integer num) {
        this.parallelism = num;
        return this;
    }

    /**
     * Sets the identifier of the target table.
     *
     * @param identifier the table identifier; must be set before {@link #build()}
     * @return this builder
     */
    public CdcSinkBuilder<T> withIdentifier(Identifier identifier) {
        this.identifier = identifier;
        return this;
    }

    /**
     * Sets the catalog loader handed to the schema-evolution and record-conversion steps.
     *
     * @param loader the catalog loader; must be set before {@link #build()}
     * @return this builder
     */
    public CdcSinkBuilder<T> withCatalogLoader(Catalog.Loader loader) {
        this.catalogLoader = loader;
        return this;
    }

    /**
     * Assembles the sink topology: a parsing operator, a side-output branch named "Schema
     * Evolution", and a writer chosen by the table's {@link BucketMode}.
     *
     * @return the sink attached to the converted record stream
     * @throws NullPointerException if a mandatory builder property is missing (raised by {@link
     *     Preconditions#checkNotNull})
     * @throws IllegalArgumentException if the table is not a {@link FileStoreTable}
     * @throws UnsupportedOperationException if the table's bucket mode is not handled here
     */
    public DataStreamSink<?> build() {
        Preconditions.checkNotNull(this.input, "Input DataStream can not be null.");
        Preconditions.checkNotNull(this.parserFactory, "Event ParserFactory can not be null.");
        Preconditions.checkNotNull(this.table, "Paimon Table can not be null.");
        Preconditions.checkNotNull(this.identifier, "Paimon Table Identifier can not be null.");
        Preconditions.checkNotNull(this.catalogLoader, "Paimon Catalog Loader can not be null.");
        if (!(this.table instanceof FileStoreTable)) {
            throw new IllegalArgumentException(
                    "Table should be a data table, but is: " + this.table.getClass().getName());
        }
        FileStoreTable fileStoreTable = (FileStoreTable) this.table;

        // Parse raw events with the same parallelism as the input, connected via forward().
        SingleOutputStreamOperator<CdcRecord> parsed =
                this.input
                        .forward()
                        .process(new CdcParsingProcessFunction<>(this.parserFactory))
                        .name("Side Output")
                        .setParallelism(this.input.getParallelism());

        // Route the new-data-field side output into a dedicated schema-evolution operator.
        SingleOutputStreamOperator<?> schemaEvolution =
                SingleOutputStreamOperatorUtils.getSideOutput(
                                parsed, CdcParsingProcessFunction.NEW_DATA_FIELD_LIST_OUTPUT_TAG)
                        .process(
                                new UpdatedDataFieldsProcessFunction(
                                        new SchemaManager(
                                                fileStoreTable.fileIO(),
                                                fileStoreTable.location()),
                                        this.identifier,
                                        this.catalogLoader))
                        .name("Schema Evolution");
        // Pin the schema-evolution operator to a single instance
        // (presumably so schema changes are applied one at a time; confirm against the function).
        schemaEvolution.getTransformation().setParallelism(1);
        schemaEvolution.getTransformation().setMaxParallelism(1);

        DataStream<CdcRecord> converted =
                CaseSensitiveUtils.cdcRecordConvert(this.catalogLoader, parsed);

        BucketMode bucketMode = fileStoreTable.bucketMode();
        switch (bucketMode) {
            case HASH_FIXED:
                return buildForFixedBucket(converted);
            case HASH_DYNAMIC:
                return new CdcDynamicBucketSink(fileStoreTable).build(converted, this.parallelism);
            case BUCKET_UNAWARE:
                return buildForUnawareBucket(converted);
            default:
                throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
        }
    }

    /**
     * Builds the sink for {@code HASH_FIXED} tables: records are partitioned with a {@link
     * CdcRecordChannelComputer} created from the table schema before being written.
     */
    private DataStreamSink<?> buildForFixedBucket(DataStream<CdcRecord> dataStream) {
        FileStoreTable fileStoreTable = (FileStoreTable) this.table;
        DataStream<CdcRecord> partitioned =
                FlinkStreamPartitioner.partition(
                        dataStream,
                        new CdcRecordChannelComputer(fileStoreTable.schema()),
                        this.parallelism);
        return new CdcFixedBucketSink(fileStoreTable).sinkFrom(partitioned);
    }

    /**
     * Builds the sink for {@code BUCKET_UNAWARE} tables: records are rebalanced across the
     * writers instead of being partitioned by a channel computer.
     */
    private DataStreamSink<?> buildForUnawareBucket(DataStream<CdcRecord> dataStream) {
        FileStoreTable fileStoreTable = (FileStoreTable) this.table;
        return new CdcUnawareBucketSink(fileStoreTable, this.parallelism)
                .sinkFrom(dataStream.rebalance());
    }
}
