package org.apache.paimon.flink.action.cdc;

import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils;
import org.apache.paimon.flink.action.cdc.mongodb.MongoDBRecordParser;
import org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils;
import org.apache.paimon.flink.action.cdc.mysql.MySqlRecordParser;
import org.apache.paimon.flink.action.cdc.postgres.PostgresActionUtils;
import org.apache.paimon.flink.action.cdc.postgres.PostgresRecordParser;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/SyncJobHandler.class */
public class SyncJobHandler {
    private final SourceType sourceType;
    private final Configuration cdcSourceConfig;
    private final boolean isTableSync = false;
    private final String sinkLocation;

    /* loaded from: input_file:org/apache/paimon/flink/action/cdc/SyncJobHandler$SourceType.class */
    public enum SourceType {
        MYSQL("MySQL Source", "MySQL-Paimon %s Sync: %s"),
        KAFKA("Kafka Source", "Kafka-Paimon %s Sync: %s"),
        MONGODB("MongoDB Source", "MongoDB-Paimon %s Sync: %s"),
        PULSAR("Pulsar Source", "Pulsar-Paimon %s Sync: %s"),
        POSTGRES("Postgres Source", "Postgres-Paimon %s Sync: %s");

        private final String sourceName;
        private final String defaultJobNameFormat;

        SourceType(String str, String str2) {
            this.sourceName = str;
            this.defaultJobNameFormat = str2;
        }
    }

    public SyncJobHandler(SourceType sourceType, Map<String, String> map, String str) {
        this.sourceType = sourceType;
        this.cdcSourceConfig = Configuration.fromMap(map);
        this.sinkLocation = str;
    }

    public SyncJobHandler(SourceType sourceType, Map<String, String> map, String str, String str2) {
        this.sourceType = sourceType;
        this.cdcSourceConfig = Configuration.fromMap(map);
        this.sinkLocation = str + "." + str2;
    }

    public String provideSourceName() {
        return this.sourceType.sourceName;
    }

    public String provideDefaultJobName() {
        String str = this.sourceType.defaultJobNameFormat;
        Object[] objArr = new Object[2];
        objArr[0] = this.isTableSync ? "Table" : "Database";
        objArr[1] = this.sinkLocation;
        return String.format(str, objArr);
    }

    public void registerJdbcDriver() {
        if (this.sourceType == SourceType.MYSQL) {
            MySqlActionUtils.registerJdbcDriver();
        } else if (this.sourceType == SourceType.POSTGRES) {
            PostgresActionUtils.registerJdbcDriver();
        }
    }

