package org.apache.paimon.flink.source;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ExecutorUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.ThreadUtils;

/* loaded from: input_file:org/apache/paimon/flink/source/AppendBypassCoordinateOperator.class */
public class AppendBypassCoordinateOperator<CommitT> extends AbstractStreamOperator<Either<CommitT, UnawareAppendCompactionTask>> implements OneInputStreamOperator<CommitT, Either<CommitT, UnawareAppendCompactionTask>>, ProcessingTimeService.ProcessingTimeCallback {
    private static final long MAX_PENDING_TASKS = 5000;
    private final FileStoreTable table;
    private transient ScheduledExecutorService executorService;
    private transient LinkedBlockingQueue<UnawareAppendCompactionTask> compactTasks;

    public AppendBypassCoordinateOperator(FileStoreTable fileStoreTable, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) {
        this.table = fileStoreTable;
        this.processingTimeService = processingTimeService;
        this.chainingStrategy = ChainingStrategy.HEAD;
    }

    public void open() throws Exception {
        super.open();
        Preconditions.checkArgument(getRuntimeContext().getNumberOfParallelSubtasks() == 1, "Compaction Coordinator parallelism in paimon MUST be one.");
        long millis = this.table.coreOptions().continuousDiscoveryInterval().toMillis();
        this.compactTasks = new LinkedBlockingQueue<>();
        UnawareAppendTableCompactionCoordinator unawareAppendTableCompactionCoordinator = new UnawareAppendTableCompactionCoordinator(this.table, true, null);
        this.executorService = Executors.newSingleThreadScheduledExecutor(ThreadUtils.newDaemonThreadFactory("Compaction Coordinator"));
        this.executorService.scheduleWithFixedDelay(() -> {
            asyncPlan(unawareAppendTableCompactionCoordinator);
        }, 0L, millis, TimeUnit.MILLISECONDS);
        getProcessingTimeService().scheduleWithFixedDelay(this, 0L, millis);
    }

    private void asyncPlan(UnawareAppendTableCompactionCoordinator unawareAppendTableCompactionCoordinator) {
        while (this.compactTasks.size() < 5000) {
            List<UnawareAppendCompactionTask> run = unawareAppendTableCompactionCoordinator.run();
            this.compactTasks.addAll(run);
            if (run.isEmpty()) {
                return;
            }
        }
    }

    public void onProcessingTime(long j) {
        while (true) {
            UnawareAppendCompactionTask poll = this.compactTasks.poll();
            if (poll == null) {
                return;
            } else {
                this.output.collect(new StreamRecord(Either.Right(poll)));
            }
        }
    }

    public void processElement(StreamRecord<CommitT> streamRecord) throws Exception {
        this.output.collect(new StreamRecord(Either.Left(streamRecord.getValue())));
    }

    public void close() throws Exception {
        ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, this.executorService);
        super.close();
    }
}
