/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cep;

import com.ververica.config.AppConfig;
import com.ververica.models.Alert;
import com.ververica.models.AlertType;
import com.ververica.models.ClickEvent;
import com.ververica.utils.StreamingUtils;
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Flink CEP job that looks for cross-sell opportunities in a clickstream.
 *
 * <p>The job reads {@link ClickEvent}s from Kafka, keys them by user session and
 * matches two directly consecutive {@code "view"} events that occur within five
 * minutes of each other. When the two viewed products belong to different
 * categories, an {@link Alert} of type {@link AlertType#CROSS_UPSELL} is written
 * to the Kafka alert sink.
 */
public class CrossSellOpCEP {
    /** Value of {@code ClickEvent.getEventType()} that the pattern treats as a product view. */
    private static final String VIEW_EVENT_TYPE = "view";

    /** Name of the first pattern stage; also the key used to look the event up in a match. */
    private static final String FIRST_VIEW = "first_view";

    /** Name of the second pattern stage; also the key used to look the event up in a match. */
    private static final String SECOND_VIEW = "second_view";

    /**
     * Builds the streaming topology and submits it for execution.
     *
     * @param args command-line arguments, parsed with {@link MultipleParameterTool} and
     *             turned into connector properties by {@code AppConfig.buildArgsProps}
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        Properties properties = AppConfig.buildArgsProps(new Properties(), params);

        KafkaSource<ClickEvent> clickstreamKafkaSource = StreamingUtils.createClickEventConsumer(properties);

        // Event time comes from the event itself; up to 10 seconds of out-of-orderness is tolerated.
        WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy
                .<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10L))
                .withTimestampAssigner((event, timestamp) -> event.getEventTime());

        DataStream<ClickEvent> events = environment
                .fromSource(clickstreamKafkaSource, watermarkStrategy, "Clickstream Events Source")
                .name("ClickstreamEventsSource");

        // Two strictly consecutive "view" events within a five-minute window.
        Pattern<ClickEvent, ?> crossSellPattern = Pattern.<ClickEvent>begin(FIRST_VIEW)
                .where(new ViewEventCondition())
                .next(SECOND_VIEW)
                .where(new ViewEventCondition())
                .within(Time.minutes(5L));

        // The keyed stream is handed to CEP directly. Timestamps and watermarks are already
        // assigned at the source; re-assigning them after keyBy() yields a plain, non-keyed
        // stream, which made CEP evaluate the pattern across all sessions together instead
        // of once per user session.
        PatternStream<ClickEvent> crossSellPatternStream =
                CEP.pattern(events.keyBy(ClickEvent::getUserSession), crossSellPattern);

        // flatSelect emits zero or one alert per match, so no null placeholder record has to
        // travel downstream and be filtered out again. The name and uid now sit on the CEP
        // operator itself.
        DataStream<Alert> crossSellAlerts = crossSellPatternStream
                .flatSelect(new CrossSellAlertFunction())
                .name("CrossSellAlertsPattern")
                .uid("CrossSellAlertsPattern");

        KafkaSink<Alert> kafkaAlertSink = StreamingUtils.createKafkaAlertSink(properties);
        crossSellAlerts.sinkTo(kafkaAlertSink);

        environment.execute("E-commerce CEP Patterns Alerting");
    }

    /** Accepts events whose type is {@code "view"}; shared by both pattern stages. */
    private static final class ViewEventCondition extends SimpleCondition<ClickEvent> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean filter(ClickEvent event) {
            // NOTE(review): this prints every evaluated event to stdout; kept from the
            // original job, consider replacing it with a debug-level logger.
            System.out.println("Filtering event: " + event);
            // Constant-first comparison: an event without a type is rejected instead of
            // throwing a NullPointerException.
            return VIEW_EVENT_TYPE.equals(event.getEventType());
        }
    }

    /** Turns a matched pair of views into a cross-sell alert when their categories differ. */
    private static final class CrossSellAlertFunction implements PatternFlatSelectFunction<ClickEvent, Alert> {
        private static final long serialVersionUID = 1L;

        /**
         * Emits at most one alert for the given match.
         *
         * @param pattern matched events, keyed by pattern stage name
         * @param out     collector that receives the alert, if any
         */
        @Override
        public void flatSelect(Map<String, List<ClickEvent>> pattern, Collector<Alert> out) {
            ClickEvent firstView = pattern.get(FIRST_VIEW).get(0);
            ClickEvent secondView = pattern.get(SECOND_VIEW).get(0);

            // A missing category cannot be compared meaningfully; skip the match rather
            // than fail on a null category or report a "null" category in the alert.
            if (firstView.getCategoryCode() == null || secondView.getCategoryCode() == null) {
                return;
            }
            // Same category on both views: not a cross-sell opportunity.
            if (firstView.getCategoryCode().equals(secondView.getCategoryCode())) {
                return;
            }

            String message = "Cross-sell opportunity detected for user " + firstView.getUserId()
                    + ": viewed products in categories " + firstView.getCategoryCode()
                    + " and " + secondView.getCategoryCode();
            out.collect(new Alert(secondView.getUserSession(), secondView.getUserId(), AlertType.CROSS_UPSELL, message));
        }
    }
}

