/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.hologres.client.impl.binlog.handler;

import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.exception.ExceptionCode;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.impl.binlog.ArrayBuffer;
import com.alibaba.hologres.client.impl.binlog.BinlogEventType;
import com.alibaba.hologres.client.impl.binlog.BinlogRecordCollector;
import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder;
import com.alibaba.hologres.client.impl.binlog.action.BinlogAction;
import com.alibaba.hologres.client.impl.handler.ActionHandler;
import com.alibaba.hologres.client.impl.util.ConnectionUtil;
import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
import com.alibaba.hologres.client.model.binlog.BinlogRecord;
import com.alibaba.hologres.client.utils.Tuple;
import com.alibaba.hologres.org.postgresql.PGProperty;
import com.alibaba.hologres.org.postgresql.jdbc.PgConnection;
import com.alibaba.hologres.org.postgresql.replication.LogSequenceNumber;
import com.alibaba.hologres.org.postgresql.replication.PGReplicationStream;
import com.alibaba.hologres.org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import java.nio.ByteBuffer;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BinlogActionHandler
extends ActionHandler<BinlogAction> {
    public static final Logger LOG = LoggerFactory.getLogger(BinlogActionHandler.class);
    final Properties info;
    final String originalUrl;
    final int binlogReadBatchSize;
    final int maxRetryCount;
    final boolean binlogIgnoreBeforeUpdate;
    final boolean binlogIgnoreDelete;
    final boolean isEnableDirectConnection;
    final long binlogHeartBeatIntervalMs;
    final AtomicBoolean started;
    final ArrayBuffer<BinlogRecord> binlogRecordArray;
    int retryCount;

    public BinlogActionHandler(AtomicBoolean started, HoloConfig config, boolean isShadingEnv, boolean isFixed) {
        super(config);
        this.started = started;
        this.info = new Properties();
        PGProperty.USER.set(this.info, config.getUsername());
        PGProperty.PASSWORD.set(this.info, config.getPassword());
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(this.info, "9.4");
        PGProperty.APPLICATION_NAME.set(this.info, "holo_client_replication");
        PGProperty.REPLICATION.set(this.info, "database");
        PGProperty.SOCKET_TIMEOUT.set(this.info, "120");
        String jdbcUrl = config.getJdbcUrl();
        if (isShadingEnv && jdbcUrl.startsWith("jdbc:postgresql:")) {
            jdbcUrl = "jdbc:hologres:" + jdbcUrl.substring("jdbc:postgresql:".length());
        }
        if (isFixed) {
            jdbcUrl = ConnectionUtil.generateFixedUrl(jdbcUrl);
        }
        this.originalUrl = jdbcUrl;
        this.binlogReadBatchSize = config.getBinlogReadBatchSize();
        this.maxRetryCount = config.getRetryCount();
        this.binlogIgnoreBeforeUpdate = config.getBinlogIgnoreBeforeUpdate();
        this.binlogIgnoreDelete = config.getBinlogIgnoreDelete();
        this.binlogHeartBeatIntervalMs = config.getBinlogHeartBeatIntervalMs();
        this.binlogRecordArray = new ArrayBuffer(this.binlogReadBatchSize, BinlogRecord[].class);
        this.isEnableDirectConnection = config.isEnableDirectConnection();
    }

    @Override
    public void handle(BinlogAction action) {
        this.doHandle(action);
    }

    private void resetRetryCount() {
        this.retryCount = this.maxRetryCount;
    }

    private void doHandle(BinlogAction action) {
        ConnectionContext connContext = new ConnectionContext(action, action.getLsn(), action.getTimestamp());
        HoloBinlogDecoder decoder = null;
        try {
            decoder = new HoloBinlogDecoder(action.getSupplier(), (Boolean)this.binlogIgnoreDelete, (Boolean)this.binlogIgnoreBeforeUpdate);
        }
        catch (HoloClientException e) {
            action.getCollector().exceptionally(action.getShardId(), e);
            return;
        }
        this.resetRetryCount();
        while (this.started.get()) {
            try {
                connContext.init();
                this.fetch(action.getShardId(), action.getCollector(), connContext, decoder, action.getCommitJob());
            }
            catch (SQLException e) {
                if (--this.retryCount < 1) {
                    action.getCollector().exceptionally(action.getShardId(), e);
                    break;
                }
                LOG.warn("shardId " + action.getShardId() + " binlog read fail, retry", e);
            }
            catch (HoloClientException | InterruptedException e) {
                action.getCollector().exceptionally(action.getShardId(), e);
                break;
            }
            catch (Throwable e) {
                action.getCollector().exceptionally(action.getShardId(), e);
                throw e;
            }
            finally {
                connContext.close();
            }
        }
    }

    private void fetch(int shardId, BinlogRecordCollector collector, ConnectionContext connContext, HoloBinlogDecoder decoder, Queue<Tuple<CompletableFuture<Void>, Long>> commitJob) throws SQLException, HoloClientException, InterruptedException {
        while (this.started.get()) {
            this.tryFlush(connContext, commitJob);
            if (this.binlogRecordArray.isReadable()) {
                while (this.started.get() && this.binlogRecordArray.remain() > 0) {
                    this.tryFlush(connContext, commitJob);
                    collector.emit(shardId, this.binlogRecordArray);
                }
            }
            ByteBuffer byteBuffer = connContext.pgReplicationStream.read();
            this.binlogRecordArray.beginWrite();
            decoder.decode(shardId, byteBuffer, this.binlogRecordArray);
            this.binlogRecordArray.beginRead();
            this.resetRetryCount();
            if (this.binlogRecordArray.remain() == 0) {
                long current;
                if (this.binlogHeartBeatIntervalMs > -1L && (current = System.currentTimeMillis()) - connContext.getTimestamp() > this.binlogHeartBeatIntervalMs) {
                    connContext.updateTimestamp(current);
                    BinlogHeartBeatRecord record = new BinlogHeartBeatRecord(decoder.getSchema(), connContext.startLsn, BinlogEventType.HeartBeat, current * 1000L);
                    record.setShardId(shardId);
                    this.binlogRecordArray.beginWrite();
                    this.binlogRecordArray.add(record);
                    this.binlogRecordArray.beginRead();
                }
            } else {
                BinlogRecord lastRecord = this.binlogRecordArray.last();
                connContext.setEmittedLsn(lastRecord.getBinlogLsn(), lastRecord.getBinlogTimestamp() / 1000L);
            }
            while (this.started.get() && this.binlogRecordArray.remain() > 0) {
                this.tryFlush(connContext, commitJob);
                collector.emit(shardId, this.binlogRecordArray);
            }
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void tryFlush(ConnectionContext connContext, Queue<Tuple<CompletableFuture<Void>, Long>> commitJob) throws SQLException {
        Tuple<CompletableFuture<Void>, Long> job = commitJob.poll();
        if (job == null) {
            return;
        }
        int flushRetryCount = this.maxRetryCount;
        boolean done = false;
        try {
            while (!done && --flushRetryCount > 0) {
                try {
                    if (!connContext.isInit()) {
                        connContext.init();
                    }
                    connContext.pgReplicationStream.setFlushedLSN(LogSequenceNumber.valueOf((Long)job.r));
                    connContext.pgReplicationStream.forceUpdateStatus();
                    ((CompletableFuture)job.l).complete(null);
                    done = true;
                }
                catch (SQLException e) {
                    if (flushRetryCount <= 0) throw e;
                    connContext.close();
                    continue;
                    return;
                }
            }
        }
        catch (SQLException e) {
            ((CompletableFuture)job.l).completeExceptionally(e);
            throw e;
        }
        finally {
            if (!((CompletableFuture)job.l).isDone()) {
                ((CompletableFuture)job.l).completeExceptionally(new HoloClientException(ExceptionCode.INTERNAL_ERROR, "unknown exception when flush binlog lsn"));
            }
        }
    }

    @Override
    public String getCostMsMetricName() {
        return null;
    }

    class ConnectionContext {
        PgConnection conn = null;
        PGReplicationStream pgReplicationStream = null;
        private final BinlogAction action;
        private long startLsn;
        private String startTime;
        private long timestamp;

        public ConnectionContext(BinlogAction action, long emittedLsn, String startTime) {
            this.action = action;
            this.startLsn = emittedLsn;
            this.startTime = startTime;
            this.timestamp = -1L;
        }

        public void setEmittedLsn(long emittedLsn, long timestamp) {
            this.startLsn = emittedLsn;
            this.timestamp = timestamp;
            this.startTime = null;
        }

        public void updateTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public void init() throws SQLException {
            try {
                String url = BinlogActionHandler.this.originalUrl;
                if (BinlogActionHandler.this.isEnableDirectConnection) {
                    url = ConnectionUtil.getDirectConnectionJdbcUrl(BinlogActionHandler.this.originalUrl, BinlogActionHandler.this.info);
                }
                this.conn = DriverManager.getConnection(url, BinlogActionHandler.this.info).unwrap(PgConnection.class);
                ChainedLogicalStreamBuilder logicalStreamBuilder = (ChainedLogicalStreamBuilder)this.conn.getReplicationAPI().replicationStream().logical().withSlotOption("parallel_index", this.action.getShardId()).withSlotOption("batch_size", BinlogActionHandler.this.binlogReadBatchSize).withStatusInterval(10, TimeUnit.SECONDS);
                if (this.action.getSlotName() != null) {
                    logicalStreamBuilder.withSlotName(this.action.getSlotName());
                } else {
                    logicalStreamBuilder.withSlotOption("table_name", this.action.getTableName());
                }
                if (this.startLsn > -1L) {
                    logicalStreamBuilder.withSlotOption("start_lsn", String.valueOf(this.startLsn));
                }
                if (this.startTime != null) {
                    logicalStreamBuilder.withSlotOption("start_time", this.startTime);
                }
                LOG.info("shard {} start, start_lsn={}, start_time={}", this.action.getShardId(), this.startLsn, this.startTime);
                this.pgReplicationStream = logicalStreamBuilder.start();
            }
            catch (SQLException e) {
                this.close();
                throw e;
            }
        }

        public boolean isInit() {
            return this.conn != null;
        }

        public void close() {
            if (this.conn != null) {
                try {
                    this.conn.close();
                }
                catch (SQLException sQLException) {
                    // empty catch block
                }
                this.conn = null;
            }
            this.pgReplicationStream = null;
        }
    }
}

