/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql.connection;

import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.MessageDecoder;
import io.debezium.connector.postgresql.connection.MessageDecoderContext;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.ServerInfo;
import io.debezium.connector.postgresql.connection.WalPositionLocator;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnectionException;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.time.Duration;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.BaseConnection;
import org.postgresql.core.ServerVersion;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.postgresql.util.PSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresReplicationConnection
extends JdbcConnection
implements ReplicationConnection {
    // SLF4J logger shared by this class and the anonymous ReplicationStream it creates.
    private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
    // Replication slot name; used in slot lookups, CREATE_REPLICATION_SLOT, streaming and slot drop.
    private final String slotName;
    // Publication name looked up in pg_publication and used in CREATE/ALTER PUBLICATION.
    private final String publicationName;
    // Filter consulted by determineCapturedTables() to pick the tables of a filtered publication.
    private final RelationalTableFilters tableFilter;
    // Mode switched on in initPublication() to decide whether/how the publication is created.
    private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode;
    // Logical decoding plugin; supplies the message decoder and the plugin name sent to the server.
    private final PostgresConnectorConfig.LogicalDecoder plugin;
    // When true, close(true) also drops the replication slot.
    private final boolean dropSlotOnClose;
    // Connector configuration; source of the JDBC config, database name and retry settings.
    private final PostgresConnectorConfig connectorConfig;
    // Interval used for the stream's status interval and for the keep-alive metronome.
    private final Duration statusUpdateInterval;
    // Decoder obtained from the plugin; turns raw replication buffers into messages.
    private final MessageDecoder messageDecoder;
    // Regular (non-replication) connection; used to list table ids and handed to the decoder.
    private final PostgresConnection jdbcConnection;
    // Passed through to messageDecoder.processMessage(...).
    private final TypeRegistry typeRegistry;
    // Slot options applied to the logical stream builder via withSlotOptions(...).
    private final Properties streamParams;
    // Position used by startStreaming(...) when the caller passes a null offset; set in initReplicationSlot().
    private Lsn defaultStartingPos;
    // Result of the last createReplicationSlot() call, or null when no slot has been created by this instance.
    private SlotCreationResult slotCreationInfo;
    // Set to true once initReplicationSlot() has completed; initConnection() skips slot init afterwards.
    private boolean hasInitedSlot;
    // Optional upper bound set through setEndingPos(...); checked by the stream's reachEnd(...).
    private Lsn endingPos;

    private PostgresReplicationConnection(PostgresConnectorConfig config, String slotName, String publicationName, RelationalTableFilters tableFilter, PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode, PostgresConnectorConfig.LogicalDecoder plugin, boolean dropSlotOnClose, boolean doSnapshot, Duration statusUpdateInterval, PostgresConnection jdbcConnection, TypeRegistry typeRegistry, Properties streamParams, PostgresSchema schema) {
        super(PostgresReplicationConnection.addDefaultSettings(config.getJdbcConfig()), PostgresConnection.FACTORY, null, null, "\"", "\"");
        this.connectorConfig = config;
        this.slotName = slotName;
        this.publicationName = publicationName;
        this.tableFilter = tableFilter;
        this.publicationAutocreateMode = publicationAutocreateMode;
        this.plugin = plugin;
        this.dropSlotOnClose = dropSlotOnClose;
        this.statusUpdateInterval = statusUpdateInterval;
        this.messageDecoder = plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection);
        this.jdbcConnection = jdbcConnection;
        this.typeRegistry = typeRegistry;
        this.streamParams = streamParams;
        this.slotCreationInfo = null;
        this.hasInitedSlot = false;
    }

    /**
     * Derives the JDBC configuration for the streaming connection.
     *
     * <p>Starts from {@code PostgresConnection.addDefaultSettings(configuration, "Debezium Streaming")}
     * and additionally sets {@code replication=database} and {@code preferQueryMode=simple}.
     *
     * @param configuration the connector's JDBC configuration
     * @return the adapted configuration with the replication settings applied
     */
    private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) {
        return JdbcConfiguration.adapt(
                PostgresConnection.addDefaultSettings(configuration, "Debezium Streaming")
                        .edit()
                        .with("replication", "database")
                        .with("preferQueryMode", "simple")
                        .build());
    }

    /**
     * Reads the server-side state of this connection's replication slot.
     *
     * <p>A short-lived connection named "Debezium Slot Info" is opened for the lookup and closed
     * before returning.
     *
     * @return whatever {@code readReplicationSlotInfo} reports for the slot name and plugin name
     */
    private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException {
        try (PostgresConnection slotInfoConnection = new PostgresConnection(this.connectorConfig.getJdbcConfig(), "Debezium Slot Info")) {
            final String pluginName = this.plugin.getPostgresPluginName();
            return slotInfoConnection.readReplicationSlotInfo(this.slotName, pluginName);
        }
    }

    /**
     * Ensures the publication named {@code publicationName} exists when the configured plugin is
     * PGOUTPUT; for any other plugin this method does nothing.
     *
     * <p>It counts matching rows in {@code pg_publication}. When none exists, the publication is
     * created according to {@code publicationAutocreateMode} (or a {@code ConnectException} is
     * thrown in the mode that forbids creation). When one exists, one mode updates it through
     * {@code createOrUpdatePublicationModeFilterted(..., true)} and the others only log at trace.
     * Any {@code SQLException} is rethrown as a {@code JdbcConnectionException}.
     *
     * <p>NOTE(review): this body is raw decompiler output ("Unable to fully structure code").
     * Locals are used without declarations, the switches go through a synthetic
     * {@code $SwitchMap$} array, and the "** break" / "lbl34" lines are decompiler artifacts.
     * It has to be restored by hand before it can compile.
     */
    protected void initPublication() {
        // Always null here; the filtered helper computes its own table list.
        tableFilterString = null;
        if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(this.plugin)) {
            PostgresReplicationConnection.LOGGER.info("Initializing PgOutput logical decoder publication");
            try {
                // Dedicated connection for the publication work; closed in the finally below.
                pgconn = new PostgresConnection(this.connectorConfig.getJdbcConfig(), "Debezium General");
                try {
                    block29: {
                        conn = pgconn.connection(false);
                        // Run the lookup and any CREATE/ALTER in one transaction, committed after block29.
                        conn.setAutoCommit(false);
                        selectPublication = String.format("SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", new Object[]{this.publicationName});
                        stmt = conn.createStatement();
                        try {
                            rs = stmt.executeQuery(selectPublication);
                            try {
                                // No row at all: nothing to do, skip straight to the commit.
                                if (!rs.next()) break block29;
                                count = rs.getLong(1);
                                if (count == 0L) {
                                    // The publication does not exist yet.
                                    PostgresReplicationConnection.LOGGER.info("Creating new publication '{}' for plugin '{}'", (Object)this.publicationName, (Object)this.plugin);
                                    // Synthetic switch over AutoCreateMode. Judging by the branch bodies, 1/2/3 are
                                    // presumably DISABLED / ALL_TABLES / FILTERED — TODO confirm against the enum.
                                    switch (2.$SwitchMap$io$debezium$connector$postgresql$PostgresConnectorConfig$AutoCreateMode[this.publicationAutocreateMode.ordinal()]) {
                                        case 1: {
                                            throw new ConnectException("Publication autocreation is disabled, please create one and restart the connector.");
                                        }
                                        case 2: {
                                            createPublicationStmt = String.format("CREATE PUBLICATION %s FOR ALL TABLES;", new Object[]{this.publicationName});
                                            PostgresReplicationConnection.LOGGER.info("Creating Publication with statement '{}'", (Object)createPublicationStmt);
                                            stmt.execute(createPublicationStmt);
                                            break;
                                        }
                                        case 3: {
                                            // Create a publication restricted to the captured tables (isUpdate = false).
                                            this.createOrUpdatePublicationModeFilterted(tableFilterString, stmt, false);
                                        }
                                    }
                                    break block29;
                                }
                                // The publication already exists.
                                switch (2.$SwitchMap$io$debezium$connector$postgresql$PostgresConnectorConfig$AutoCreateMode[this.publicationAutocreateMode.ordinal()]) {
                                    case 3: {
                                        // Re-point the existing publication at the captured tables (isUpdate = true).
                                        this.createOrUpdatePublicationModeFilterted(tableFilterString, stmt, true);
                                        ** break;
lbl34:
                                        // 1 sources

                                        break;
                                    }
                                    default: {
                                        PostgresReplicationConnection.LOGGER.trace("A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server and will be used by the plugin", new Object[]{this.publicationName, this.plugin, this.database()});
                                        break;
                                    }
                                }
                            }
                            finally {
                                if (rs != null) {
                                    rs.close();
                                }
                            }
                        }
                        finally {
                            if (stmt != null) {
                                stmt.close();
                            }
                        }
                    }
                    // Reached after block29 on every non-exceptional path.
                    conn.commit();
                    conn.setAutoCommit(true);
                }
                finally {
                    pgconn.close();
                }
            }
            catch (SQLException e) {
                throw new JdbcConnectionException(e);
            }
        }
    }

    /**
     * Creates or updates the publication so that it covers exactly the captured tables.
     *
     * <p>The captured tables come from {@link #determineCapturedTables()}. Before the publication
     * statement is executed, {@link #alterReplicaIdentity(Statement, TableId)} is invoked for each
     * of them.
     *
     * @param tableFilterString incoming value is only used in the error message if table
     *        discovery itself fails; otherwise it is replaced by the computed table list
     * @param stmt statement used to execute the DDL
     * @param isUpdate {@code true} for ALTER PUBLICATION ... SET TABLE, {@code false} for
     *        CREATE PUBLICATION ... FOR TABLE
     * @throws ConnectException wrapping any failure, including an empty table list
     */
    private void createOrUpdatePublicationModeFilterted(String tableFilterString, Statement stmt, boolean isUpdate) {
        try {
            final Set<TableId> captured = this.determineCapturedTables();
            tableFilterString = captured.stream()
                    .map(TableId::toDoubleQuotedString)
                    .collect(Collectors.joining(", "));
            if (tableFilterString.isEmpty()) {
                throw new DebeziumException(String.format("No table filters found for filtered publication %s", this.publicationName));
            }

            captured.forEach(table -> this.alterReplicaIdentity(stmt, table));

            final String publicationSql;
            final String logTemplate;
            if (isUpdate) {
                publicationSql = String.format("ALTER PUBLICATION %s SET TABLE %s;", this.publicationName, tableFilterString);
                logTemplate = "Updating Publication with statement '{}'";
            }
            else {
                publicationSql = String.format("CREATE PUBLICATION %s FOR TABLE %s;", this.publicationName, tableFilterString);
                logTemplate = "Creating Publication with statement '{}'";
            }
            LOGGER.info(logTemplate, publicationSql);
            stmt.execute(publicationSql);
        }
        catch (Exception e) {
            final String action = isUpdate ? "update" : "create";
            throw new ConnectException(String.format("Unable to %s filtered publication %s for %s", action, this.publicationName, tableFilterString), e);
        }
    }

    /**
     * Issues {@code ALTER TABLE ... REPLICA IDENTITY FULL} for one table.
     *
     * <p>Best effort: any failure is logged at warn level (message only) and not rethrown.
     *
     * @param stmt statement used to run the DDL
     * @param tableId table whose replica identity is changed
     */
    private void alterReplicaIdentity(Statement stmt, TableId tableId) {
        try {
            final String quotedTable = tableId.toDoubleQuotedString();
            final String replicaIdentitySql = String.format("ALTER TABLE %s REPLICA IDENTITY FULL;", quotedTable);
            LOGGER.info("Setting replica identity for table: {}", replicaIdentitySql);
            stmt.execute(replicaIdentitySql);
        }
        catch (Exception failure) {
            LOGGER.warn("Failed to set replica identity for table {}: {}", tableId.toDoubleQuotedString(), failure.getMessage());
        }
    }

    /**
     * Lists the tables of the configured database that pass the table filter.
     *
     * @return the included table ids in their natural sort order (insertion-ordered set)
     * @throws Exception if the table ids cannot be read
     */
    private Set<TableId> determineCapturedTables() throws Exception {
        final Set<TableId> included = new HashSet<>();
        for (TableId candidate : this.jdbcConnection.getAllTableIds(this.connectorConfig.databaseName())) {
            if (!this.tableFilter.dataCollectionFilter().isIncluded(candidate)) {
                LOGGER.trace("Ignoring table {} as it's not included in the filter configuration", candidate);
                continue;
            }
            LOGGER.trace("Adding table {} to the list of captured tables", candidate);
            included.add(candidate);
        }
        // Sort first, then keep that order by collecting into a LinkedHashSet.
        return included.stream()
                .sorted()
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    /**
     * Makes sure the replication slot exists, validates the replication connection and chooses
     * the default starting position.
     *
     * <p>The slot is created when the lookup reports {@code ReplicationSlot.INVALID}. The
     * connection is then validated by running {@code IDENTIFY_SYSTEM}. The default start is, in
     * order of precedence: the start LSN of a slot created by this instance, {@code INVALID_LSN}
     * when the slot was just created or has no valid flushed LSN, or the slot's latest flushed LSN.
     *
     * @throws JdbcConnectionException wrapping any {@code SQLException} raised after the slot lookup
     */
    protected void initReplicationSlot() throws SQLException, InterruptedException {
        final ServerInfo.ReplicationSlot existingSlot = this.getSlotInfo();
        final boolean slotMissing = existingSlot == ServerInfo.ReplicationSlot.INVALID;
        try {
            if (slotMissing) {
                this.createReplicationSlot();
            }
            // Obtain the underlying connection before issuing the replication command.
            this.pgConnection();

            final String identifySystemStatement = "IDENTIFY_SYSTEM";
            LOGGER.debug("running '{}' to validate replication connection", identifySystemStatement);
            // Only the success of the query matters; the parsed position is not used further.
            this.queryAndMap(identifySystemStatement, rs -> {
                if (!rs.next()) {
                    throw new IllegalStateException("The DB connection is not a valid replication connection");
                }
                final String xlogpos = rs.getString("xlogpos");
                LOGGER.debug("received latest xlogpos '{}'", xlogpos);
                return Lsn.valueOf(xlogpos);
            });

            if (this.slotCreationInfo != null) {
                this.defaultStartingPos = this.slotCreationInfo.startLsn();
            }
            else if (!slotMissing && existingSlot.hasValidFlushedLsn()) {
                final Lsn flushed = existingSlot.latestFlushedLsn();
                this.defaultStartingPos = flushed;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("found previous flushed LSN '{}'", flushed);
                }
            }
            else {
                this.defaultStartingPos = Lsn.INVALID_LSN;
            }
            this.hasInitedSlot = true;
        }
        catch (SQLException e) {
            throw new JdbcConnectionException(e);
        }
    }

    /**
     * Reports whether a TEMPORARY replication slot should be used.
     *
     * @return always {@code false} in this implementation, so the TEMPORARY keyword is never added
     *         and the slot re-initialisation branches guarded by this method are never taken
     */
    private boolean useTemporarySlot() throws SQLException {
        return false;
    }

    /**
     * Starts streaming without an explicit offset; delegates to
     * {@link #startStreaming(Lsn, WalPositionLocator)} with a {@code null} offset, which makes that
     * method fall back to the default starting position.
     */
    @Override
    public ReplicationStream startStreaming(WalPositionLocator walPosition) throws SQLException, InterruptedException {
        return this.startStreaming(null, walPosition);
    }

    /**
     * Initialises the connection and opens a replication stream, retrying on failure.
     *
     * <p>When {@code offset} is {@code null} the default starting position determined during
     * slot initialisation is used. A failed attempt is retried up to
     * {@code connectorConfig.maxRetries()} times with a pause of
     * {@code connectorConfig.retryDelay()} between attempts.
     *
     * @param offset LSN to start from, or {@code null} for the default starting position
     * @param walPosition locator forwarded to the created stream
     * @return the opened replication stream
     * @throws DebeziumException when all attempts failed; the last failure is the cause
     * @throws InterruptedException if the thread is interrupted while starting or while waiting
     *         between attempts; interruption is never treated as a retryable failure
     */
    @Override
    public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPosition) throws SQLException, InterruptedException {
        this.initConnection();
        this.connect();
        // Read the default only after initConnection() has had the chance to set it.
        final Lsn lsn = offset != null ? offset : this.defaultStartingPos;
        LOGGER.debug("starting streaming from LSN '{}'", lsn);

        final int maxRetries = this.connectorConfig.maxRetries();
        final Duration delay = this.connectorConfig.retryDelay();
        int tryCount = 0;
        while (true) {
            try {
                return this.createReplicationStream(lsn, walPosition);
            }
            catch (InterruptedException e) {
                // An interrupt is a request to stop, not a transient failure worth retrying.
                throw e;
            }
            catch (Exception e) {
                String message = "Failed to start replication stream at " + lsn;
                if (++tryCount > maxRetries) {
                    // getMessage() may be null; guard so the real cause is not masked by an NPE.
                    final String causeMessage = e.getMessage();
                    if (causeMessage != null && causeMessage.matches(".*replication slot .* is active.*")) {
                        message = message + "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
                    }
                    throw new DebeziumException(message, e);
                }
                // Log the delay in milliseconds so it matches the "ms" unit in the text.
                LOGGER.warn(message + ", waiting for {} ms and retrying, attempt number {} over {}", delay.toMillis(), tryCount, maxRetries);
                Metronome.sleeper(delay, Clock.SYSTEM).pause();
            }
        }
    }

    /**
     * Prepares the server side for streaming: always (re)checks the publication, and initialises
     * the replication slot the first time only.
     */
    @Override
    public void initConnection() throws SQLException, InterruptedException {
        this.initPublication();
        if (this.hasInitedSlot) {
            // Slot initialisation already ran for this instance.
            return;
        }
        this.initReplicationSlot();
    }

    /**
     * Creates the logical replication slot on the server.
     *
     * <p>The publication is initialised first. The slot is created with a
     * {@code CREATE_REPLICATION_SLOT ... LOGICAL ... FAILOVER} command. When the server is at
     * least version 9.4 the command's result set is parsed into {@code slotCreationInfo}.
     *
     * @return the slot creation result, or empty when none has been recorded
     */
    @Override
    public Optional<SlotCreationResult> createReplicationSlot() throws SQLException {
        LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", this.slotName, this.plugin);

        final boolean canExportSnapshot = this.pgConnection().haveMinimumServerVersion(ServerVersion.v9_4);
        if (!canExportSnapshot && this.dropSlotOnClose) {
            LOGGER.warn("A slot marked as temporary or with an exported snapshot was created, but not on a supported version of Postgres, ignoring!");
        }
        final String tempPart = this.useTemporarySlot() ? "TEMPORARY" : "";

        this.initPublication();
        try (Statement stmt = this.pgConnection().createStatement()) {
            final String createCommand = String.format("CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s FAILOVER", this.slotName, tempPart, this.plugin.getPostgresPluginName());
            LOGGER.info("Creating replication slot with command {}", createCommand);
            stmt.execute(createCommand);
            if (canExportSnapshot) {
                this.slotCreationInfo = this.parseSlotCreation(stmt.getResultSet());
            }
            return Optional.ofNullable(this.slotCreationInfo);
        }
    }

    /**
     * Returns this connection's underlying JDBC connection as a pgjdbc {@code BaseConnection}.
     * Calls {@code connection(false)} and casts the result.
     */
    protected BaseConnection pgConnection() throws SQLException {
        return (BaseConnection)this.connection(false);
    }

    /**
     * Converts the result set of a slot-creation command into a {@code SlotCreationResult}.
     *
     * <p>The start point is always reported as {@code Lsn.INVALID_LSN}; only the slot name,
     * snapshot name and plugin name are read from the row.
     *
     * @param rs result set positioned before the first row
     * @throws ConnectException if there is no row or the row cannot be read
     */
    private SlotCreationResult parseSlotCreation(ResultSet rs) {
        try {
            if (!rs.next()) {
                throw new ConnectException("No replication slot found");
            }
            final String createdSlot = rs.getString("slot_name");
            final String startPoint = Lsn.INVALID_LSN.asString();
            final String snapshot = rs.getString("snapshot_name");
            final String outputPlugin = rs.getString("output_plugin");
            return new SlotCreationResult(createdSlot, startPoint, snapshot, outputPlugin);
        }
        catch (SQLException ex) {
            throw new ConnectException("Unable to parse create_replication_slot response", ex);
        }
    }

    /**
     * Seeds {@code adbpg_replication_lsn_map} with one row per segment of this slot and returns
     * the last global LSN that was assigned.
     *
     * <p>Rows of {@code gp_replication_slots} for the slot are read in segment order; each gets
     * the next value of a counter starting at 1 as its global LSN. All inserts are sent as one
     * batch prefixed with {@code set allow_system_table_mods=true;}.
     *
     * @return the global LSN of the last segment row, or {@code INVALID_LSN} when there were none
     */
    private String getGlobalStartPoint() throws SQLException {
        Lsn lastGlobalLsn = Lsn.INVALID_LSN;
        long globalCounter = 0L;
        final StringBuilder mappingSql = new StringBuilder("set allow_system_table_mods=true;");
        try (PostgresConnection pgconn = new PostgresConnection(this.connectorConfig.getJdbcConfig(), "Debezium General")) {
            final BaseConnection conn = (BaseConnection) pgconn.connection(false);
            try (Statement stmt = conn.createStatement()) {
                stmt.execute(String.format("select gp_segment_id, restart_lsn from gp_replication_slots where slot_name = '%s' order by gp_segment_id;", this.slotName));
                final ResultSet segments = stmt.getResultSet();
                while (segments.next()) {
                    lastGlobalLsn = Lsn.valueOf(++globalCounter);
                    final int segmentId = segments.getInt(1);
                    final Lsn segmentLsn = Lsn.valueOf(segments.getString(2));
                    mappingSql.append(String.format("insert into adbpg_replication_lsn_map values ('%s', '%s', %s, '%s');", this.slotName, lastGlobalLsn.asString(), segmentId, segmentLsn.asString()));
                }
                stmt.execute(mappingSql.toString());
            }
            conn.close();
        }
        return lastGlobalLsn.asString();
    }

    /**
     * Builds a {@code SlotCreationResult} for an already existing slot.
     *
     * <p>Verifies the slot is listed in {@code pg_replication_slots}, exports a snapshot via
     * {@code pg_export_snapshot()}, and looks up a start point in
     * {@code adbpg_replication_lsn_map}; the start point is {@code "0"} when that query returns
     * no row.
     *
     * @param stmt statement used for all three queries
     * @param slotName slot to look up
     * @param pluginName plugin name stored in the result as-is
     * @throws ConnectException if the slot is missing, no snapshot can be exported, or a query fails
     */
    private SlotCreationResult getPostgresLogicalSlot(Statement stmt, String slotName, String pluginName) {
        final String slotLookupSql = String.format("select slot_name from pg_replication_slots where slot_name= '%s'", slotName);
        try {
            stmt.execute(slotLookupSql);
            ResultSet rows = stmt.getResultSet();
            if (!rows.next()) {
                throw new ConnectException("No replication slot found");
            }

            rows = stmt.executeQuery("SELECT pg_export_snapshot()");
            if (!rows.next()) {
                throw new ConnectException("Can not export snapshot");
            }
            final String snapshotName = rows.getString(1);

            final String startPointSql = String.format(
                    "SELECT global_lsn\n"
                            + "FROM adbpg_replication_lsn_map outer_map\n"
                            + "WHERE (\n"
                            + "    SELECT COUNT(DISTINCT segindex)\n"
                            + "    FROM adbpg_replication_lsn_map inner_map\n"
                            + "    WHERE inner_map.global_lsn <= outer_map.global_lsn\n"
                            + ") = (\n"
                            + "    SELECT COUNT(DISTINCT segindex)\n"
                            + "    FROM adbpg_replication_lsn_map\n"
                            + ") and slotname = '%s' \n"
                            + "ORDER BY global_lsn\n"
                            + "LIMIT 1",
                    slotName);
            stmt.execute(startPointSql);
            rows = stmt.getResultSet();
            final String startPoint;
            if (rows.next()) {
                startPoint = rows.getString(1);
            }
            else {
                startPoint = "0";
            }
            return new SlotCreationResult(slotName, startPoint, snapshotName, pluginName);
        }
        catch (SQLException ex) {
            throw new ConnectException("Unable to create SlotCreationResult", ex);
        }
    }

    /**
     * Opens the pgjdbc logical replication stream at {@code startLsn} and wraps it in a
     * {@link ReplicationStream}.
     *
     * <p>Start-up is attempted with metadata options (or without, when the plugin forces RDS
     * mode) and retried once on a {@code PSQLException}. If the retry also fails:
     * <ul>
     *   <li>an "option ... is unknown" error triggers one more attempt without metadata, and the
     *       resulting stream is used;</li>
     *   <li>a "requested WAL segment ... has already been removed" error is turned into a
     *       {@code ConnectException};</li>
     *   <li>any other error is rethrown unchanged.</li>
     * </ul>
     *
     * @param startLsn position to start streaming from
     * @param walPosition locator handed to the decoder's skip check
     * @return a stream wrapper bound to this connection
     */
    private ReplicationStream createReplicationStream(final Lsn startLsn, final WalPositionLocator walPosition) throws SQLException, InterruptedException {
        PGReplicationStream s;
        try {
            try {
                s = this.startPgReplicationStream(startLsn, this.plugin.forceRds() ? this.messageDecoder::optionsWithoutMetadata : this.messageDecoder::optionsWithMetadata);
                this.messageDecoder.setContainsMetadata(!this.plugin.forceRds());
            }
            catch (PSQLException e) {
                LOGGER.debug("Could not register for streaming, retrying without optional options", e);
                // Re-initialise the slot before retrying when a temporary slot is in use.
                if (this.useTemporarySlot()) {
                    this.initReplicationSlot();
                }
                s = this.startPgReplicationStream(startLsn, this.plugin.forceRds() ? this.messageDecoder::optionsWithoutMetadata : this.messageDecoder::optionsWithMetadata);
                this.messageDecoder.setContainsMetadata(!this.plugin.forceRds());
            }
        }
        catch (PSQLException e) {
            // getMessage() may be null; treat that as "matches nothing" instead of throwing an NPE.
            final String errorMessage = e.getMessage() == null ? "" : e.getMessage();
            if (errorMessage.matches("(?s)ERROR: option .* is unknown.*")) {
                LOGGER.warn("Could not register for streaming with metadata in messages, falling back to messages without metadata");
                if (this.useTemporarySlot()) {
                    this.initReplicationSlot();
                }
                // The fallback succeeded: keep this stream rather than rethrowing the original error.
                s = this.startPgReplicationStream(startLsn, this.messageDecoder::optionsWithoutMetadata);
                this.messageDecoder.setContainsMetadata(false);
            }
            else if (errorMessage.matches("(?s)ERROR: requested WAL segment .* has already been removed.*")) {
                LOGGER.error("Cannot rewind to last processed WAL position", e);
                throw new ConnectException("The offset to start reading from has been removed from the database write-ahead log. Create a new snapshot and consider setting of PostgreSQL parameter wal_keep_segments = 0.");
            }
            else {
                throw e;
            }
        }
        final PGReplicationStream stream = s;
        return new ReplicationStream(){
            // Server warnings are drained once every this many read calls (and on close).
            private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
            private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
            private ExecutorService keepAliveExecutor = null;
            private AtomicBoolean keepAliveRunning;
            // Paces the keep-alive loop at the configured status update interval.
            private final Metronome metronome = Metronome.sleeper(PostgresReplicationConnection.this.statusUpdateInterval, Clock.SYSTEM);
            private volatile Lsn lastReceivedLsn;

            /**
             * Blocking read of the next message; forwards it to {@code processor} unless the
             * end position was passed (a no-op message is sent instead) or the decoder asks to
             * skip it.
             */
            @Override
            public void read(ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
                this.processWarnings(false);
                ByteBuffer read = stream.read();
                Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
                LOGGER.trace("Streaming requested from LSN {}, received LSN {}", startLsn, lastReceiveLsn);
                // NOTE(review): this checks the previously recorded LSN (the field), whereas
                // readPending() checks the LSN just received. Left as-is; confirm which is intended.
                if (this.reachEnd(this.lastReceivedLsn)) {
                    this.lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
                    LOGGER.trace("Received message at LSN {}", this.lastReceivedLsn);
                    processor.process(new ReplicationMessage.NoopMessage(null, null));
                    return;
                }
                if (PostgresReplicationConnection.this.messageDecoder.shouldMessageBeSkipped(read, lastReceiveLsn, startLsn, walPosition)) {
                    return;
                }
                this.deserializeMessages(read, processor);
            }

            /**
             * Non-blocking variant of {@link #read}.
             *
             * @return {@code false} when nothing was pending, {@code true} otherwise (including
             *         when the message was skipped or replaced by a no-op)
             */
            @Override
            public boolean readPending(ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
                this.processWarnings(false);
                ByteBuffer read = stream.readPending();
                Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
                LOGGER.trace("Streaming requested from LSN {}, received LSN {}", startLsn, lastReceiveLsn);
                if (this.reachEnd(lastReceiveLsn)) {
                    this.lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
                    LOGGER.trace("Received message at LSN {}", this.lastReceivedLsn);
                    processor.process(new ReplicationMessage.NoopMessage(null, null));
                    return true;
                }
                if (read == null) {
                    return false;
                }
                if (PostgresReplicationConnection.this.messageDecoder.shouldMessageBeSkipped(read, lastReceiveLsn, startLsn, walPosition)) {
                    return true;
                }
                this.deserializeMessages(read, processor);
                return true;
            }

            /** Records the stream's last received LSN and hands the buffer to the decoder. */
            private void deserializeMessages(ByteBuffer buffer, ReplicationStream.ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
                this.lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
                LOGGER.trace("Received message at LSN {}", this.lastReceivedLsn);
                PostgresReplicationConnection.this.messageDecoder.processMessage(buffer, processor, PostgresReplicationConnection.this.typeRegistry);
            }

            /** Drains pending server warnings, then closes the underlying stream. */
            @Override
            public void close() throws SQLException {
                this.processWarnings(true);
                stream.close();
            }

            @Override
            public void flushLsn(Lsn lsn) throws SQLException {
                this.doFlushLsn(lsn);
            }

            /** Reports {@code lsn} as both flushed and applied and pushes a status update immediately. */
            private void doFlushLsn(Lsn lsn) throws SQLException {
                stream.setFlushedLSN(lsn.asLogSequenceNumber());
                stream.setAppliedLSN(lsn.asLogSequenceNumber());
                stream.forceUpdateStatus();
            }

            @Override
            public Lsn lastReceivedLsn() {
                return this.lastReceivedLsn;
            }

            /**
             * Starts a background loop on {@code service} that forces a status update and then
             * pauses for the status update interval, until {@link #stopKeepAlive()} is called.
             * Does nothing if a keep-alive executor is already set.
             */
            @Override
            public void startKeepAlive(ExecutorService service) {
                if (this.keepAliveExecutor == null) {
                    this.keepAliveExecutor = service;
                    this.keepAliveRunning = new AtomicBoolean(true);
                    this.keepAliveExecutor.submit(() -> {
                        while (this.keepAliveRunning.get()) {
                            try {
                                LOGGER.trace("Forcing status update with replication stream");
                                stream.forceUpdateStatus();
                                this.metronome.pause();
                            }
                            catch (Exception exp) {
                                throw new RuntimeException("received unexpected exception will perform keep alive", exp);
                            }
                        }
                    });
                }
            }

            /** Stops the keep-alive loop and shuts the executor down immediately. */
            @Override
            public void stopKeepAlive() {
                if (this.keepAliveExecutor != null) {
                    this.keepAliveRunning.set(false);
                    this.keepAliveExecutor.shutdownNow();
                    this.keepAliveExecutor = null;
                }
            }

            /**
             * Logs and clears server-side warnings, either when {@code forced} or once every
             * {@code CHECK_WARNINGS_AFTER_COUNT} calls.
             */
            private void processWarnings(boolean forced) throws SQLException {
                if (--this.warningCheckCounter == 0 || forced) {
                    this.warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
                    for (SQLWarning w = PostgresReplicationConnection.this.connection().getWarnings(); w != null; w = w.getNextWarning()) {
                        LOGGER.debug("Server-side message: '{}', state = {}, code = {}", w.getMessage(), w.getSQLState(), w.getErrorCode());
                    }
                    PostgresReplicationConnection.this.connection().clearWarnings();
                }
            }

            @Override
            public Lsn startLsn() {
                return startLsn;
            }

            /**
             * @return {@code true} when an ending position is set, is not marked non-stopping,
             *         and {@code receivedLsn} lies strictly beyond it; {@code false} for a
             *         {@code null} LSN
             */
            private boolean reachEnd(Lsn receivedLsn) {
                if (receivedLsn == null) {
                    return false;
                }
                return PostgresReplicationConnection.this.endingPos != null && !PostgresReplicationConnection.this.endingPos.isNonStopping() && PostgresReplicationConnection.this.endingPos.compareTo(receivedLsn) < 0;
            }
        };
    }

    /**
     * Sets the ending position consulted by streams created from this connection: once a
     * received LSN lies beyond it (and it is not marked non-stopping) the stream emits no-op
     * messages instead of decoding. A {@code null} value disables the check.
     */
    public void setEndingPos(Lsn endingPos) {
        this.endingPos = endingPos;
    }

    /**
     * Builds and starts the pgjdbc logical replication stream for this slot.
     *
     * <p>The slot name is sent double-quoted. {@code configurator} lets the message decoder add
     * its plugin options; it also receives a server-version check. A status interval is set only
     * when {@code statusUpdateInterval} is positive. After start there is a 10 ms pause, then a
     * status update is forced.
     *
     * @param lsn start position; must not be {@code null}
     * @param configurator decoder hook that adds slot options to the builder
     * @return the started stream
     */
    private PGReplicationStream startPgReplicationStream(Lsn lsn, BiFunction<ChainedLogicalStreamBuilder, Function<Integer, Boolean>, ChainedLogicalStreamBuilder> configurator) throws SQLException {
        assert (lsn != null);
        ChainedLogicalStreamBuilder streamBuilder = this.pgConnection().getReplicationAPI().replicationStream().logical()
                .withSlotName("\"" + this.slotName + "\"")
                .withStartPosition(lsn.asLogSequenceNumber())
                .withSlotOptions(this.streamParams);
        streamBuilder = configurator.apply(streamBuilder, this::hasMinimumVersion);
        if (this.statusUpdateInterval != null && this.statusUpdateInterval.toMillis() > 0L) {
            streamBuilder.withStatusInterval(Math.toIntExact(this.statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS);
        }
        PGReplicationStream stream = streamBuilder.start();
        try {
            Thread.sleep(10L);
        }
        catch (InterruptedException e) {
            // This method cannot throw InterruptedException; restore the flag so callers still see it.
            Thread.currentThread().interrupt();
        }
        stream.forceUpdateStatus();
        return stream;
    }

    /**
     * Checks whether the connected server is at least the given version; passed to the stream
     * configurator as a method reference.
     *
     * @param version server version number as understood by {@code haveMinimumServerVersion}
     * @throws DebeziumException wrapping any {@code SQLException} from obtaining the connection
     */
    private Boolean hasMinimumVersion(int version) {
        try {
            return this.pgConnection().haveMinimumServerVersion(version);
        }
        catch (SQLException e) {
            // Rethrown unchecked so the method can be used as a Function<Integer, Boolean>.
            throw new DebeziumException(e);
        }
    }

    /**
     * Closes the connection and allows the replication slot to be dropped; equivalent to
     * {@code close(true)}.
     */
    @Override
    public synchronized void close() {
        this.close(true);
    }

    /**
     * Closes the message decoder and the underlying connection, then optionally drops the slot.
     *
     * <p>Each step is independent: a failure is logged at error level and the next step still
     * runs. The slot is dropped (over a separate "Debezium Drop Slot" connection) only when both
     * {@code dropSlotOnClose} and {@code dropSlot} are true.
     *
     * @param dropSlot whether the caller permits dropping the replication slot
     */
    public synchronized void close(boolean dropSlot) {
        try {
            LOGGER.debug("Closing message decoder");
            this.messageDecoder.close();
        }
        catch (Throwable decoderFailure) {
            LOGGER.error("Unexpected error while closing message decoder", decoderFailure);
        }

        try {
            LOGGER.debug("Closing replication connection");
            super.close();
        }
        catch (Throwable connectionFailure) {
            LOGGER.error("Unexpected error while closing Postgres connection", connectionFailure);
        }

        final boolean shouldDropSlot = this.dropSlotOnClose && dropSlot;
        if (!shouldDropSlot) {
            return;
        }
        try (PostgresConnection dropConnection = new PostgresConnection(this.connectorConfig.getJdbcConfig(), "Debezium Drop Slot")) {
            dropConnection.dropReplicationSlot(this.slotName);
        }
        catch (Throwable dropFailure) {
            LOGGER.error("Unexpected error while dropping replication slot", dropFailure);
        }
    }

    /**
     * Re-establishes the connection: closes the current one while keeping the replication
     * slot, then calls {@code connection(false)}.
     *
     * @throws SQLException if {@code connection(false)} fails
     */
    @Override
    public void reconnect() throws SQLException {
        // The slot must survive a reconnect, so dropping is explicitly disabled here.
        final boolean allowSlotDrop = false;
        this.close(allowSlotDrop);
        // NOTE(review): the meaning of the boolean argument is defined by connection(boolean),
        // which is not visible in this part of the file.
        this.connection(false);
    }

    /**
     * Fluent builder that collects the settings for a {@link PostgresReplicationConnection}
     * and creates it in {@link #build()}.
     * <p>
     * Settings that are never supplied keep the defaults declared on the fields below:
     * slot name {@code "debezium"}, publication name {@code "dbz_publication"},
     * auto-create mode {@code ALL_TABLES}, decoder plugin {@code DECODERBUFS},
     * drop-slot-on-close {@code true} and an empty set of stream parameters.
     * All other settings are left at {@code null} / {@code false}.
     */
    protected static class ReplicationConnectionBuilder
    implements ReplicationConnection.Builder {
        private final PostgresConnectorConfig config;
        private String slotName = "debezium";
        private String publicationName = "dbz_publication";
        private RelationalTableFilters tableFilter;
        private PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode = PostgresConnectorConfig.AutoCreateMode.ALL_TABLES;
        private PostgresConnectorConfig.LogicalDecoder plugin = PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
        private boolean dropSlotOnClose = true;
        private Duration statusUpdateIntervalVal;
        private boolean doSnapshot;
        private TypeRegistry typeRegistry;
        private PostgresSchema schema;
        private Properties slotStreamParams = new Properties();
        private PostgresConnection jdbcConnection;

        /**
         * @param config the connector configuration handed to the built connection; must not be {@code null}
         */
        protected ReplicationConnectionBuilder(PostgresConnectorConfig config) {
            assert (config != null);
            this.config = config;
        }

        /**
         * Sets the replication slot name.
         *
         * @param slotName the slot name; must not be {@code null}
         * @return this builder
         */
        @Override
        public ReplicationConnectionBuilder withSlot(String slotName) {
            assert (slotName != null);
            this.slotName = slotName;
            return this;
        }

        /**
         * Sets the publication name.
         *
         * @param publicationName the publication name; must not be {@code null}
         * @return this builder
         */
        @Override
        public ReplicationConnection.Builder withPublication(String publicationName) {
            assert (publicationName != null);
            this.publicationName = publicationName;
            return this;
        }

        /**
         * Sets the table filter passed to the built connection.
         *
         * @param tableFilter the table filter; must not be {@code null}
         * @return this builder
         */
        @Override
        public ReplicationConnection.Builder withTableFilter(RelationalTableFilters tableFilter) {
            assert (tableFilter != null);
            this.tableFilter = tableFilter;
            return this;
        }

        /**
         * Sets the publication auto-create mode.
         *
         * @param publicationAutocreateMode the mode; must not be {@code null}
         * @return this builder
         */
        @Override
        public ReplicationConnection.Builder withPublicationAutocreateMode(PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode) {
            // Validate the argument being stored, in line with the other setters. The previous
            // check tested this.publicationName, which is never null and so verified nothing.
            assert (publicationAutocreateMode != null);
            this.publicationAutocreateMode = publicationAutocreateMode;
            return this;
        }

        /**
         * Sets the logical decoding plugin.
         *
         * @param plugin the decoder plugin; must not be {@code null}
         * @return this builder
         */
        @Override
        public ReplicationConnectionBuilder withPlugin(PostgresConnectorConfig.LogicalDecoder plugin) {
            assert (plugin != null);
            this.plugin = plugin;
            return this;
        }

        /**
         * Sets the drop-slot-on-close flag handed to the built connection.
         *
         * @param dropSlotOnClose the flag value
         * @return this builder
         */
        @Override
        public ReplicationConnectionBuilder dropSlotOnClose(boolean dropSlotOnClose) {
            this.dropSlotOnClose = dropSlotOnClose;
            return this;
        }

        /**
         * Parses a {@code ;}-separated list of {@code name=value} pairs into the stream
         * parameters passed to the built connection.
         * <p>
         * A {@code null} or empty argument leaves the current parameters untouched. Otherwise
         * the current parameters are replaced. An entry is accepted only when splitting it on
         * {@code =} yields exactly two parts; any other entry (no {@code =}, an empty value,
         * or more than one {@code =}) is skipped with a warning.
         *
         * @param slotStreamParams the raw parameter string, may be {@code null}
         * @return this builder
         */
        @Override
        public ReplicationConnectionBuilder streamParams(String slotStreamParams) {
            if (slotStreamParams == null || slotStreamParams.isEmpty()) {
                return this;
            }
            this.slotStreamParams = new Properties();
            for (String paramsWithValue : slotStreamParams.split(";")) {
                String[] paramAndValue = paramsWithValue.split("=");
                if (paramAndValue.length != 2) {
                    LOGGER.warn("The following STREAM_PARAMS value is invalid: {}", (Object)paramsWithValue);
                    continue;
                }
                this.slotStreamParams.setProperty(paramAndValue[0], paramAndValue[1]);
            }
            return this;
        }

        /**
         * Sets the status update interval handed to the built connection.
         *
         * @param statusUpdateInterval the interval, may be {@code null}
         * @return this builder
         */
        @Override
        public ReplicationConnectionBuilder statusUpdateInterval(Duration statusUpdateInterval) {
            this.statusUpdateIntervalVal = statusUpdateInterval;
            return this;
        }

        /**
         * Sets the snapshot flag handed to the built connection.
         *
         * @param doSnapshot the flag value
         * @return this builder
         */
        @Override
        public ReplicationConnection.Builder doSnapshot(boolean doSnapshot) {
            this.doSnapshot = doSnapshot;
            return this;
        }

        /**
         * Sets the JDBC connection handed to the built connection.
         *
         * @param jdbcConnection the connection, may be {@code null}
         * @return this builder
         */
        @Override
        public ReplicationConnection.Builder jdbcMetadataConnection(PostgresConnection jdbcConnection) {
            this.jdbcConnection = jdbcConnection;
            return this;
        }

        /**
         * Creates the replication connection from the collected settings.
         *
         * @return a new {@link PostgresReplicationConnection}
         */
        @Override
        public ReplicationConnection build() {
            assert (this.plugin != null) : "Decoding plugin name is not set";
            return new PostgresReplicationConnection(this.config, this.slotName, this.publicationName, this.tableFilter, this.publicationAutocreateMode, this.plugin, this.dropSlotOnClose, this.doSnapshot, this.statusUpdateIntervalVal, this.jdbcConnection, this.typeRegistry, this.slotStreamParams, this.schema);
        }

        /**
         * Sets the type registry handed to the built connection.
         *
         * @param typeRegistry the type registry, may be {@code null}
         * @return this builder
         */
        @Override
        public ReplicationConnection.Builder withTypeRegistry(TypeRegistry typeRegistry) {
            this.typeRegistry = typeRegistry;
            return this;
        }

        /**
         * Sets the schema handed to the built connection.
         *
         * @param schema the schema, may be {@code null}
         * @return this builder
         */
        @Override
        public ReplicationConnection.Builder withSchema(PostgresSchema schema) {
            this.schema = schema;
            return this;
        }
    }
}

