/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.utils;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.paimon.data.RandomAccessInputView;
import org.apache.paimon.data.SimpleCollectingOutputView;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.memory.ArraySegmentPool;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.Pair;

public class ParallelExecution<T, E>
implements Closeable {
    private final Serializer<T> serializer;
    private final int pageSize;
    private final BlockingQueue<MemorySegment> idlePages;
    private final BlockingQueue<ParallelBatch<T, E>> results;
    private final ExecutorService executorService;
    private final AtomicReference<Throwable> exception;
    private final CountDownLatch latch;

    public ParallelExecution(Serializer<T> serializer, int pageSize, int parallelism, List<Supplier<Pair<RecordReader<T>, E>>> readers) {
        this.serializer = serializer;
        this.pageSize = pageSize;
        int totalPages = parallelism * 2;
        this.idlePages = new ArrayBlockingQueue<MemorySegment>(totalPages);
        for (int i = 0; i < totalPages; ++i) {
            this.idlePages.add(MemorySegment.allocateHeapMemory(pageSize));
        }
        this.executorService = new ThreadPoolExecutor(parallelism, parallelism, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ExecutorThreadFactory(Thread.currentThread().getName() + "-parallel"));
        this.results = new LinkedBlockingQueue<ParallelBatch<T, E>>();
        this.exception = new AtomicReference();
        this.latch = new CountDownLatch(readers.size());
        for (Supplier<Pair<RecordReader<T>, E>> readerSupplier : readers) {
            Serializer<T> duplicate = this.serializer.duplicate();
            this.executorService.submit(() -> this.asyncRead(readerSupplier, duplicate));
        }
    }

    @Nullable
    public ParallelBatch<T, E> take() throws InterruptedException, IOException {
        ParallelBatch<T, E> element;
        do {
            if (this.latch.getCount() == 0L && this.results.isEmpty()) {
                return null;
            }
            element = this.results.poll(2L, TimeUnit.SECONDS);
            if (this.exception.get() == null) continue;
            throw new IOException(this.exception.get());
        } while (element == null);
        return element;
    }

    private void asyncRead(Supplier<Pair<RecordReader<T>, E>> readerSupplier, Serializer<T> serializer) {
        Pair<RecordReader<T>, E> pair = readerSupplier.get();
        try (CloseableIterator<T> iterator = pair.getLeft().toCloseableIterator();){
            int count = 0;
            SimpleCollectingOutputView outputView = null;
            block13: while (iterator.hasNext()) {
                Object next = iterator.next();
                while (true) {
                    if (outputView == null) {
                        outputView = this.newOutputView();
                        count = 0;
                    }
                    try {
                        serializer.serialize(next, outputView);
                        ++count;
                        continue block13;
                    }
                    catch (EOFException e) {
                        if (count == 0) {
                            throw new RuntimeException(String.format("Current page size %s is too small, one record cannot fit into a single page. Please increase the 'page-size' table option.", new MemorySize(this.pageSize).toHumanReadableString()));
                        }
                        this.sendToResults(outputView, count, pair.getRight());
                        outputView = null;
                        continue;
                    }
                    break;
                }
            }
            if (outputView != null) {
                this.sendToResults(outputView, count, pair.getRight());
            }
            this.latch.countDown();
        }
        catch (Throwable e) {
            this.exception.set(e);
        }
    }

    private SimpleCollectingOutputView newOutputView() throws InterruptedException {
        MemorySegment page = this.idlePages.take();
        return new SimpleCollectingOutputView(new ArraySegmentPool(Collections.singletonList(page)), page.size());
    }

    private void sendToResults(SimpleCollectingOutputView outputView, int count, E extraMessage) {
        this.results.add(this.iterator(outputView.getCurrentSegment(), count, extraMessage));
    }

    @Override
    public void close() throws IOException {
        this.executorService.shutdownNow();
    }

    private ParallelBatch<T, E> iterator(final MemorySegment page, final int numRecords, final E extraMessage) {
        final RandomAccessInputView inputView = new RandomAccessInputView(new ArrayList<MemorySegment>(Collections.singletonList(page)), page.size());
        return new ParallelBatch<T, E>(){
            int numReturn = 0;

            @Override
            @Nullable
            public T next() throws IOException {
                if (this.numReturn >= numRecords) {
                    return null;
                }
                ++this.numReturn;
                return ParallelExecution.this.serializer.deserialize(inputView);
            }

            @Override
            public void releaseBatch() {
                ParallelExecution.this.idlePages.add(page);
            }

            @Override
            public E extraMessage() {
                return extraMessage;
            }
        };
    }

    public static interface ParallelBatch<T, E> {
        @Nullable
        public T next() throws IOException;

        public void releaseBatch();

        public E extraMessage();
    }
}

