/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresSnapshotChangeEventSource
extends RelationalSnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSnapshotChangeEventSource.class);
    private final PostgresConnectorConfig connectorConfig;
    private final PostgresConnection jdbcConnection;
    private final PostgresSchema schema;
    private final Snapshotter snapshotter;
    private final SlotCreationResult slotCreatedInfo;
    private final SlotState startingSlotInfo;

    public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter, PostgresConnection jdbcConnection, PostgresSchema schema, EventDispatcher<PostgresPartition, TableId> dispatcher, Clock clock, SnapshotProgressListener<PostgresPartition> snapshotProgressListener, SlotCreationResult slotCreatedInfo, SlotState startingSlotInfo) {
        super(connectorConfig, jdbcConnection, schema, dispatcher, clock, snapshotProgressListener);
        this.connectorConfig = connectorConfig;
        this.jdbcConnection = jdbcConnection;
        this.schema = schema;
        this.snapshotter = snapshotter;
        this.slotCreatedInfo = slotCreatedInfo;
        this.startingSlotInfo = startingSlotInfo;
    }

    @Override
    protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(PostgresPartition partition, PostgresOffsetContext previousOffset) {
        boolean snapshotSchema = true;
        boolean snapshotData = true;
        snapshotData = this.snapshotter.shouldSnapshot();
        if (snapshotData) {
            LOGGER.info("According to the connector configuration data will be snapshotted");
        } else {
            LOGGER.info("According to the connector configuration no snapshot will be executed");
            snapshotSchema = false;
        }
        return new AbstractSnapshotChangeEventSource.SnapshottingTask(snapshotSchema, snapshotData);
    }

    @Override
    protected AbstractSnapshotChangeEventSource.SnapshotContext<PostgresPartition, PostgresOffsetContext> prepare(PostgresPartition partition) throws Exception {
        return new PostgresSnapshotContext(partition, this.connectorConfig.databaseName());
    }

    @Override
    protected void connectionCreated(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext) throws Exception {
        if (this.snapshotter.shouldStreamEventsStartingFromSnapshot()) {
            this.setSnapshotTransactionIsolationLevel();
        }
        this.schema.refresh(this.jdbcConnection, false);
    }

    @Override
    protected Set<TableId> getAllTableIds(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> ctx) throws Exception {
        return this.jdbcConnection.getAllTableIds(ctx.catalogName);
    }

    @Override
    protected void lockTablesForSchemaSnapshot(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext) throws SQLException, InterruptedException {
        Duration lockTimeout = this.connectorConfig.snapshotLockTimeout();
        Optional<String> lockStatement = this.snapshotter.snapshotTableLockingStatement(lockTimeout, snapshotContext.capturedTables);
        if (lockStatement.isPresent()) {
            LOGGER.info("Waiting a maximum of '{}' seconds for each table lock", (Object)lockTimeout.getSeconds());
            this.jdbcConnection.executeWithoutCommitting(lockStatement.get());
            this.schema.refresh(this.jdbcConnection, false);
        }
    }

    @Override
    protected void releaseSchemaSnapshotLocks(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext) throws SQLException {
    }

    @Override
    protected void determineSnapshotOffset(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> ctx, PostgresOffsetContext previousOffset) throws Exception {
        PostgresOffsetContext offset = (PostgresOffsetContext)ctx.offset;
        if (offset == null) {
            if (this.startingSlotInfo != null && this.snapshotter.shouldSnapshot()) {
                this.jdbcConnection.init_current_wal_lsn(this.connectorConfig.slotName());
            }
            offset = previousOffset != null && !this.snapshotter.shouldStreamEventsStartingFromSnapshot() ? PostgresOffsetContext.initialContext(this.connectorConfig, this.jdbcConnection, this.getClock(), previousOffset.lastCommitLsn(), previousOffset.lastCompletelyProcessedLsn()) : PostgresOffsetContext.initialContext(this.connectorConfig, this.jdbcConnection, this.getClock());
            ctx.offset = offset;
        }
        this.updateOffsetForSnapshot(offset);
    }

    private void updateOffsetForSnapshot(PostgresOffsetContext offset) throws SQLException {
        Lsn xlogStart = this.getTransactionStartLsn();
        Long txId = this.jdbcConnection.currentTransactionId();
        LOGGER.info("Read xlogStart at '{}' from transaction '{}'", (Object)xlogStart, (Object)txId);
        offset.updateWalPosition(xlogStart, offset.lastCompletelyProcessedLsn(), this.clock.currentTime(), txId, offset.xmin(), null);
    }

    protected void updateOffsetForPreSnapshotCatchUpStreaming(PostgresOffsetContext offset) throws SQLException {
        this.updateOffsetForSnapshot(offset);
        offset.setStreamingStoppingLsn(Lsn.valueOf(this.jdbcConnection.currentGlobalLocation()));
    }

    private Lsn getTransactionStartLsn() throws SQLException {
        if (this.slotCreatedInfo != null) {
            return this.slotCreatedInfo.startLsn();
        }
        if (!this.snapshotter.shouldStreamEventsStartingFromSnapshot() && this.startingSlotInfo != null) {
            SlotState currentSlotState = this.jdbcConnection.getReplicationSlotState(this.connectorConfig.slotName(), this.connectorConfig.plugin().getPostgresPluginName());
            return currentSlotState.slotLastFlushedLsn();
        }
        return Lsn.valueOf(this.jdbcConnection.currentGlobalLocation());
    }

    @Override
    protected void readTableStructure(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext, PostgresOffsetContext offsetContext) throws SQLException, InterruptedException {
        Set schemas = snapshotContext.capturedTables.stream().map(TableId::schema).collect(Collectors.toSet());
        for (String schema : schemas) {
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while reading structure of schema " + schema);
            }
            LOGGER.info("Reading structure of schema '{}' of catalog '{}'", (Object)schema, (Object)snapshotContext.catalogName);
            this.jdbcConnection.readSchema(snapshotContext.tables, snapshotContext.catalogName, schema, this.connectorConfig.getTableFilters().dataCollectionFilter(), null, false);
        }
        this.schema.refresh(this.jdbcConnection, false);
    }

    @Override
    protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext, Table table) throws SQLException {
        return SchemaChangeEvent.ofSnapshotCreate(snapshotContext.partition, snapshotContext.offset, snapshotContext.catalogName, table);
    }

    @Override
    protected void complete(AbstractSnapshotChangeEventSource.SnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext) {
        this.snapshotter.snapshotCompleted();
    }

    @Override
    protected Optional<String> getSnapshotSelect(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext, TableId tableId, List<String> columns) {
        return this.snapshotter.buildSnapshotQuery(tableId, columns);
    }

    protected void setSnapshotTransactionIsolationLevel() throws SQLException {
        LOGGER.info("Setting isolation level");
        String transactionStatement = this.snapshotter.snapshotTransactionIsolationLevelStatement(this.slotCreatedInfo);
        LOGGER.info("Opening transaction with statement {}", (Object)transactionStatement);
        this.jdbcConnection.executeWithoutCommitting(transactionStatement);
    }

    private static class PostgresSnapshotContext
    extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> {
        public PostgresSnapshotContext(PostgresPartition partition, String catalogName) throws SQLException {
            super(partition, catalogName);
        }
    }
}

