/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.analyticdb.debezium.task;

import com.ververica.cdc.connectors.analyticdb.debezium.dispatcher.EventDispatcherImpl;
import com.ververica.cdc.connectors.analyticdb.debezium.dispatcher.SignalEventDispatcher;
import com.ververica.cdc.connectors.analyticdb.debezium.reader.SnapshotSplitReader;
import com.ververica.cdc.connectors.analyticdb.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.analyticdb.source.offset.BinlogOffsetUtils;
import com.ververica.cdc.connectors.analyticdb.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.analyticdb.source.utils.RecordUtils;
import com.ververica.cdc.connectors.shaded.com.github.shyiko.mysql.binlog.event.ByteArrayEventData;
import com.ververica.cdc.connectors.shaded.com.github.shyiko.mysql.binlog.event.Event;
import com.ververica.cdc.connectors.shaded.com.github.shyiko.mysql.binlog.event.EventType;
import com.ververica.cdc.connectors.shaded.com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import java.nio.charset.StandardCharsets;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.adbmysql.io.debezium.DebeziumException;
import shaded.adbmysql.io.debezium.connector.mysql.ADBMySqlConnection;
import shaded.adbmysql.io.debezium.connector.mysql.ADBMySqlStreamingChangeEventSource;
import shaded.adbmysql.io.debezium.connector.mysql.MySqlConnectorConfig;
import shaded.adbmysql.io.debezium.connector.mysql.MySqlOffsetContext;
import shaded.adbmysql.io.debezium.connector.mysql.MySqlPartition;
import shaded.adbmysql.io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import shaded.adbmysql.io.debezium.connector.mysql.MySqlTaskContext;
import shaded.adbmysql.io.debezium.pipeline.ErrorHandler;
import shaded.adbmysql.io.debezium.pipeline.source.spi.ChangeEventSource;
import shaded.adbmysql.io.debezium.relational.TableId;
import shaded.adbmysql.io.debezium.util.Clock;

public class ADBMySqlBinlogSplitReadTask
extends ADBMySqlStreamingChangeEventSource {
    private static final Logger LOG = LoggerFactory.getLogger(ADBMySqlBinlogSplitReadTask.class);
    private final MySqlBinlogSplit binlogSplit;
    private final EventDispatcherImpl<TableId> eventDispatcher;
    private final SignalEventDispatcher signalEventDispatcher;
    private final ErrorHandler errorHandler;
    private final Predicate<Event> eventFilter;
    private ChangeEventSource.ChangeEventSourceContext context;

    public ADBMySqlBinlogSplitReadTask(MySqlConnectorConfig connectorConfig, ADBMySqlConnection connection, EventDispatcherImpl<TableId> dispatcher, SignalEventDispatcher signalEventDispatcher, ErrorHandler errorHandler, Clock clock, MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics, MySqlBinlogSplit binlogSplit, Predicate<Event> eventFilter) {
        super(connectorConfig, connection, dispatcher, errorHandler, clock, taskContext, metrics);
        this.binlogSplit = binlogSplit;
        this.eventDispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.signalEventDispatcher = signalEventDispatcher;
        this.eventFilter = eventFilter;
    }

    @Override
    public void execute(ChangeEventSource.ChangeEventSourceContext context, MySqlPartition partition, MySqlOffsetContext offsetContext) throws InterruptedException {
        this.context = context;
        super.execute(context, partition, offsetContext);
    }

    @Override
    protected void handleEvent(MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
        if (!this.eventFilter.test(event)) {
            return;
        }
        super.handleEvent(partition, offsetContext, event);
        if (this.isBoundedRead()) {
            String fileName;
            BinlogOffset currentBinlogOffset = RecordUtils.getBinlogPosition(offsetContext.getOffset());
            Object eventHeader = event.getHeader();
            EventType eventType = eventHeader.getEventType();
            if (eventType == EventType.HEARTBEAT && (fileName = ADBMySqlBinlogSplitReadTask.getFileNameFromEventData(event)).startsWith("special")) {
                if (fileName.compareToIgnoreCase(this.binlogSplit.getEndingOffset().getFilename()) == 0) {
                    LOG.info("Send binlog end event because reach the high watermark {}", (Object)currentBinlogOffset);
                    this.sendBinlogEndEvent(currentBinlogOffset);
                    ((SnapshotSplitReader.SnapshotBinlogSplitChangeEventSourceContextImpl)this.context).finished();
                }
                return;
            }
            if (currentBinlogOffset.isAtOrAfter(this.binlogSplit.getEndingOffset())) {
                LOG.info("Send binlog end event because reach the high watermark {}", (Object)currentBinlogOffset);
                this.sendBinlogEndEvent(currentBinlogOffset);
                ((SnapshotSplitReader.SnapshotBinlogSplitChangeEventSourceContextImpl)this.context).finished();
            }
        }
    }

    private static String getFileNameFromEventData(Event event) {
        Object eventData = event.getData();
        ByteArrayEventData byteArrayEventData = eventData instanceof EventDeserializer.EventDataWrapper ? (ByteArrayEventData)((EventDeserializer.EventDataWrapper)eventData).getInternal() : (ByteArrayEventData)eventData;
        return new String(byteArrayEventData.getData(), StandardCharsets.UTF_8);
    }

    private void sendBinlogEndEvent(BinlogOffset currentBinlogOffset) {
        try {
            this.signalEventDispatcher.dispatchWatermarkEvent(this.binlogSplit, currentBinlogOffset, SignalEventDispatcher.WatermarkKind.BINLOG_END);
        }
        catch (InterruptedException e) {
            LOG.error("Send signal event error.", (Throwable)e);
            this.errorHandler.setProducerThrowable(new DebeziumException("Error processing binlog signal event", e));
        }
    }

    private boolean isBoundedRead() {
        return !BinlogOffsetUtils.isNonStoppingOffset(this.binlogSplit.getEndingOffset());
    }
}