    public void checkRequiredOption() {
        switch (this.sourceType) {
            case MYSQL:
                CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, CdcActionCommonUtils.MYSQL_CONF, MySqlSourceOptions.HOSTNAME, MySqlSourceOptions.USERNAME, MySqlSourceOptions.PASSWORD, MySqlSourceOptions.DATABASE_NAME);
                if (this.isTableSync) {
                    CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, CdcActionCommonUtils.MYSQL_CONF, MySqlSourceOptions.TABLE_NAME);
                    return;
                } else {
                    Preconditions.checkArgument(!this.cdcSourceConfig.contains(MySqlSourceOptions.TABLE_NAME), MySqlSourceOptions.TABLE_NAME.key() + " cannot be set for mysql_sync_database. If you want to sync several MySQL tables into one Paimon table, use mysql_sync_table instead.");
                    return;
                }
            case POSTGRES:
                CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, CdcActionCommonUtils.POSTGRES_CONF, PostgresSourceOptions.HOSTNAME, PostgresSourceOptions.USERNAME, PostgresSourceOptions.PASSWORD, PostgresSourceOptions.DATABASE_NAME, PostgresSourceOptions.SCHEMA_NAME, PostgresSourceOptions.SLOT_NAME);
                if (this.isTableSync) {
                    CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, CdcActionCommonUtils.POSTGRES_CONF, PostgresSourceOptions.TABLE_NAME);
                    return;
                } else {
                    Preconditions.checkArgument(!this.cdcSourceConfig.contains(PostgresSourceOptions.TABLE_NAME), PostgresSourceOptions.TABLE_NAME.key() + " cannot be set for postgres_sync_database. If you want to sync several PostgreSQL tables into one Paimon table, use postgres_sync_table instead.");
                    return;
                }
            case KAFKA:
                CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, CdcActionCommonUtils.KAFKA_CONF, KafkaConnectorOptions.VALUE_FORMAT, KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS);
                CdcActionCommonUtils.checkOneRequiredOption(this.cdcSourceConfig, CdcActionCommonUtils.KAFKA_CONF, KafkaConnectorOptions.TOPIC, KafkaConnectorOptions.TOPIC_PATTERN);
                return;
            case PULSAR:
                CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, CdcActionCommonUtils.PULSAR_CONF, PulsarActionUtils.VALUE_FORMAT, PulsarOptions.PULSAR_SERVICE_URL, PulsarOptions.PULSAR_ADMIN_URL, PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME);
                CdcActionCommonUtils.checkOneRequiredOption(this.cdcSourceConfig, CdcActionCommonUtils.PULSAR_CONF, PulsarActionUtils.TOPIC, PulsarActionUtils.TOPIC_PATTERN);
                return;
            case MONGODB:
                CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, CdcActionCommonUtils.MONGODB_CONF, MongoDBSourceOptions.HOSTS, MongoDBSourceOptions.DATABASE);
                if (this.isTableSync) {
                    CdcActionCommonUtils.checkRequiredOptions(this.cdcSourceConfig, CdcActionCommonUtils.MONGODB_CONF, MongoDBSourceOptions.COLLECTION);
                    return;
                }
                return;
            default:
                throw new UnsupportedOperationException("Unknown source type " + this.sourceType);
        }
    }

    public Source<CdcSourceRecord, ?, ?> provideSource() {
        switch (this.sourceType) {
            case KAFKA:
                return KafkaActionUtils.buildKafkaSource(this.cdcSourceConfig, provideDataFormat().createKafkaDeserializer(this.cdcSourceConfig));
            case PULSAR:
                return PulsarActionUtils.buildPulsarSource(this.cdcSourceConfig, provideDataFormat().createPulsarDeserializer(this.cdcSourceConfig));
            default:
                throw new UnsupportedOperationException("Cannot get source from source type" + this.sourceType);
        }
    }

    public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordParser(List<ComputedColumn> list, TypeMapping typeMapping, CdcMetadataConverter[] cdcMetadataConverterArr) {
        switch (this.sourceType) {
            case MYSQL:
                return new MySqlRecordParser(this.cdcSourceConfig, list, typeMapping, cdcMetadataConverterArr);
            case POSTGRES:
                return new PostgresRecordParser(this.cdcSourceConfig, list, typeMapping, cdcMetadataConverterArr);
            case KAFKA:
            case PULSAR:
                return provideDataFormat().createParser(typeMapping, list);
            case MONGODB:
                return new MongoDBRecordParser(list, this.cdcSourceConfig);
            default:
                throw new UnsupportedOperationException("Unknown source type " + this.sourceType);
        }
    }

    public DataFormat provideDataFormat() {
        switch (this.sourceType) {
            case KAFKA:
                return KafkaActionUtils.getDataFormat(this.cdcSourceConfig);
            case PULSAR:
                return PulsarActionUtils.getDataFormat(this.cdcSourceConfig);
            default:
                throw new UnsupportedOperationException("Cannot get DataFormat from source type" + this.sourceType);
        }
    }

    public MessageQueueSchemaUtils.ConsumerWrapper provideConsumer() {
        switch (this.sourceType) {
            case KAFKA:
                return KafkaActionUtils.getKafkaEarliestConsumer(this.cdcSourceConfig, provideDataFormat().createKafkaDeserializer(this.cdcSourceConfig));
            case PULSAR:
                return PulsarActionUtils.createPulsarConsumer(this.cdcSourceConfig, provideDataFormat().createPulsarDeserializer(this.cdcSourceConfig));
            default:
                throw new UnsupportedOperationException("Cannot get consumer from source type" + this.sourceType);
        }
    }

    public CdcMetadataConverter provideMetadataConverter(String str) {
        return CdcMetadataProcessor.converter(this.sourceType, str);
    }
}
