/*
 * Decompiled with CFR 0.152.
 */
package shaded.adbmysql.io.debezium.connector.mysql.legacy;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.adbmysql.io.debezium.connector.mysql.HaltingPredicate;
import shaded.adbmysql.io.debezium.connector.mysql.MySqlPartition;
import shaded.adbmysql.io.debezium.connector.mysql.legacy.BinlogReader;
import shaded.adbmysql.io.debezium.connector.mysql.legacy.MySqlTaskContext;
import shaded.adbmysql.io.debezium.connector.mysql.legacy.Reader;
import shaded.adbmysql.io.debezium.connector.mysql.legacy.SourceInfo;
import shaded.adbmysql.io.debezium.document.Document;

/**
 * A {@link Reader} that takes two completed {@link BinlogReader}s, works out which of them
 * has the further-advanced last offset (the "leading" reader), and then runs an inner
 * {@link BinlogReader} on the other ("lagging") reader's context, halted by an
 * {@link OffsetLimitPredicate} built from the leading reader's last offset.
 *
 * <p>When the inner reader's {@code poll()} returns {@code null}, reconciliation is treated
 * as finished: the inner reader is stopped, the unified reader's context is primed with the
 * resulting offset, and the completion handler (if any) is invoked.
 */
public class ReconcilingBinlogReader implements Reader {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReconcilingBinlogReader.class);
    private final BinlogReader binlogReaderA;
    private final BinlogReader binlogReaderB;
    private final BinlogReader unifiedReader;
    // Created in start(); null until the first successful setup.
    private BinlogReader reconcilingReader;
    // null until determineLeadingReader() has run; true when reader A is the leading reader.
    private Boolean aReaderLeading = null;
    private final AtomicReference<MySqlPartition> partition = new AtomicReference<>();
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final AtomicReference<Consumer<MySqlPartition>> uponCompletion = new AtomicReference<>();
    private final long serverId;

    /**
     * Creates a reconciling reader.
     *
     * @param binlogReaderA the first source reader; must be stopped before {@link #start} is called
     * @param binlogReaderB the second source reader; must be stopped before {@link #start} is called
     * @param unifiedReader the reader whose context is primed once reconciliation completes
     * @param serverId the server id handed to the inner reconciling {@link BinlogReader}
     */
    public ReconcilingBinlogReader(BinlogReader binlogReaderA, BinlogReader binlogReaderB, BinlogReader unifiedReader, long serverId) {
        this.binlogReaderA = binlogReaderA;
        this.binlogReaderB = binlogReaderB;
        this.unifiedReader = unifiedReader;
        this.serverId = serverId;
    }

    /** Returns the fixed name of this reader. */
    @Override
    public String name() {
        return "reconcilingBinlogReader";
    }

    /**
     * Reports {@code RUNNING} while the running flag is set; otherwise {@code STOPPED} once
     * reconciliation has completed, and {@code STOPPING} in every other case (including before
     * the first start).
     */
    @Override
    public Reader.State state() {
        if (this.running.get()) {
            return Reader.State.RUNNING;
        }
        return this.completed.get() ? Reader.State.STOPPED : Reader.State.STOPPING;
    }

    /**
     * Registers the handler invoked (at most once per registration) when reconciliation completes.
     *
     * @param handler callback receiving the partition passed to {@link #start}; replaces any
     *        previously registered handler
     */
    @Override
    public void uponCompletion(Consumer<MySqlPartition> handler) {
        this.uponCompletion.set(handler);
    }

    /**
     * Determines the leading reader, builds the inner reconciling reader on the lagging reader's
     * context and starts it. Does nothing if this reader is already running.
     *
     * @param partition the partition handed to the inner reader and later to the completion handler
     * @throws IllegalStateException if the two source readers have not both stopped, or neither
     *         has a last offset
     */
    @Override
    public void start(MySqlPartition partition) {
        if (!this.running.compareAndSet(false, true)) {
            return;
        }
        this.partition.set(partition);
        this.completed.set(false);
        BinlogReader innerReader;
        try {
            this.determineLeadingReader();
            MySqlTaskContext laggingReaderContext = this.getLaggingReader().context;
            OffsetLimitPredicate offsetLimitPredicate = new OffsetLimitPredicate(this.getLeadingReader().getLastOffset(), laggingReaderContext.gtidSourceFilter());
            innerReader = new BinlogReader("innerReconcilingReader", laggingReaderContext, offsetLimitPredicate, this.serverId);
        }
        catch (RuntimeException | Error e) {
            // Setup failed before any inner reader existed: undo the running flag so state()
            // does not report RUNNING for a reader that never started and start() can be retried.
            this.running.set(false);
            throw e;
        }
        this.reconcilingReader = innerReader;
        // A failure from here on leaves the running flag set so stop() can still clean up the inner reader.
        innerReader.start(partition);
    }

    /**
     * Stops the inner reconciling reader and shuts down its context. Does nothing if this reader
     * is not running. Errors raised while stopping are logged (with their cause) and not rethrown.
     */
    @Override
    public void stop() {
        if (!this.running.compareAndSet(true, false)) {
            return;
        }
        BinlogReader innerReader = this.reconcilingReader;
        if (innerReader == null) {
            // start() never got as far as creating the inner reader; nothing to stop.
            return;
        }
        try {
            LOGGER.info("Stopping the {} reader", innerReader.name());
            innerReader.stop();
            innerReader.context.shutdown();
        }
        catch (Throwable t) {
            // Best-effort shutdown: keep going, but pass the throwable so the stack trace is logged.
            LOGGER.error("Unexpected error stopping the {} reader", innerReader.name(), t);
        }
    }

    /**
     * Polls the inner reconciling reader. A {@code null} result from the inner reader is taken to
     * mean that reconciliation is done and triggers completion handling.
     *
     * @return whatever the inner reader returned, possibly {@code null}
     * @throws InterruptedException if the inner reader's poll is interrupted
     * @throws IllegalStateException if this reader has never been started
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        BinlogReader innerReader = this.reconcilingReader;
        if (innerReader == null) {
            throw new IllegalStateException("Cannot poll until this reader has started.");
        }
        List<SourceRecord> innerReaderPoll = innerReader.poll();
        if (innerReaderPoll == null) {
            this.completeSuccessfully();
        }
        return innerReaderPoll;
    }

    /**
     * Runs completion handling exactly once per start: stops the inner reader, primes the unified
     * reader, and invokes (then clears) the registered completion handler.
     */
    private void completeSuccessfully() {
        if (!this.completed.compareAndSet(false, true)) {
            return;
        }
        this.stop();
        this.setupUnifiedReader();
        LOGGER.info("Completed reconciliation of parallel readers.");
        Consumer<MySqlPartition> completionHandler = this.uponCompletion.getAndSet(null);
        if (completionHandler != null) {
            completionHandler.accept(this.partition.get());
        }
    }

    /**
     * Loads history into the unified reader's context from the leading reader's source info and
     * sets its GTID set and binlog start point from the reconciling reader's last offset, falling
     * back to the leading reader's last offset when the reconciling reader has none.
     */
    private void setupUnifiedReader() {
        this.unifiedReader.context.loadHistory(this.getLeadingReader().context.source());
        this.unifiedReader.context.source().setFilterDataFromConfig(this.unifiedReader.context.config());
        Map<String, ?> reconciledOffset = this.reconcilingReader.getLastOffset();
        Map<String, ?> keyedOffset = reconciledOffset != null ? reconciledOffset : this.getLeadingReader().getLastOffset();
        this.unifiedReader.context.source().setCompletedGtidSet((String)keyedOffset.get("gtids"));
        // NOTE(review): assumes "pos" is present and stored as a Long; a missing key would fail on unboxing.
        this.unifiedReader.context.source().setBinlogStartPoint((String)keyedOffset.get("file"), (Long)keyedOffset.get("pos"));
    }

    /**
     * Decides which source reader is leading and records it in {@code aReaderLeading}.
     * A reader without a last offset is never the leader; when both have offsets the decision
     * is delegated to {@code SourceInfo.isPositionAtOrBefore}.
     *
     * @throws IllegalStateException if either source reader is not {@code STOPPED} or neither
     *         has a last offset
     */
    private void determineLeadingReader() {
        Map<String, ?> aOffset = this.binlogReaderA.getLastOffset();
        Map<String, ?> bOffset = this.binlogReaderB.getLastOffset();
        boolean aNotStopped = this.binlogReaderA.state() != Reader.State.STOPPED;
        boolean bNotStopped = this.binlogReaderB.state() != Reader.State.STOPPED;
        boolean noOffsets = aOffset == null && bOffset == null;
        if (noOffsets || aNotStopped || bNotStopped) {
            throw new IllegalStateException("Cannot determine leading reader until both source readers have completed.");
        }
        if (aOffset == null) {
            this.aReaderLeading = false;
        } else if (bOffset == null) {
            this.aReaderLeading = true;
        } else {
            Document aDocument = SourceInfo.createDocumentFromOffset(aOffset);
            Document bDocument = SourceInfo.createDocumentFromOffset(bOffset);
            // Presumably true when B's position is at or before A's, i.e. A is the leader — confirm against SourceInfo.
            this.aReaderLeading = SourceInfo.isPositionAtOrBefore(bDocument, aDocument, this.binlogReaderA.context.gtidSourceFilter());
        }
        if (this.aReaderLeading) {
            LOGGER.info("old tables leading; reading only from new tables");
        } else {
            LOGGER.info("new tables leading; reading only from old tables");
        }
    }

    /**
     * Returns the source reader chosen as leading.
     *
     * @throws IllegalStateException if the leading reader has not been determined yet
     */
    BinlogReader getLeadingReader() {
        this.checkLaggingLeadingInfo();
        return this.aReaderLeading ? this.binlogReaderA : this.binlogReaderB;
    }

    /**
     * Returns the source reader that was not chosen as leading.
     *
     * @throws IllegalStateException if the leading reader has not been determined yet
     */
    BinlogReader getLaggingReader() {
        this.checkLaggingLeadingInfo();
        return this.aReaderLeading ? this.binlogReaderB : this.binlogReaderA;
    }

    /** Fails fast when the leading/lagging decision has not been made yet. */
    private void checkLaggingLeadingInfo() {
        if (this.aReaderLeading == null) {
            throw new IllegalStateException("Cannot return leading or lagging readers until this reader has started.");
        }
    }

    /**
     * Halting predicate bounded by the leading reader's final offset: a record is accepted only
     * while {@code SourceInfo.isPositionAtOrBefore(leadingFinalOffset, recordOffset, gtidFilter)}
     * is false.
     */
    static class OffsetLimitPredicate implements HaltingPredicate {
        private final Document leadingReaderFinalOffsetDocument;
        private final Predicate<String> gtidFilter;

        /**
         * @param leadingReaderFinalOffset the last offset of the leading reader, used as the limit
         * @param gtidFilter filter passed through to the position comparison
         */
        OffsetLimitPredicate(Map<String, ?> leadingReaderFinalOffset, Predicate<String> gtidFilter) {
            this.leadingReaderFinalOffsetDocument = SourceInfo.createDocumentFromOffset(leadingReaderFinalOffset);
            this.gtidFilter = gtidFilter;
        }

        /**
         * @param sourceRecord the record whose source offset is compared with the limit
         * @return {@code true} while the limit is not at or before the record's offset
         */
        @Override
        public boolean accepts(SourceRecord sourceRecord) {
            Document offsetDocument = SourceInfo.createDocumentFromOffset(sourceRecord.sourceOffset());
            return !SourceInfo.isPositionAtOrBefore(this.leadingReaderFinalOffsetDocument, offsetDocument, this.gtidFilter);
        }
    }
}

