/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.hologres.client;

import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.Subscribe;
import com.alibaba.hologres.client.exception.ExceptionCode;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.impl.binlog.ArrayBuffer;
import com.alibaba.hologres.client.impl.binlog.BinlogEventType;
import com.alibaba.hologres.client.impl.binlog.BinlogRecordCollector;
import com.alibaba.hologres.client.impl.binlog.Committer;
import com.alibaba.hologres.client.model.Record;
import com.alibaba.hologres.client.model.binlog.BinlogRecord;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BinlogShardGroupReader
implements Closeable {
    public static final Logger LOGGER = LoggerFactory.getLogger(BinlogShardGroupReader.class);
    private final HoloConfig config;
    private final Subscribe subscribe;
    private final Map<Integer, Committer> committerMap;
    private final AtomicBoolean started;
    BlockingQueue<BinlogRecord> queue;
    volatile HoloClientException exception = null;
    Collector collector;
    List<Thread> threadList = new ArrayList<Thread>();
    int bufferPosition = 0;
    List<BinlogRecord> buffer = new ArrayList<BinlogRecord>();

    public BinlogShardGroupReader(HoloConfig config, Subscribe subscribe, int shardCount, Map<Integer, Committer> committerMap, AtomicBoolean started) {
        this.config = config;
        this.subscribe = subscribe;
        this.committerMap = committerMap;
        this.queue = new ArrayBlockingQueue<BinlogRecord>(Math.max(1024, committerMap.size() * config.getBinlogReadBatchSize() / 2));
        this.started = started;
        this.collector = new Collector();
    }

    private void tryFetch(long target) throws InterruptedException, TimeoutException, HoloClientException {
        if (this.buffer.size() <= this.bufferPosition) {
            if (this.buffer.size() > 0) {
                this.buffer.clear();
            }
            BinlogRecord r = null;
            while (r == null) {
                if (null != this.exception) {
                    throw this.exception;
                }
                if (System.nanoTime() > target) {
                    throw new TimeoutException();
                }
                r = this.queue.poll(1000L, TimeUnit.MILLISECONDS);
                if (r == null) continue;
                this.buffer.add(r);
                this.queue.drainTo(this.buffer);
                this.bufferPosition = 0;
            }
        }
    }

    public Collector getCollector() {
        return this.collector;
    }

    public BinlogRecord getBinlogRecord() throws HoloClientException, InterruptedException, TimeoutException {
        return this.getBinlogRecord(-1L);
    }

    public BinlogRecord getBinlogRecord(long timeout) throws HoloClientException, InterruptedException, TimeoutException {
        long target;
        if (null != this.exception) {
            throw this.exception;
        }
        Record r = null;
        long l = target = timeout > 0L ? System.nanoTime() + timeout * 1000000L : Long.MAX_VALUE;
        while (r == null) {
            this.tryFetch(target);
            if (this.buffer.size() > this.bufferPosition) {
                r = this.buffer.get(this.bufferPosition++);
            }
            if (r == null) continue;
            Committer committer = this.committerMap.get(r.getShardId());
            if (committer == null) {
                throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "reader for shard " + r.getShardId() + " is not exists!");
            }
            committer.updateLastReadLsn(((BinlogRecord)r).getBinlogLsn());
            if ((((BinlogRecord)r).getBinlogEventType() != BinlogEventType.DELETE || !this.config.getBinlogIgnoreDelete()) && (((BinlogRecord)r).getBinlogEventType() != BinlogEventType.BEFORE_UPDATE || !this.config.getBinlogIgnoreBeforeUpdate())) continue;
            r = null;
        }
        return r;
    }

    @Override
    public void close() {
        this.cancel();
    }

    public void commit(long timeoutMs) throws HoloClientException, TimeoutException, InterruptedException {
        ArrayList<CompletableFuture<Void>> futureList = new ArrayList<CompletableFuture<Void>>();
        for (Map.Entry<Integer, Committer> entry : this.committerMap.entrySet()) {
            futureList.add(this.commitFlushedLsn(entry.getValue(), entry.getKey(), entry.getValue().getLastReadLsn(), timeoutMs));
        }
        long targetMs = System.currentTimeMillis() + timeoutMs;
        boolean index = false;
        for (CompletableFuture completableFuture : futureList) {
            long currentMs = System.currentTimeMillis();
            if (currentMs < targetMs) {
                try {
                    completableFuture.get(targetMs - currentMs, TimeUnit.MILLISECONDS);
                    continue;
                }
                catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    if (cause instanceof HoloClientException) {
                        throw (HoloClientException)cause;
                    }
                    throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "commit fail", cause);
                }
                catch (TimeoutException e) {
                    throw e;
                }
            }
            throw new TimeoutException();
        }
    }

    public CompletableFuture<Void> commitFlushedLsn(Committer committer, int shardId, long lsn, long timeoutMs) throws TimeoutException, InterruptedException {
        LOGGER.info("begin commit {} shardId {} flushedLsn to {}", this.subscribe.getTableName(), shardId, lsn);
        return committer.commit(lsn, timeoutMs).thenRun(() -> LOGGER.info("done commit {} shardId {} flushedLsn to {}", this.subscribe.getTableName(), shardId, lsn));
    }

    public void commitFlushedLsn(int shardId, long lsn, long timeoutMs) throws HoloClientException, TimeoutException, InterruptedException {
        Committer committer = this.committerMap.get(shardId);
        if (committer != null) {
            CompletableFuture<Void> future = this.commitFlushedLsn(committer, shardId, lsn, timeoutMs);
            try {
                future.get(timeoutMs, TimeUnit.MILLISECONDS);
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof HoloClientException) {
                    throw (HoloClientException)cause;
                }
                throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "commit fail", cause);
            }
        } else {
            throw new HoloClientException(ExceptionCode.INVALID_REQUEST, "unknown shard " + shardId);
        }
    }

    public void addThread(Thread thread) {
        this.threadList.add(thread);
    }

    public void cancel() {
        this.started.set(false);
        while (this.queue.size() > 0) {
            this.queue.clear();
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        for (Thread thread : this.threadList) {
            if (!thread.isAlive()) continue;
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            thread.interrupt();
        }
    }

    public boolean isCanceled() {
        return !this.started.get();
    }

    class Collector
    implements BinlogRecordCollector {
        Collector() {
        }

        @Override
        public BinlogRecord emit(int shardId, ArrayBuffer<BinlogRecord> recordList) throws InterruptedException {
            BinlogRecord record;
            boolean succ;
            BinlogRecord lastSuccessRecord = null;
            while (succ = BinlogShardGroupReader.this.queue.offer(record = recordList.peek(), 1000L, TimeUnit.MILLISECONDS)) {
                lastSuccessRecord = recordList.pop();
                if (recordList.remain() > 0) continue;
            }
            return lastSuccessRecord;
        }

        @Override
        public void exceptionally(int shardId, Throwable e) {
            LOGGER.error("shard id " + shardId + "fetch binlog fail", e);
            BinlogShardGroupReader.this.exception = e instanceof HoloClientException ? (HoloClientException)e : new HoloClientException(ExceptionCode.INTERNAL_ERROR, "shard id " + shardId + " fetch binlog fail", e);
        }
    }
}

