package org.apache.paimon.flink.source.operator;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/source/operator/MonitorFunction.class */
public class MonitorFunction extends RichSourceFunction<Split> implements CheckpointedFunction, CheckpointListener {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(MonitorFunction.class);
    private final ReadBuilder readBuilder;
    private final long monitorInterval;
    private final boolean emitSnapshotWatermark;
    private volatile boolean isRunning = true;
    private transient StreamTableScan scan;
    private transient SourceFunction.SourceContext<Split> ctx;
    private transient ListState<Long> checkpointState;
    private transient ListState<Tuple2<Long, Long>> nextSnapshotState;
    private transient TreeMap<Long, Long> nextSnapshotPerCheckpoint;

    public MonitorFunction(ReadBuilder readBuilder, long j, boolean z) {
        this.readBuilder = readBuilder;
        this.monitorInterval = j;
        this.emitSnapshotWatermark = z;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.scan = this.readBuilder.newStreamScan();
        this.checkpointState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("next-snapshot", LongSerializer.INSTANCE));
        this.nextSnapshotState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("next-snapshot-per-checkpoint", new TupleSerializer(Tuple2.class, new TypeSerializer[]{LongSerializer.INSTANCE, LongSerializer.INSTANCE})));
        this.nextSnapshotPerCheckpoint = new TreeMap<>();
        if (!functionInitializationContext.isRestored()) {
            LOG.info("No state to restore for the {}.", getClass().getSimpleName());
            return;
        }
        LOG.info("Restoring state for the {}.", getClass().getSimpleName());
        ArrayList arrayList = new ArrayList();
        Iterator it = ((Iterable) this.checkpointState.get()).iterator();
        while (it.hasNext()) {
            arrayList.add((Long) it.next());
        }
        Preconditions.checkArgument(arrayList.size() <= 1, getClass().getSimpleName() + " retrieved invalid state.");
        if (arrayList.size() == 1) {
            this.scan.restore((Long) arrayList.get(0));
        }
        for (Tuple2 tuple2 : (Iterable) this.nextSnapshotState.get()) {
            this.nextSnapshotPerCheckpoint.put(tuple2.f0, tuple2.f1);
        }
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.checkpointState.clear();
        Long checkpoint = this.scan.checkpoint();
        if (checkpoint != null) {
            this.checkpointState.add(checkpoint);
            this.nextSnapshotPerCheckpoint.put(Long.valueOf(functionSnapshotContext.getCheckpointId()), checkpoint);
        }
        ArrayList arrayList = new ArrayList();
        this.nextSnapshotPerCheckpoint.forEach((l, l2) -> {
            arrayList.add(new Tuple2(l, l2));
        });
        this.nextSnapshotState.update(arrayList);
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} checkpoint {}.", getClass().getSimpleName(), checkpoint);
        }
    }

    public void run(SourceFunction.SourceContext<Split> sourceContext) throws Exception {
        boolean isEmpty;
        Long watermark;
        this.ctx = sourceContext;
        while (this.isRunning) {
            synchronized (sourceContext.getCheckpointLock()) {
                if (!this.isRunning) {
                    return;
                }
                try {
                    List<Split> splits = this.scan.plan().splits();
                    isEmpty = splits.isEmpty();
                    sourceContext.getClass();
                    splits.forEach((v1) -> {
                        r1.collect(v1);
                    });
                    if (this.emitSnapshotWatermark && (watermark = this.scan.watermark()) != null) {
                        sourceContext.emitWatermark(new Watermark(watermark.longValue()));
                    }
                } catch (EndOfScanException e) {
                    LOG.info("Catching EndOfStreamException, the stream is finished.");
                    return;
                }
            }
            if (isEmpty) {
                Thread.sleep(this.monitorInterval);
            }
        }
    }

    public void notifyCheckpointComplete(long j) {
        NavigableMap<Long, Long> headMap = this.nextSnapshotPerCheckpoint.headMap(Long.valueOf(j), true);
        OptionalLong max = headMap.values().stream().mapToLong((v0) -> {
            return v0.longValue();
        }).max();
        StreamTableScan streamTableScan = this.scan;
        streamTableScan.getClass();
        max.ifPresent((v1) -> {
            r1.notifyCheckpointComplete(v1);
        });
        headMap.clear();
    }

    public void cancel() {
        if (this.ctx == null) {
            this.isRunning = false;
            return;
        }
        synchronized (this.ctx.getCheckpointLock()) {
            this.isRunning = false;
        }
    }

    public static DataStream<RowData> buildSource(StreamExecutionEnvironment streamExecutionEnvironment, String str, TypeInformation<RowData> typeInformation, ReadBuilder readBuilder, long j, boolean z, boolean z2, BucketMode bucketMode) {
        SingleOutputStreamOperator forceNonParallel = streamExecutionEnvironment.addSource(new MonitorFunction(readBuilder, j, z), str + "-Monitor", new JavaTypeInfo(Split.class)).forceNonParallel();
        return (bucketMode == BucketMode.BUCKET_UNAWARE ? shuffleUnwareBucket(forceNonParallel) : shuffleNonUnwareBucket(forceNonParallel, z2)).transform(str + "-Reader", typeInformation, new ReadOperator(readBuilder));
    }

    private static DataStream<Split> shuffleUnwareBucket(SingleOutputStreamOperator<Split> singleOutputStreamOperator) {
        return singleOutputStreamOperator.rebalance();
    }

    private static DataStream<Split> shuffleNonUnwareBucket(SingleOutputStreamOperator<Split> singleOutputStreamOperator, boolean z) {
        return singleOutputStreamOperator.partitionCustom((tuple2, i) -> {
            return z ? ChannelComputer.select((BinaryRow) tuple2.f0, ((Integer) tuple2.f1).intValue(), i) : ChannelComputer.select(((Integer) tuple2.f1).intValue(), i);
        }, split -> {
            DataSplit dataSplit = (DataSplit) split;
            return Tuple2.of(dataSplit.partition(), Integer.valueOf(dataSplit.bucket()));
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1314721904:
                if (implMethodName.equals("lambda$shuffleNonUnwareBucket$3fe6d9a8$1")) {
                    z = false;
                    break;
                }
                break;
            case -761658187:
                if (implMethodName.equals("lambda$shuffleNonUnwareBucket$96770dbf$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/Partitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("partition") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;I)I") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/source/operator/MonitorFunction") && serializedLambda.getImplMethodSignature().equals("(ZLorg/apache/flink/api/java/tuple/Tuple2;I)I")) {
                    boolean booleanValue = ((Boolean) serializedLambda.getCapturedArg(0)).booleanValue();
                    return (tuple2, i) -> {
                        return booleanValue ? ChannelComputer.select((BinaryRow) tuple2.f0, ((Integer) tuple2.f1).intValue(), i) : ChannelComputer.select(((Integer) tuple2.f1).intValue(), i);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/source/operator/MonitorFunction") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/table/source/Split;)Lorg/apache/flink/api/java/tuple/Tuple2;")) {
                    return split -> {
                        DataSplit dataSplit = (DataSplit) split;
                        return Tuple2.of(dataSplit.partition(), Integer.valueOf(dataSplit.bucket()));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
