package org.apache.paimon.flink.action.cdc;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/SynchronizationActionBase.class */
public abstract class SynchronizationActionBase extends ActionBase {
    private static final long DEFAULT_CHECKPOINT_INTERVAL = 180000;
    protected final String database;
    protected final Configuration cdcSourceConfig;
    protected final SyncJobHandler syncJobHandler;
    protected final boolean allowUpperCase;
    protected Map<String, String> tableConfig;
    protected TypeMapping typeMapping;
    protected CdcMetadataConverter[] metadataConverters;

    public SynchronizationActionBase(String str, String str2, Map<String, String> map, Map<String, String> map2, SyncJobHandler syncJobHandler) {
        super(str, map);
        this.tableConfig = new HashMap();
        this.typeMapping = TypeMapping.defaultMapping();
        this.metadataConverters = new CdcMetadataConverter[0];
        this.database = str2;
        this.cdcSourceConfig = Configuration.fromMap(map2);
        this.syncJobHandler = syncJobHandler;
        this.allowUpperCase = this.catalog.allowUpperCase();
        this.syncJobHandler.registerJdbcDriver();
    }

    public SynchronizationActionBase withTableConfig(Map<String, String> map) {
        this.tableConfig = map;
        return this;
    }

    public SynchronizationActionBase withTypeMapping(TypeMapping typeMapping) {
        this.typeMapping = typeMapping;
        return this;
    }

    public SynchronizationActionBase withMetadataColumns(List<String> list) {
        Stream<String> stream = list.stream();
        SyncJobHandler syncJobHandler = this.syncJobHandler;
        syncJobHandler.getClass();
        this.metadataConverters = (CdcMetadataConverter[]) stream.map(syncJobHandler::provideMetadataConverter).toArray(i -> {
            return new CdcMetadataConverter[i];
        });
        return this;
    }

    @VisibleForTesting
    public Map<String, String> tableConfig() {
        return this.tableConfig;
    }

    @Override // org.apache.paimon.flink.action.Action
    public void build() throws Exception {
        this.syncJobHandler.checkRequiredOption();
        this.catalog.createDatabase(this.database, true);
        validateCaseSensitivity();
        beforeBuildingSourceSink();
        buildSink(buildDataStreamSource(buildSource()).flatMap(recordParse()).name("Parse"), buildEventParserFactory());
    }

    protected abstract void validateCaseSensitivity();

    protected void beforeBuildingSourceSink() throws Exception {
    }

    protected Object buildSource() {
        return this.syncJobHandler.provideSource();
    }

    protected CdcTimestampExtractor createCdcTimestampExtractor() {
        throw new IllegalArgumentException("Unsupported timestamp extractor for current cdc source.");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void validateRuntimeExecutionMode() {
        Preconditions.checkArgument(this.env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING, "It's only support STREAMING mode for flink-cdc sync table action.");
    }

    private DataStreamSource<CdcSourceRecord> buildDataStreamSource(Object obj) {
        if (!(obj instanceof Source)) {
            if (obj instanceof SourceFunction) {
                return this.env.addSource((SourceFunction) obj, this.syncJobHandler.provideSourceName());
            }
            throw new UnsupportedOperationException("Unrecognized source type");
        }
        boolean z = this.tableConfig.containsKey(CoreOptions.TAG_AUTOMATIC_CREATION.key()) && Objects.equals(this.tableConfig.get(CoreOptions.TAG_AUTOMATIC_CREATION.key()), CoreOptions.TagCreationMode.WATERMARK.toString());
        Options fromMap = Options.fromMap(this.tableConfig);
        Duration duration = (Duration) fromMap.get(FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT);
        String str = (String) fromMap.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP);
        WatermarkStrategy withWatermarkAlignment = z ? str != null ? new CdcWatermarkStrategy(createCdcTimestampExtractor()).withWatermarkAlignment(str, (Duration) fromMap.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT), (Duration) fromMap.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL)) : new CdcWatermarkStrategy(createCdcTimestampExtractor()) : WatermarkStrategy.noWatermarks();
        if (duration != null) {
            withWatermarkAlignment = withWatermarkAlignment.withIdleness(duration);
        }
        return this.env.fromSource((Source) obj, withWatermarkAlignment, this.syncJobHandler.provideSourceName());
    }

    protected abstract FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse();

    protected abstract EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory();

    protected abstract void buildSink(DataStream<RichCdcMultiplexRecord> dataStream, EventParser.Factory<RichCdcMultiplexRecord> factory);

    /* JADX INFO: Access modifiers changed from: protected */
    public FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable fileStoreTable) {
        HashMap hashMap = new HashMap(this.tableConfig);
        hashMap.remove(CoreOptions.BUCKET.key());
        Map<String, String> options = fileStoreTable.options();
        Set<String> set = CoreOptions.IMMUTABLE_OPTIONS;
        hashMap.entrySet().removeIf(entry -> {
            return set.contains(entry.getKey()) || Objects.equals(options.get(entry.getKey()), entry.getValue());
        });
        if (hashMap.isEmpty()) {
            return fileStoreTable;
        }
        try {
            this.catalog.alterTable(identifier, (List<SchemaChange>) hashMap.entrySet().stream().map(entry2 -> {
                return SchemaChange.setOption((String) entry2.getKey(), (String) entry2.getValue());
            }).collect(Collectors.toList()), false);
            return fileStoreTable.copy((Map<String, String>) hashMap);
        } catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException | Catalog.TableNotExistException e) {
            throw new RuntimeException("This is unexpected.", e);
        }
    }

    @Override // org.apache.paimon.flink.action.Action
    public void run() throws Exception {
        build();
        if (!this.env.getCheckpointConfig().isCheckpointingEnabled()) {
            this.env.enableCheckpointing(DEFAULT_CHECKPOINT_INTERVAL);
        }
        execute(this.syncJobHandler.provideDefaultJobName());
    }
}
