/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.utils;

import com.ververica.models.Alert;
import com.ververica.models.ClickEvent;
import com.ververica.serdes.AlertSerializer;
import com.ververica.serdes.ClickstreamSerdes;
import java.util.Properties;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for producing records with a plain {@link KafkaProducer} and for
 * building the Flink Kafka source/sink used for click events and alerts.
 *
 * <p>All connector settings are read from a caller-supplied {@link Properties}
 * object; the keys and fallback values are listed on each factory method.
 */
public class StreamingUtils {
    private static final Logger logger = LoggerFactory.getLogger(StreamingUtils.class);

    /**
     * Wraps {@code key}/{@code value} in a {@link ProducerRecord} for {@code topic} and
     * hands it to {@code producer.send}. The completion callback logs the exception, if
     * any, at error level; failures are not rethrown to the caller.
     *
     * @param producer producer used to send the record
     * @param topic    destination topic
     * @param key      record key
     * @param value    record value
     * @param <K>      key type
     * @param <V>      value type
     */
    public static <K, V> void handleMessage(KafkaProducer<K, V> producer, String topic, K key, V value) {
        ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                logger.error("Error while producing: ", exception);
            }
        });
    }

    /**
     * Flushes and then closes {@code producer}.
     *
     * @param producer producer to shut down
     * @param <K>      key type
     * @param <V>      value type
     */
    public static <K, V> void closeProducer(KafkaProducer<K, V> producer) {
        try {
            producer.flush();
        } finally {
            // Close even when flush() throws so the producer is not leaked.
            producer.close();
        }
    }

    /**
     * Builds a {@link KafkaSource} of {@link ClickEvent}s that starts from the earliest
     * offsets and deserializes record values with {@link ClickstreamSerdes}.
     *
     * <p>Property keys read (with fallback): {@code bootstrap.servers} (empty string),
     * {@code clickstream_topic} ({@code "clickevent"}), {@code group}
     * ({@code "clickstream.consumer"}). The full {@code properties} object is also
     * passed to the builder via {@code setProperties} as the last step.
     *
     * @param properties configuration for the source
     * @return the configured source
     */
    public static KafkaSource<ClickEvent> createClickEventConsumer(Properties properties) {
        // The explicit <ClickEvent> witness is required: in a call chain the builder's
        // type parameter would otherwise be inferred as Object.
        return KafkaSource.<ClickEvent>builder()
                .setBootstrapServers(properties.getProperty("bootstrap.servers", ""))
                .setTopics(properties.getProperty("clickstream_topic", "clickevent"))
                .setGroupId(properties.getProperty("group", "clickstream.consumer"))
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new ClickstreamSerdes())
                .setProperties(properties)
                .build();
    }

    /**
     * Builds a {@link KafkaSink} for {@link Alert}s with
     * {@link DeliveryGuarantee#AT_LEAST_ONCE}, serializing records with an
     * {@link AlertSerializer} constructed from the alerts topic name.
     *
     * <p>Property keys read (with fallback): {@code bootstrap.servers} (empty string),
     * {@code alerts_topic} ({@code "alerts"}). The full {@code properties} object is
     * also passed as the Kafka producer config.
     *
     * @param properties configuration for the sink
     * @return the configured sink
     */
    public static KafkaSink<Alert> createKafkaAlertSink(Properties properties) {
        // The explicit <Alert> witness is required for the same reason as in
        // createClickEventConsumer.
        return KafkaSink.<Alert>builder()
                .setBootstrapServers(properties.getProperty("bootstrap.servers", ""))
                .setRecordSerializer(new AlertSerializer(properties.getProperty("alerts_topic", "alerts")))
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setKafkaProducerConfig(properties)
                .build();
    }
}

