/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cep;

import com.ververica.config.AppConfig;
import com.ververica.models.Alert;
import com.ververica.models.AlertType;
import com.ververica.models.ClickEvent;
import com.ververica.utils.StreamingUtils;
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Either;

public class EcommerceCEPRunner {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        MultipleParameterTool params = MultipleParameterTool.fromArgs((String[])args);
        if (!params.has("bootstrap.servers")) {
            throw new IllegalArgumentException("\u8bf7\u63d0\u4f9b Kafka bootstrap.servers \u53c2\u6570\uff0c\u4f8b\u5982\uff1a--bootstrap.servers localhost:9092");
        }
        Properties properties = AppConfig.buildArgsProps(new Properties(), params);
        KafkaSource<ClickEvent> clickstreamKafkaSource = StreamingUtils.createClickEventConsumer(properties);
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forBoundedOutOfOrderness((Duration)Duration.ofSeconds(10L)).withTimestampAssigner((SerializableTimestampAssigner & Serializable)(event, timestamp) -> event.getEventTime());
        SingleOutputStreamOperator events = environment.fromSource(clickstreamKafkaSource, watermarkStrategy, "Clickstream Events Source").name("ClickstreamEventsSource").uid("ClickstreamEventsSource");
        events.print();
        Pattern crossSellPattern = Pattern.begin((String)"first_view").where((IterativeCondition)new SimpleCondition<ClickEvent>(){

            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        }).next("second_view").where((IterativeCondition)new SimpleCondition<ClickEvent>(){

            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        }).within(Time.minutes((long)5L));
        PatternStream crossSellPatternStream = CEP.pattern((DataStream)events.keyBy(ClickEvent::getUserSession).assignTimestampsAndWatermarks(watermarkStrategy), (Pattern)crossSellPattern);
        SingleOutputStreamOperator crossSellAlerts = crossSellPatternStream.select((PatternSelectFunction)new PatternSelectFunction<ClickEvent, Alert>(){

            public Alert select(Map<String, List<ClickEvent>> pattern) {
                ClickEvent firstView = pattern.get("first_view").get(0);
                ClickEvent secondView = pattern.get("second_view").get(0);
                if (!firstView.getCategoryCode().equals(secondView.getCategoryCode())) {
                    String message = "Cross-sell opportunity detected for user " + firstView.getUserId() + ": viewed products in categories " + firstView.getCategoryCode() + " and " + secondView.getCategoryCode();
                    return new Alert(secondView.getUserSession(), secondView.getUserId(), AlertType.CROSS_UPSELL, message);
                }
                return null;
            }
        }).filter(Objects::nonNull).name("CrossSellAlertsPattern").uid("CrossSellAlertsPattern");
        Pattern abandonmentPattern = Pattern.begin((String)"cart_add").where((IterativeCondition)new SimpleCondition<ClickEvent>(){

            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("cart") && event.getPrice() > 200.0;
            }
        }).followedBy("purchase").where((IterativeCondition)new SimpleCondition<ClickEvent>(){

            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("purchase");
            }
        }).within(Time.minutes((long)10L));
        PatternStream abandonmentPatternStream = CEP.pattern((DataStream)events.keyBy(ClickEvent::getUserSession).assignTimestampsAndWatermarks(watermarkStrategy), (Pattern)abandonmentPattern);
        SingleOutputStreamOperator abandonedmentAlert = abandonmentPatternStream.select((PatternTimeoutFunction)new PatternTimeoutFunction<ClickEvent, Alert>(){

            public Alert timeout(Map<String, List<ClickEvent>> pattern, long timeoutTimestamp) {
                ClickEvent cartAdd = pattern.get("cart_add").get(0);
                String message = "High-value cart abandonment detected for user '" + cartAdd.getUserId() + "' priced at " + cartAdd.getPrice() + ". No purchase within 30 minutes.";
                return new Alert(cartAdd.getUserSession(), cartAdd.getUserId(), AlertType.CART_ABANDONMENT, message);
            }
        }, (PatternSelectFunction)new PatternSelectFunction<ClickEvent, Alert>(){

            public Alert select(Map<String, List<ClickEvent>> pattern) {
                ClickEvent cartAdd = pattern.get("cart_add").get(0);
                ClickEvent purchase = pattern.get("purchase").get(0);
                String message = "Purchase completed for user " + purchase.getUserId() + " on product " + purchase.getProductId() + " priced at " + purchase.getPrice();
                return new Alert(cartAdd.getUserSession(), cartAdd.getUserId(), AlertType.PURCHASE_COMPLETION, message);
            }
        }).map((MapFunction)new MapFunction<Either<Alert, Alert>, Alert>(){

            public Alert map(Either<Alert, Alert> alert) throws Exception {
                if (alert.isLeft()) {
                    return (Alert)alert.left();
                }
                return (Alert)alert.right();
            }
        }).name("AbandonmentAlertsPattern").uid("AbandonmentAlertsPattern");
        Pattern purchaseIntentPattern = Pattern.begin((String)"initial_view").where((IterativeCondition)new SimpleCondition<ClickEvent>(){

            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        }).followedBy("repeat_view").where((IterativeCondition)new SimpleCondition<ClickEvent>(){

            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        }).times(3).within(Time.minutes((long)15L));
        PatternStream purchaseIntentStream = CEP.pattern((DataStream)events.keyBy(ClickEvent::getUserSession).assignTimestampsAndWatermarks(watermarkStrategy), (Pattern)purchaseIntentPattern);
        SingleOutputStreamOperator purchaseIntentAlerts = purchaseIntentStream.select((PatternSelectFunction)new PatternSelectFunction<ClickEvent, Alert>(){

            public Alert select(Map<String, List<ClickEvent>> pattern) {
                ClickEvent initialView = pattern.get("initial_view").get(0);
                String message = "High purchase intent detected for user " + initialView.getUserId() + " on product " + initialView.getProductId();
                return new Alert(initialView.getUserSession(), initialView.getUserId(), AlertType.PRICE_SENSITIVITY, message);
            }
        }).name("PurchaseIntentAlertsPattern").uid("PurchaseIntentAlertsPattern");
        Pattern priceSensitivityPattern = Pattern.begin((String)"initial_view").where((IterativeCondition)new SimpleCondition<ClickEvent>(){

            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        }).next("view_price_drop").where((IterativeCondition)new IterativeCondition<ClickEvent>(){

            public boolean filter(ClickEvent event, IterativeCondition.Context<ClickEvent> ctx) throws Exception {
                ClickEvent initialView = (ClickEvent)ctx.getEventsForPattern("initial_view").iterator().next();
                return event.getEventType().equals("view") && event.getProductId().equals(initialView.getProductId()) && event.getPrice() < initialView.getPrice();
            }
        }).next("cart_after_price_drop").where((IterativeCondition)new SimpleCondition<ClickEvent>(){

            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("cart");
            }
        }).within(Time.minutes((long)10L));
        PatternStream priceSensitivityStream = CEP.pattern((DataStream)events.keyBy(ClickEvent::getUserId).assignTimestampsAndWatermarks(watermarkStrategy), (Pattern)priceSensitivityPattern);
        SingleOutputStreamOperator priceSensitivityAlerts = priceSensitivityStream.select((PatternSelectFunction)new PatternSelectFunction<ClickEvent, Alert>(){

            public Alert select(Map<String, List<ClickEvent>> pattern) {
                ClickEvent initialView = pattern.get("initial_view").get(0);
                String message = "Price-sensitive customer detected for user " + initialView.getUserId() + " on product " + initialView.getProductId() + " after a price drop.";
                return new Alert(initialView.getUserSession(), initialView.getUserId(), AlertType.PRICE_SENSITIVITY, message);
            }
        }).name("PriceSensitivityAlertsPattern").uid("PriceSensitivityAlertsPattern");
        Pattern churnPredictionPattern = Pattern.begin((String)"first_view").where((IterativeCondition)new SimpleCondition<ClickEvent>(){

            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        }).timesOrMore(10).notNext("purchase").where((IterativeCondition)new SimpleCondition<ClickEvent>(){

            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("purchase");
            }
        }).within(Time.days((long)7L));
        PatternStream churnPredictionStream = CEP.pattern((DataStream)events.keyBy(ClickEvent::getUserId).assignTimestampsAndWatermarks(watermarkStrategy), (Pattern)churnPredictionPattern);
        SingleOutputStreamOperator churnPredictionAlerts = churnPredictionStream.select((PatternSelectFunction)new PatternSelectFunction<ClickEvent, Alert>(){

            public Alert select(Map<String, List<ClickEvent>> pattern) {
                ClickEvent firstView = pattern.get("first_view").get(0);
                String message = "Churn risk detected for user " + firstView.getUserId() + ": viewed multiple products over the week without making a purchase.";
                return new Alert(firstView.getUserSession(), firstView.getUserId(), AlertType.CHURN_RISK, message);
            }
        }).name("ChurnPredictionAlertsPattern").uid("ChurnPredictionAlertsPattern");
        DataStream alertStream = crossSellAlerts.union(new DataStream[]{abandonedmentAlert, priceSensitivityAlerts, purchaseIntentAlerts, churnPredictionAlerts});
        KafkaSink<Alert> kafkaAlertSink = StreamingUtils.createKafkaAlertSink(properties);
        alertStream.sinkTo(kafkaAlertSink);
        environment.execute("E-commerce CEP Patterns Alerting");
    }
}

