package org.apache.paimon.flink.sink.cdc;

import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.types.DataField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.class */
public class CdcDynamicTableParsingProcessFunction<T> extends ProcessFunction<T, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(CdcDynamicTableParsingProcessFunction.class);
    public static final OutputTag<CdcMultiplexRecord> DYNAMIC_OUTPUT_TAG = new OutputTag<>("paimon-dynamic-table", TypeInformation.of(CdcMultiplexRecord.class));
    public static final OutputTag<Tuple2<Identifier, List<DataField>>> DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG = new OutputTag<>("paimon-dynamic-table-schema-change", TypeInformation.of(new TypeHint<Tuple2<Identifier, List<DataField>>>() { // from class: org.apache.paimon.flink.sink.cdc.CdcDynamicTableParsingProcessFunction.1
    }));
    private final EventParser.Factory<T> parserFactory;
    private final String database;
    private final Catalog.Loader catalogLoader;
    private transient EventParser<T> parser;
    private transient Catalog catalog;

    public CdcDynamicTableParsingProcessFunction(String str, Catalog.Loader loader, EventParser.Factory<T> factory) {
        this.database = str;
        this.catalogLoader = loader;
        this.parserFactory = factory;
    }

    public void open(Configuration configuration) throws Exception {
        this.parser = this.parserFactory.create();
        this.catalog = this.catalogLoader.load();
    }

    public void processElement(T t, ProcessFunction<T, Void>.Context context, Collector<Void> collector) throws Exception {
        this.parser.setRawEvent(t);
        String parseTableName = this.parser.parseTableName();
        this.parser.parseNewTable().ifPresent(schema -> {
            Identifier identifier = new Identifier(this.database, parseTableName);
            try {
                this.catalog.createTable(identifier, schema, true);
            } catch (Exception e) {
                LOG.error("Cannot create newly added Paimon table {}", identifier.getFullName(), e);
            }
        });
        List<DataField> parseSchemaChange = this.parser.parseSchemaChange();
        if (!parseSchemaChange.isEmpty()) {
            context.output(DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG, Tuple2.of(Identifier.create(this.database, parseTableName), parseSchemaChange));
        }
        this.parser.parseRecords().forEach(cdcRecord -> {
            context.output(DYNAMIC_OUTPUT_TAG, wrapRecord(this.database, parseTableName, cdcRecord));
        });
    }

    private CdcMultiplexRecord wrapRecord(String str, String str2, CdcRecord cdcRecord) {
        return CdcMultiplexRecord.fromCdcRecord(str, str2, cdcRecord);
    }

    public void close() throws Exception {
        if (this.catalog != null) {
            this.catalog.close();
            this.catalog = null;
        }
    }
}
