/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.hologres.client.impl.collector;

import com.alibaba.hologres.client.Get;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.impl.ExecutionPool;
import com.alibaba.hologres.client.impl.collector.DefaultResizePolicy;
import com.alibaba.hologres.client.impl.collector.ResizePolicy;
import com.alibaba.hologres.client.impl.collector.TableCollector;
import com.alibaba.hologres.client.impl.util.ExceptionUtil;
import com.alibaba.hologres.client.model.Record;
import com.alibaba.hologres.client.model.TableName;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ActionCollector {
    private static final Logger LOGGER = LoggerFactory.getLogger(ActionCollector.class);
    Map<TableName, TableCollector> map;
    private ReentrantReadWriteLock flushLock = new ReentrantReadWriteLock();
    final HoloConfig config;
    final ExecutionPool pool;
    final ArrayBlockingQueue<Get> queue;
    private final ResizePolicy resizePolicy;
    private final long writerShardCountResizeIntervalNano;
    AtomicReference<HoloClientException> lastException = new AtomicReference<Object>(null);

    public ActionCollector(HoloConfig config, ExecutionPool pool, ArrayBlockingQueue<Get> queue) {
        this.map = new ConcurrentHashMap<TableName, TableCollector>();
        this.config = config;
        this.pool = pool;
        this.queue = queue;
        this.resizePolicy = new DefaultResizePolicy();
        this.resizePolicy.init(config);
        this.writerShardCountResizeIntervalNano = config.getWriterShardCountResizeIntervalMs() * 1000000L;
    }

    public long getByteSize() {
        return this.map.values().stream().collect(Collectors.summingLong(TableCollector::getByteSize));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void append(Record record) throws HoloClientException {
        this.flushLock.readLock().lock();
        try {
            TableCollector pairArray = this.map.computeIfAbsent(record.getTableName(), tableName -> new TableCollector(this.config, this.pool));
            pairArray.append(record);
            HoloClientException exception = this.lastException.getAndSet(null);
            if (null != exception) {
                throw exception;
            }
        }
        finally {
            this.flushLock.readLock().unlock();
        }
    }

    public void appendGet(Get get) {
        try {
            if (!this.queue.offer(get, 10000L, TimeUnit.MILLISECONDS)) {
                get.getFuture().completeExceptionally(new TimeoutException());
            }
        }
        catch (InterruptedException e) {
            get.getFuture().completeExceptionally(e);
        }
    }

    public void appendGet(List<Get> list) {
        try {
            boolean timeout = false;
            for (Get get : list) {
                if (timeout) {
                    get.getFuture().completeExceptionally(new TimeoutException());
                    continue;
                }
                if (this.queue.offer(get, 10000L, TimeUnit.MILLISECONDS)) continue;
                get.getFuture().completeExceptionally(new TimeoutException());
                timeout = true;
            }
        }
        catch (InterruptedException e) {
            for (Get get : list) {
                get.getFuture().completeExceptionally(e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void tryFlush() {
        this.flushLock.readLock().lock();
        try {
            Iterator<Map.Entry<TableName, TableCollector>> iter = this.map.entrySet().iterator();
            while (iter.hasNext()) {
                TableCollector array = iter.next().getValue();
                try {
                    array.flush(false);
                }
                catch (HoloClientException e) {
                    LOGGER.error("try flush fail", e);
                    this.lastException.accumulateAndGet(e, (lastOne, newOne) -> ExceptionUtil.merge(lastOne, newOne));
                }
            }
        }
        finally {
            this.flushLock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void flush(boolean internal) throws HoloClientException {
        this.flushLock.writeLock().lock();
        try {
            HoloClientException last;
            HoloClientException exception = null;
            int doneCount = 0;
            AtomicInteger uncommittedActionCount = new AtomicInteger(0);
            boolean async = true;
            while (true) {
                doneCount = 0;
                uncommittedActionCount.set(0);
                Iterator<Map.Entry<TableName, TableCollector>> iter = this.map.entrySet().iterator();
                while (iter.hasNext()) {
                    TableCollector array = iter.next().getValue();
                    try {
                        if (!array.flush(true, async, uncommittedActionCount)) continue;
                        ++doneCount;
                    }
                    catch (HoloClientException e) {
                        exception = ExceptionUtil.merge(exception, e);
                    }
                }
                if (doneCount == this.map.size()) break;
                if (uncommittedActionCount.get() != 0) continue;
                async = false;
            }
            this.resize();
            if (exception != null) {
                this.lastException.accumulateAndGet(exception, (lastOne, newOne) -> ExceptionUtil.merge(lastOne, newOne));
            }
            if (!internal && (last = (HoloClientException)this.lastException.getAndSet(null)) != null) {
                throw last;
            }
        }
        finally {
            this.flushLock.writeLock().unlock();
        }
    }

    private void resize() {
        long currentNano = System.nanoTime();
        for (Map.Entry<TableName, TableCollector> entry : this.map.entrySet()) {
            int size;
            TableName tableName = entry.getKey();
            TableCollector tableCollector = entry.getValue();
            if (tableCollector.getStat().getNanoTime() + this.writerShardCountResizeIntervalNano >= currentNano) continue;
            int currentSize = tableCollector.getShardCount();
            if (currentSize != (size = this.resizePolicy.calculate(tableName, tableCollector.getStat(), tableCollector.getShardCount(), this.pool.getWorkerCount(), currentNano))) {
                LOGGER.info("resize table {} shard size , {} -> {}", tableName, currentSize, size);
                tableCollector.resize(size);
            }
            tableCollector.getStat().clear();
        }
    }
}

