package org.apache.paimon.flink.action.cdc.pulsar;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.format.DataFormatFactory;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.class */
public class PulsarActionUtils {
    /** Option {@code value.format}: identifier of the format used for the message value. */
    public static final ConfigOption<String> VALUE_FORMAT =
            ConfigOptions.key("value.format")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Defines the format identifier for encoding value data.");

    /** Option {@code topic}: explicit list of topics; mutually exclusive with {@link #TOPIC_PATTERN}. */
    public static final ConfigOption<List<String>> TOPIC =
            ConfigOptions.key("topic")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription("Topic name(s) from which the data is read. It also supports topic list by separating topic by semicolon like 'topic-1;topic-2'. Note, only one of \"topic-pattern\" and \"topic\" can be specified.");

    /** Option {@code topic-pattern}: regular expression of topics; mutually exclusive with {@link #TOPIC}. */
    public static final ConfigOption<String> TOPIC_PATTERN =
            ConfigOptions.key("topic-pattern")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of \"topic-pattern\" and \"topic\" can be specified.");

    /** Start position given as a message id; defaults to the EARLIEST marker. */
    static final ConfigOption<String> PULSAR_START_CURSOR_FROM_MESSAGE_ID =
            ConfigOptions.key("pulsar.startCursor.fromMessageId")
                    .stringType()
                    .defaultValue(SnapshotManager.EARLIEST)
                    .withDescription("Using a unique identifier of a single message to seek the start position. The common format is a triple '<long>ledgerId,<long>entryId,<int>partitionIndex'. Specially, you can set it to EARLIEST (-1, -1, -1) or LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1).");

    /**
     * Start position given as a publish time. The "FORM" in the constant name is a misspelling of
     * "FROM" that is kept because other code refers to this identifier; the option key itself is
     * spelled correctly.
     */
    static final ConfigOption<Long> PULSAR_START_CURSOR_FORM_PUBLISH_TIME =
            ConfigOptions.key("pulsar.startCursor.fromPublishTime")
                    .longType()
                    .noDefaultValue()
                    .withDescription("Using the message publish time to seek the start position.");

    /** Whether the start message id itself is consumed; defaults to {@code true}. */
    static final ConfigOption<Boolean> PULSAR_START_CURSOR_FROM_MESSAGE_ID_INCLUSIVE =
            ConfigOptions.key("pulsar.startCursor.fromMessageIdInclusive")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether to include the given message id. This option only works when the message id is not EARLIEST or LATEST.");

    /** Stop position as a message id, exclusive of that message. */
    static final ConfigOption<String> PULSAR_STOP_CURSOR_AT_MESSAGE_ID =
            ConfigOptions.key("pulsar.stopCursor.atMessageId")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Stop consuming when the message id is equal or greater than the specified message id. Message that is equal to the specified message id will not be consumed. The common format is a triple '<long>ledgerId,<long>entryId,<int>partitionIndex'. Specially, you can set it to LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1).");

    /** Stop position as a message id, inclusive of that message. */
    static final ConfigOption<String> PULSAR_STOP_CURSOR_AFTER_MESSAGE_ID =
            ConfigOptions.key("pulsar.stopCursor.afterMessageId")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Stop consuming when the message id is greater than the specified message id. Message that is equal to the specified message id will be consumed. The common format is a triple '<long>ledgerId,<long>entryId,<int>partitionIndex'. Specially, you can set it to LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1).");

    /** Stop position as an event time, exclusive of messages at that timestamp. */
    static final ConfigOption<Long> PULSAR_STOP_CURSOR_AT_EVENT_TIME =
            ConfigOptions.key("pulsar.stopCursor.atEventTime")
                    .longType()
                    .noDefaultValue()
                    .withDescription("Stop consuming when message event time is greater than or equals the specified timestamp. Message that even time is equal to the specified timestamp will not be consumed.");

    /** Stop position as an event time, inclusive of messages at that timestamp. */
    static final ConfigOption<Long> PULSAR_STOP_CURSOR_AFTER_EVENT_TIME =
            ConfigOptions.key("pulsar.stopCursor.afterEventTime")
                    .longType()
                    .noDefaultValue()
                    .withDescription("Stop consuming when message event time is greater than the specified timestamp. Message that even time is equal to the specified timestamp will be consumed.");

    /** Whether the source is unbounded; defaults to {@code true}. */
    static final ConfigOption<Boolean> PULSAR_SOURCE_UNBOUNDED =
            ConfigOptions.key("pulsar.source.unbounded")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("To specify the boundedness of a stream.");

    /* loaded from: input_file:org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils$PulsarConsumerWrapper.class */
    /**
     * {@link MessageQueueSchemaUtils.ConsumerWrapper} backed by a Pulsar {@link Consumer} of raw
     * bytes. Each received message value is turned into a {@link CdcSourceRecord} with the supplied
     * deserialization schema.
     */
    private static class PulsarConsumerWrapper implements MessageQueueSchemaUtils.ConsumerWrapper {
        private final Consumer<byte[]> consumer;
        private final String topic;
        private final DeserializationSchema<CdcSourceRecord> deserializationSchema;

        PulsarConsumerWrapper(Consumer<byte[]> consumer, String topic, DeserializationSchema<CdcSourceRecord> deserializationSchema) {
            this.consumer = consumer;
            this.topic = topic;
            this.deserializationSchema = deserializationSchema;
        }

        /**
         * Receives at most one message, waiting up to the given number of milliseconds.
         *
         * @param pollTimeOutMills maximum wait, in milliseconds, passed to the consumer
         * @return an empty list when the consumer hands back no message, otherwise a single-element
         *     list holding the deserialized record
         * @throws RuntimeException wrapping any {@link IOException} raised while receiving or
         *     deserializing
         */
        @Override // org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.ConsumerWrapper
        public List<CdcSourceRecord> getRecords(int pollTimeOutMills) {
            try {
                Message<byte[]> message = consumer.receive(pollTimeOutMills, TimeUnit.MILLISECONDS);
                if (message == null) {
                    return Collections.emptyList();
                }
                CdcSourceRecord record = deserializationSchema.deserialize(message.getValue());
                return Collections.singletonList(record);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /** Returns the topic name this wrapper was created with. */
        @Override // org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.ConsumerWrapper
        public String topic() {
            return topic;
        }

        /** Closes the underlying Pulsar consumer. */
        @Override // java.lang.AutoCloseable
        public void close() throws PulsarClientException {
            consumer.close();
        }
    }

    /**
     * Builds a {@link PulsarSource} emitting {@link CdcSourceRecord}s from the given configuration.
     *
     * <p>Service URL, admin URL, subscription name and the deserialization schema are always
     * applied. Topics, topic pattern and consumer name are applied when present. The start cursor
     * comes from either the publish-time option or the message-id option, and at most one of the
     * four stop-cursor options may be set. Finally the whole configuration is handed to the builder.
     *
     * @param configuration Pulsar and Paimon CDC options
     * @param deserializationSchema schema used to turn message bytes into records
     * @return the configured source
     * @throws IllegalArgumentException if conflicting cursor or authentication options are set
     *     (raised through {@link Preconditions#checkArgument})
     */
    public static PulsarSource<CdcSourceRecord> buildPulsarSource(Configuration configuration, DeserializationSchema<CdcSourceRecord> deserializationSchema) {
        PulsarSourceBuilder<CdcSourceRecord> builder = PulsarSource.builder();

        builder.setServiceUrl(configuration.get(PulsarOptions.PULSAR_SERVICE_URL))
                .setAdminUrl(configuration.get(PulsarOptions.PULSAR_ADMIN_URL))
                .setSubscriptionName(configuration.get(PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME))
                .setDeserializationSchema(deserializationSchema);

        configuration.getOptional(TOPIC).ifPresent(builder::setTopics);
        configuration.getOptional(TOPIC_PATTERN).ifPresent(builder::setTopicPattern);
        configuration.getOptional(PulsarSourceOptions.PULSAR_CONSUMER_NAME).ifPresent(builder::setConsumerName);

        builder.setStartCursor(resolveStartCursor(configuration));

        StopCursor stopCursor = resolveStopCursor(configuration);
        if (configuration.get(PULSAR_SOURCE_UNBOUNDED)) {
            builder.setUnboundedStopCursor(stopCursor);
        } else {
            builder.setBoundedStopCursor(stopCursor);
        }

        String authPluginClassName = configuration.get(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME);
        if (authPluginClassName != null) {
            String authParams = configuration.get(PulsarOptions.PULSAR_AUTH_PARAMS);
            Map<String, String> authParamMap = configuration.get(PulsarOptions.PULSAR_AUTH_PARAM_MAP);
            Preconditions.checkArgument(authParams != null || authParamMap != null, "You should set '%s' or '%s'", PulsarOptions.PULSAR_AUTH_PARAMS.key(), PulsarOptions.PULSAR_AUTH_PARAM_MAP.key());
            Preconditions.checkArgument(authParams == null || authParamMap == null, "You can only set one of '%s' and '%s'", PulsarOptions.PULSAR_AUTH_PARAMS.key(), PulsarOptions.PULSAR_AUTH_PARAM_MAP.key());
            if (authParams != null) {
                builder.setAuthentication(authPluginClassName, authParams);
            } else {
                builder.setAuthentication(authPluginClassName, authParamMap);
            }
        }

        builder.setConfig(configuration);
        return builder.build();
    }

    /** Picks the start cursor from the publish-time option or, otherwise, the message-id option. */
    private static StartCursor resolveStartCursor(Configuration configuration) {
        if (configuration.contains(PULSAR_START_CURSOR_FORM_PUBLISH_TIME)) {
            Preconditions.checkArgument(
                    !configuration.contains(PULSAR_START_CURSOR_FROM_MESSAGE_ID),
                    "You can only set one of '%s' and '%s'",
                    PULSAR_START_CURSOR_FORM_PUBLISH_TIME.key(),
                    PULSAR_START_CURSOR_FROM_MESSAGE_ID.key());
            return StartCursor.fromPublishTime(configuration.get(PULSAR_START_CURSOR_FORM_PUBLISH_TIME));
        }

        String messageId = configuration.get(PULSAR_START_CURSOR_FROM_MESSAGE_ID);
        if (messageId.equalsIgnoreCase(SnapshotManager.EARLIEST)) {
            return StartCursor.earliest();
        }
        if (messageId.equalsIgnoreCase(SnapshotManager.LATEST)) {
            return StartCursor.latest();
        }
        return StartCursor.fromMessageId(toMessageId(messageId), configuration.get(PULSAR_START_CURSOR_FROM_MESSAGE_ID_INCLUSIVE));
    }

    /** Picks the stop cursor; "never" when none is configured, and rejects more than one option. */
    private static StopCursor resolveStopCursor(Configuration configuration) {
        StopCursor stopCursor = StopCursor.never();
        int configuredOptions = 0;

        if (configuration.contains(PULSAR_STOP_CURSOR_AT_MESSAGE_ID)) {
            stopCursor = StopCursor.atMessageId(toMessageId(configuration.get(PULSAR_STOP_CURSOR_AT_MESSAGE_ID)));
            configuredOptions++;
        }
        if (configuration.contains(PULSAR_STOP_CURSOR_AFTER_MESSAGE_ID)) {
            // Read the "after" option itself; reading the "at" option here would yield null (and an
            // NPE) whenever only afterMessageId is configured.
            stopCursor = StopCursor.afterMessageId(toMessageId(configuration.get(PULSAR_STOP_CURSOR_AFTER_MESSAGE_ID)));
            configuredOptions++;
        }
        if (configuration.contains(PULSAR_STOP_CURSOR_AT_EVENT_TIME)) {
            stopCursor = StopCursor.atEventTime(configuration.get(PULSAR_STOP_CURSOR_AT_EVENT_TIME));
            configuredOptions++;
        }
        if (configuration.contains(PULSAR_STOP_CURSOR_AFTER_EVENT_TIME)) {
            // "after" must keep messages whose event time equals the timestamp, as the option's
            // description promises, hence afterEventTime rather than atEventTime.
            stopCursor = StopCursor.afterEventTime(configuration.get(PULSAR_STOP_CURSOR_AFTER_EVENT_TIME));
            configuredOptions++;
        }

        Preconditions.checkArgument(configuredOptions <= 1, "You can set at most one of the stop cursor options.");
        return stopCursor;
    }

    /**
     * Parses the textual form of a message id.
     *
     * <p>The EARLIEST and LATEST markers (compared ignoring case) map to the corresponding
     * {@link MessageId} constants. Anything else must be a comma-separated triple
     * {@code ledgerId,entryId,partitionIndex}; surrounding whitespace of each part is trimmed.
     *
     * @param messageIdText the configured message id text
     * @return the parsed message id
     * @throws IllegalArgumentException if the text is not made of exactly three parts (raised
     *     through {@link Preconditions#checkArgument})
     */
    private static MessageId toMessageId(String messageIdText) {
        if (messageIdText.equalsIgnoreCase(SnapshotManager.EARLIEST)) {
            return MessageId.earliest;
        } else if (messageIdText.equalsIgnoreCase(SnapshotManager.LATEST)) {
            return MessageId.latest;
        }

        String[] parts = messageIdText.split(",");
        Preconditions.checkArgument(parts.length == 3, "Please use format '<long>ledgerId,<long>entryId,<int>partitionIndex' for message id");

        return DefaultImplementation.getDefaultImplementation()
                .newMessageId(
                        Long.parseLong(parts[0].trim()),
                        Long.parseLong(parts[1].trim()),
                        Integer.parseInt(parts[2].trim()));
    }

    /**
     * Resolves the {@link DataFormat} named by the {@code value.format} option.
     *
     * @param configuration configuration holding {@link #VALUE_FORMAT}
     * @return the format created by {@link DataFormatFactory} for the configured identifier
     */
    public static DataFormat getDataFormat(Configuration configuration) {
        String formatIdentifier = configuration.get(VALUE_FORMAT);
        return DataFormatFactory.createDataFormat(formatIdentifier);
    }

    /**
     * Creates a consumer wrapper subscribed, from the earliest position, to one topic selected by
     * {@code findOneTopic}.
     *
     * <p>The {@link PulsarClient} created here is owned by the returned wrapper: closing the wrapper
     * closes the consumer and then the client. If anything fails before the wrapper is returned,
     * the client is closed immediately so it does not leak.
     *
     * @param configuration Pulsar options used to build the client and the consumer
     * @param deserializationSchema schema used by the wrapper to decode message values
     * @return a wrapper around the subscribed consumer
     * @throws RuntimeException wrapping any {@link PulsarClientException} raised while creating the
     *     client or subscribing
     */
    public static MessageQueueSchemaUtils.ConsumerWrapper createPulsarConsumer(Configuration configuration, DeserializationSchema<CdcSourceRecord> deserializationSchema) {
        try {
            SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration);
            PulsarClient client = PulsarClientFactory.createClient(sourceConfiguration);
            try {
                ConsumerBuilder<byte[]> consumerBuilder = PulsarSourceConfigUtils.createConsumerBuilder(client, Schema.BYTES, sourceConfiguration);
                consumerBuilder.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);

                String topic = findOneTopic(configuration, () -> client);
                TopicPartition partition = new TopicPartition(topic);
                consumerBuilder.topic(partition.getFullTopicName());

                // Only a partial key range needs an explicit sticky key-shared policy.
                if (!TopicRangeUtils.isFullTopicRanges(partition.getRanges())) {
                    KeySharedPolicy.KeySharedPolicySticky stickyPolicy = KeySharedPolicy.stickyHashRange().ranges(partition.getPulsarRanges());
                    stickyPolicy.setAllowOutOfOrderDelivery(sourceConfiguration.isAllowKeySharedOutOfOrderDelivery());
                    consumerBuilder.keySharedPolicy(stickyPolicy);
                }

                Consumer<byte[]> consumer = consumerBuilder.subscribe();
                return new PulsarConsumerWrapper(consumer, topic, deserializationSchema) {
                    @Override
                    public void close() throws PulsarClientException {
                        try {
                            super.close();
                        } finally {
                            // The client was created solely for this consumer; release it too.
                            client.close();
                        }
                    }
                };
            } catch (PulsarClientException | RuntimeException e) {
                try {
                    client.close();
                } catch (PulsarClientException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns one topic name for the given configuration: the first entry of the {@code topic}
     * option when it is set, otherwise a topic looked up through a temporary {@link PulsarClient}
     * using the {@code topic-pattern} option.
     *
     * <p>No client is created when {@code topic} is set. When a client is needed it is closed
     * before this method returns, so the lookup does not leak client resources.
     *
     * @param configuration configuration holding the topic options and Pulsar client settings
     * @return a topic name
     * @throws RuntimeException if the client cannot be created or closed, or if the lookup fails
     */
    public static String findOneTopic(Configuration configuration) {
        if (configuration.contains(TOPIC)) {
            // The supplier is never invoked on this path, so passing a failing one is safe.
            return findOneTopic(configuration, () -> {
                throw new IllegalStateException("No Pulsar client is required when '" + TOPIC.key() + "' is set.");
            });
        }
        try (PulsarClient client = PulsarClientFactory.createClient(new SourceConfiguration(configuration))) {
            return findOneTopic(configuration, () -> client);
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns one topic name: the first entry of the {@code topic} option when it is set, otherwise
     * the first topic returned by a namespace lookup derived from the {@code topic-pattern} option.
     *
     * <p>The supplier is only invoked on the pattern path. The caller keeps ownership of the
     * supplied client; it is not closed here.
     *
     * @param configuration configuration holding {@link #TOPIC} or {@link #TOPIC_PATTERN}
     * @param supplier provider of the client used for the namespace lookup
     * @return a topic name
     * @throws IllegalArgumentException if {@code topic} is set but empty, or neither option is set
     * @throws RuntimeException if the lookup fails, is interrupted, or returns no topics
     */
    private static String findOneTopic(Configuration configuration, Supplier<PulsarClient> supplier) {
        if (configuration.contains(TOPIC)) {
            List<String> configuredTopics = configuration.get(TOPIC);
            Preconditions.checkArgument(configuredTopics != null && !configuredTopics.isEmpty(), "Option '%s' must contain at least one topic.", TOPIC.key());
            return configuredTopics.get(0);
        }

        String topicPattern = configuration.get(TOPIC_PATTERN);
        Preconditions.checkArgument(topicPattern != null, "You should set '%s' or '%s'", TOPIC.key(), TOPIC_PATTERN.key());

        TopicName destination = TopicName.get(topicPattern);
        String fullPattern = destination.toString();
        // Drop the "<domain>://" prefix; compiling also rejects an invalid regular expression early.
        Pattern shortenedPattern = Pattern.compile(fullPattern.split("://")[1]);
        String namespace = destination.getNamespaceObject().toString();
        // NOTE(review): getLookup() does not appear on the public PulsarClient interface; a cast to
        // the client implementation class was presumably lost in this source - confirm.
        LookupService lookup = supplier.get().getLookup();
        NamespaceName namespaceName = NamespaceName.get(namespace);

        // Only a pattern ending in ".*" is forwarded to the lookup; any other pattern is replaced
        // by null (presumably because server-side filtering is only relied on for such wildcard
        // patterns - confirm). Note that the returned topics are not filtered again locally.
        String queryPattern = shortenedPattern.toString();
        if (!queryPattern.endsWith(".*")) {
            queryPattern = null;
        }
        String topicsHash = null;

        try {
            GetTopicsResult lookupResult = lookup.getTopicsUnderNamespace(namespaceName, CommandGetTopicsOfNamespace.Mode.ALL, queryPattern, topicsHash).get();
            List<String> topics = lookupResult.getTopics();
            if (topics == null || topics.isEmpty()) {
                throw new RuntimeException("Cannot find topics match the topic-pattern " + fullPattern);
            }
            return topics.get(0);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
