/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cep;

import com.ververica.config.AppConfig;
import com.ververica.models.Alert;
import com.ververica.models.AlertType;
import com.ververica.models.ClickEvent;
import com.ververica.serdes.ClickstreamSerdes;
import com.ververica.utils.StreamingUtils;
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Either;

public class AbandonedCartCEP {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        MultipleParameterTool params = MultipleParameterTool.fromArgs((String[])args);
        Properties properties = AppConfig.buildArgsProps(new Properties(), params);
        KafkaSource clickstreamKafkaSource = KafkaSource.builder().setBootstrapServers("").setTopics("clickevent").setGroupId("clickstream.consumer").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new ClickstreamSerdes()).setProperties(properties).build();
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forBoundedOutOfOrderness((Duration)Duration.ofSeconds(10L)).withTimestampAssigner((SerializableTimestampAssigner & Serializable)(event, timestamp) -> event.getEventTime());
        SingleOutputStreamOperator events = environment.fromSource(clickstreamKafkaSource, watermarkStrategy, "Clickstream Events Source").name("ClickstreamEventsSource");
        Pattern abandonmentPattern = Pattern.begin((String)"cart_add").where((IterativeCondition)new SimpleCondition<ClickEvent>(){

            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("cart") && event.getPrice() > 200.0;
            }
        }).followedBy("purchase").where((IterativeCondition)new SimpleCondition<ClickEvent>(){

            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("purchase");
            }
        }).within(Time.minutes((long)30L));
        PatternStream abandonmentPatternStream = CEP.pattern((DataStream)events.keyBy(ClickEvent::getUserSession), (Pattern)abandonmentPattern);
        SingleOutputStreamOperator abandonedmentAlert = abandonmentPatternStream.select((PatternTimeoutFunction)new PatternTimeoutFunction<ClickEvent, Alert>(){

            public Alert timeout(Map<String, List<ClickEvent>> pattern, long timeoutTimestamp) {
                ClickEvent cartAdd = pattern.get("cart_add").get(0);
                String message = "High-value cart abandonment detected for user '" + cartAdd.getUserId() + "' priced at " + cartAdd.getPrice() + ". No purchase within 30 minutes.";
                return new Alert(cartAdd.getUserSession(), cartAdd.getUserId(), AlertType.CART_ABANDONMENT, message);
            }
        }, (PatternSelectFunction)new PatternSelectFunction<ClickEvent, Alert>(){

            public Alert select(Map<String, List<ClickEvent>> pattern) {
                ClickEvent cartAdd = pattern.get("cart_add").get(0);
                ClickEvent purchase = pattern.get("purchase").get(0);
                String message = "Purchase completed for user " + purchase.getUserId() + " on product " + purchase.getProductId() + " priced at " + purchase.getPrice();
                return new Alert(cartAdd.getUserSession(), cartAdd.getUserId(), AlertType.PURCHASE_COMPLETION, message);
            }
        }).map((MapFunction)new MapFunction<Either<Alert, Alert>, Alert>(){

            public Alert map(Either<Alert, Alert> alert) throws Exception {
                if (alert.isLeft()) {
                    return (Alert)alert.left();
                }
                return (Alert)alert.right();
            }
        }).name("AbandonmentAlertsPattern").uid("AbandonmentAlertsPattern");
        KafkaSink<Alert> kafkaAlertSink = StreamingUtils.createKafkaAlertSink(properties);
        abandonedmentAlert.sinkTo(kafkaAlertSink);
        environment.execute("E-commerce CEP Patterns Alerting");
    }
}

