package org.apache.paimon.flink;

import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AliCatalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.dlf.DlfUtils;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
import org.apache.paimon.flink.sink.VvrTableSink;
import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.flink.source.SystemTableSource;
import org.apache.paimon.flink.source.VvrDataTableSource;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SnapshotManager;

/* loaded from: input_file:org/apache/paimon/flink/VvrTableFactory.class */
/**
 * Dynamic table factory registered under the {@code "paimon"} identifier.
 *
 * <p>Tables whose {@code connector} option names something other than {@code "paimon"} are
 * delegated to Flink's {@link FactoryUtil}; everything else is turned into a Paimon source or
 * sink. Before a source is built, start-time options are translated into concrete Paimon scan
 * options (see {@link #enrichStartupOptionsForSource(DynamicTableFactory.Context)}).
 */
public class VvrTableFactory extends AbstractFlinkTableFactory {
    /** Presence of this interface on the classpath selects {@link VvrDataTableSource}. */
    private static final String LOOKUP_CUSTOM_SHUFFLE_INTERFACE = "org.apache.flink.table.connector.source.abilities.SupportsLookupCustomShuffle";

    /** Catalog-level options, forwarded to DLF collection, the sink and catalog creation. */
    private final Options catalogOptions;

    /** Creates a factory with empty catalog options and no backing catalog. */
    public VvrTableFactory() {
        this(new Options(), null);
    }

    /**
     * Creates a factory bound to the given catalog options.
     *
     * @param options catalog-level options kept for later use by this factory
     * @param vvrCatalog catalog handed to the parent factory; may be {@code null}
     */
    public VvrTableFactory(Options options, @Nullable VvrCatalog vvrCatalog) {
        super(vvrCatalog);
        this.catalogOptions = options;
    }

    /** Returns the connector identifier handled by this factory, {@code "paimon"}. */
    public String factoryIdentifier() {
        return "paimon";
    }

    /**
     * Builds the table source for the given context.
     *
     * <p>Non-Paimon tables are delegated to Flink. Otherwise the table is auto-created when
     * requested, start-time options are resolved, and either a system table source or a data
     * table source is returned.
     *
     * @param context the Flink factory context describing the table
     * @return the source to read the table with
     */
    @Override // org.apache.paimon.flink.AbstractFlinkTableFactory
    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        if (isFlinkTable(context.getCatalogTable())) {
            return FactoryUtil.createTableSource((Catalog) null, context.getObjectIdentifier(), context.getCatalogTable(), context.getConfiguration(), context.getClassLoader(), context.isTemporary());
        }
        createTableIfNeeded(context);
        DlfUtils.maybeCollectDlfPaimonTableInfos(context, this.catalogOptions);

        DynamicTableFactory.Context enriched = enrichStartupOptionsForSource(context);
        CatalogTable origin = enriched.getCatalogTable().getOrigin();
        boolean streaming = enriched.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        if (origin instanceof SystemCatalogTable) {
            return new SystemTableSource(((SystemCatalogTable) origin).table(), streaming, enriched.getObjectIdentifier());
        }

