/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cep;

import com.ververica.config.AppConfig;
import com.ververica.models.Alert;
import com.ververica.models.AlertType;
import com.ververica.models.ClickEvent;
import com.ververica.utils.StreamingUtils;
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class PurchaseIntentScoringCEP {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        MultipleParameterTool params = MultipleParameterTool.fromArgs((String[])args);
        Properties properties = AppConfig.buildArgsProps(new Properties(), params);
        KafkaSource<ClickEvent> clickstreamKafkaSource = StreamingUtils.createClickEventConsumer(properties);
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forBoundedOutOfOrderness((Duration)Duration.ofSeconds(10L)).withTimestampAssigner((SerializableTimestampAssigner & Serializable)(event, timestamp) -> event.getEventTime());
        SingleOutputStreamOperator events = environment.fromSource(clickstreamKafkaSource, watermarkStrategy, "Clickstream Events Source").name("ClickstreamEventsSource");
        Pattern purchaseIntentPattern = Pattern.begin((String)"initial_view").where((IterativeCondition)new SimpleCondition<ClickEvent>(){

            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        }).followedBy("repeat_view").where((IterativeCondition)new SimpleCondition<ClickEvent>(){

            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        }).times(3).within(Time.minutes((long)15L));
        PatternStream purchaseIntentStream = CEP.pattern((DataStream)events.keyBy(ClickEvent::getUserSession), (Pattern)purchaseIntentPattern);
        SingleOutputStreamOperator purchaseIntentAlerts = purchaseIntentStream.select((PatternSelectFunction)new PatternSelectFunction<ClickEvent, Alert>(){

            public Alert select(Map<String, List<ClickEvent>> pattern) {
                ClickEvent initialView = pattern.get("initial_view").get(0);
                String message = "High purchase intent detected for user " + initialView.getUserId() + " on product " + initialView.getProductId();
                return new Alert(initialView.getUserSession(), initialView.getUserId(), AlertType.PRICE_SENSITIVITY, message);
            }
        }).name("PurchaseIntentAlertsPattern").uid("PurchaseIntentAlertsPattern");
        KafkaSink<Alert> kafkaAlertSink = StreamingUtils.createKafkaAlertSink(properties);
        purchaseIntentAlerts.sinkTo(kafkaAlertSink);
        environment.execute("E-commerce CEP Patterns Alerting");
    }
}