        FileStoreTable table = (FileStoreTable) buildPaimonTable(enriched);
        // The VVR-specific source is only usable when the runtime ships the custom-shuffle interface.
        if (isClassExists()) {
            return new VvrDataTableSource(enriched.getObjectIdentifier(), table, streaming, enriched, createOptionalLogStoreFactory(enriched).orElse(null));
        }
        return new DataTableSource(enriched.getObjectIdentifier(), table, streaming, enriched, createOptionalLogStoreFactory(enriched).orElse(null));
    }

    /**
     * Builds the table sink for the given context.
     *
     * <p>Non-Paimon tables are delegated to Flink. Otherwise the table is auto-created when
     * requested and a {@link VvrTableSink} is returned.
     *
     * @param context the Flink factory context describing the table
     * @return the sink to write the table with
     */
    @Override // org.apache.paimon.flink.AbstractFlinkTableFactory
    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        if (isFlinkTable(context.getCatalogTable())) {
            return FactoryUtil.createTableSink((Catalog) null, context.getObjectIdentifier(), context.getCatalogTable(), context.getConfiguration(), context.getClassLoader(), context.isTemporary());
        }
        createTableIfNeeded(context);
        DlfUtils.maybeCollectDlfPaimonTableInfos(context, this.catalogOptions);
        return new VvrTableSink(context.getObjectIdentifier(), buildVvrPaimonTable(context), context, this.catalogOptions, createOptionalLogStoreFactory(context).orElse(null));
    }

    /**
     * Tells whether the table belongs to another Flink connector.
     *
     * @param catalogTable the table whose options are inspected
     * @return {@code true} when a {@code connector} option is set and is not {@code "paimon"}
     */
    public static boolean isFlinkTable(CatalogTable catalogTable) {
        String connector = catalogTable.getOptions().get(FactoryUtil.CONNECTOR.key());
        return connector != null && !"paimon".equals(connector);
    }

    /**
     * Translates a configured start time into Paimon scan options.
     *
     * <p>The context is returned unchanged for system tables and when neither a start time in
     * milliseconds nor a textual start time is configured. Otherwise the start time is resolved
     * to milliseconds and mapped to either a starting snapshot id (when a snapshot or long-lived
     * changelog earlier than that time exists) or a file-creation-time filter. If a consumer id
     * is configured without ignore-progress, ignore-progress is switched on.
     *
     * @param context the original factory context
     * @return the original context, or a new context carrying the additional scan options
     */
    @VisibleForTesting
    DynamicTableFactory.Context enrichStartupOptionsForSource(DynamicTableFactory.Context context) {
        ResolvedCatalogTable catalogTable = ContextUtil.normalizeContext(this, ContextUtil.mergeTableOptionsFromTableConfig(context)).getCatalogTable();
        if (catalogTable.getOrigin() instanceof SystemCatalogTable) {
            return context;
        }

        Map<String, String> options = catalogTable.getOptions();
        Configuration normalized = normalizeOptions(options);
        long configuredMillis = normalized.get(ContextUtil.START_TIME_MILLS);
        String configuredTime = normalized.get(ContextUtil.OPTIONAL_START_TIME);
        if (configuredMillis < 0 && configuredTime == null) {
            return context;
        }

        long startMillis;
        if (configuredMillis < 0) {
            // Only the textual start time is set: parse it in the configured (or default) zone.
            String zoneId = normalized.get(ContextUtil.TIME_ZONE);
            TimeZone zone = zoneId == null ? TimeZone.getDefault() : TimeZone.getTimeZone(zoneId);
            startMillis = DateTimeUtils.parseTimestampData(configuredTime, 0, zone).getMillisecond();
        } else {
            startMillis = configuredMillis;
        }

        Map<String, String> enrichedOptions = new HashMap<>(options);
        SnapshotManager snapshotManager = ((FileStoreTable) buildPaimonTable(context)).snapshotManager();
        Long earlierSnapshotId = snapshotManager.earlierThanTimeMills(startMillis, true);
        boolean earlierSnapshotUsable = earlierSnapshotId != null
                && (snapshotManager.snapshotExists(earlierSnapshotId) || snapshotManager.longLivedChangelogExists(earlierSnapshotId));
        if (earlierSnapshotUsable) {
            // Start reading from the snapshot right after the one found before the start time.
            enrichedOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), String.valueOf(earlierSnapshotId + 1));
        } else {
            enrichedOptions.put(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS.key(), String.valueOf(startMillis));
        }

        CoreOptions coreOptions = CoreOptions.fromMap(options);
        if (coreOptions.consumerId() != null && !coreOptions.consumerIgnoreProgress()) {
            enrichedOptions.put(CoreOptions.CONSUMER_IGNORE_PROGRESS.key(), "true");
        }
        return new FactoryUtil.DefaultDynamicTableContext(context.getObjectIdentifier(), catalogTable.copy(enrichedOptions), context.getEnrichmentOptions(), context.getConfiguration(), context.getClassLoader(), context.isTemporary());
    }

    /**
     * Copies the options into a {@link Configuration} with lower-cased keys.
     *
     * @param map the raw table options
     * @return a configuration holding every entry of {@code map} under its lower-cased key
     */
    private Configuration normalizeOptions(Map<String, String> map) {
        Configuration configuration = new Configuration();
        // Locale.ROOT keeps key lower-casing independent of the JVM default locale
        // (e.g. a Turkish locale would otherwise map 'I' to a dotless 'ı').
        map.forEach((key, value) -> configuration.setString(key.toLowerCase(java.util.Locale.ROOT), value));
        return configuration;
    }

    /**
     * Creates the table schema on the file system when the auto-create option is enabled and the
     * table has no snapshot yet.
     *
     * @param context the factory context describing the table
     * @throws RuntimeException wrapping any failure raised while inspecting or creating the table
     */
    private void createTableIfNeeded(DynamicTableFactory.Context context) {
        ResolvedCatalogTable catalogTable = context.getCatalogTable();
        if (!Options.fromMap(catalogTable.getOptions()).get(CoreOptions.AUTO_CREATE)) {
            return;
        }
        try {
            Path tablePath = CoreOptions.path(catalogTable.getOptions());
            FileIO fileIO = FileIO.get(tablePath, createCatalogContext(context));
            SnapshotManager snapshotManager = new SnapshotManager(fileIO, tablePath);
            SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
            if (snapshotManager.latestSnapshotId() != null) {
                return;
            }
            if (schemaManager.latest().isPresent()) {
                // A schema without any snapshot is discarded before the table is re-created.
                // NOTE(review): FIELD_SCHEMA is presumably the "schema" sub-directory name — confirm.
                fileIO.deleteDirectoryQuietly(new Path(tablePath, DebeziumSchemaUtils.FIELD_SCHEMA));
            }
            schemaManager.createTable(FlinkCatalog.fromCatalogTable(catalogTable));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the Paimon table used by the sink.
     *
     * <p>Unless the origin table carries the schema-evolution flag, this is the regular
     * {@code buildPaimonTable} result. With the flag set, a Paimon catalog is opened from the
     * merged catalog options; an existing table is returned with the Flink-declared schema
     * applied on top of it, while a missing table is materialised in memory from the
     * Flink-declared schema at the catalog's table location.
     *
     * @param context the factory context describing the table
     * @return the table to write to
     * @throws RuntimeException wrapping any failure raised while talking to the catalog
     */
    private Table buildVvrPaimonTable(DynamicTableFactory.Context context) {
        String schemaEvolution = context.getCatalogTable().getOrigin().getOptions().get(VvrCatalog.IS_SCHEMA_EVOLUTION);
        if (!"true".equalsIgnoreCase(schemaEvolution)) {
            return buildPaimonTable(context);
        }

        CatalogContext catalogContext = createCatalogContext(context);
        this.catalogOptions.toMap().forEach((key, value) -> catalogContext.options().set(key, value));

        ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
        Identifier identifier = Identifier.create(objectIdentifier.getDatabaseName(), objectIdentifier.getObjectName());
        Schema flinkSchema = FlinkCatalog.fromCatalogTable(context.getCatalogTable());

        // try-with-resources closes the catalog on every exit path, including failures.
        try (org.apache.paimon.catalog.Catalog catalog = CatalogFactory.createCatalog(catalogContext, context.getClassLoader())) {
            if (DlfUtils.DLF_PAIMON_IDENTIFIER.equals(this.catalogOptions.get(CatalogOptions.METASTORE))) {
                catalog.createDatabase(identifier.getDatabaseName(), true);
            }
            Path tableLocation = catalog.getTableLocation(identifier);
            FileIO fileIO = catalog.fileIO();

            if (catalog.tableExists(identifier)) {
                FileStoreTable existing = (FileStoreTable) catalog.getTable(identifier);
                return existing.copy(toTableSchema(flinkSchema, existing.schema()));
            }

            TableSchema tableSchema = toTableSchema(flinkSchema);
            MetastoreClient.Factory metastoreClientFactory = null;
            if (catalog instanceof AliCatalog) {
                metastoreClientFactory = ((AliCatalog) catalog).metastoreClientFactory(identifier, tableSchema).orElse(null);
            }
            Lock.Factory lockFactory = Lock.factory(catalog.lockFactory().orElse(null), catalog.lockContext().orElse(null), identifier);
            return FileStoreTableFactory.create(fileIO, tableLocation, tableSchema, new CatalogEnvironment(identifier, lockFactory, metastoreClientFactory, null));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Converts a schema into a brand-new table schema with id {@code 0}.
     *
     * @param schema the declared schema
     * @return a table schema carrying the schema's fields, keys, options and comment
     */
    private TableSchema toTableSchema(Schema schema) {
        return new TableSchema(0L, schema.fields(), highestFieldId(schema), schema.partitionKeys(), schema.primaryKeys(), schema.options(), schema.comment());
    }

    /**
     * Converts a schema into a table schema that keeps the id and comment of an existing one.
     *
     * <p>Options of the existing schema are used as the base and overridden by the options of
     * {@code schema}.
     *
     * @param schema the declared schema providing fields, keys and overriding options
     * @param tableSchema the existing table schema providing id, base options and comment
     * @return the merged table schema
     */
    private TableSchema toTableSchema(Schema schema, TableSchema tableSchema) {
        Map<String, String> mergedOptions = new HashMap<>(tableSchema.options());
        mergedOptions.putAll(schema.options());
        return new TableSchema(tableSchema.id(), schema.fields(), highestFieldId(schema), schema.partitionKeys(), schema.primaryKeys(), mergedOptions, tableSchema.comment());
    }

    /**
     * Returns the largest field id declared in the schema.
     *
     * @throws java.util.NoSuchElementException if the schema declares no fields
     */
    private int highestFieldId(Schema schema) {
        return schema.fields().stream().mapToInt(field -> field.id()).max().getAsInt();
    }

    /** Returns whether the lookup custom-shuffle interface can be loaded. */
    private boolean isClassExists() {
        try {
            Class.forName(LOOKUP_CUSTOM_SHUFFLE_INTERFACE);
            return true;
        } catch (ClassNotFoundException ignored) {
            // Absence of the class is the expected signal on runtimes without this feature.
            return false;
        }
    }
}
