package org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.exec.spark;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.paimon.shaded.dlf.com.google.common.base.Splitter;
import org.apache.paimon.shaded.dlf.com.google.common.base.Strings;
import org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.conf.HiveConf;
import org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.Context;
import org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.DriverContext;
import org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
import org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobRef;
import org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
import org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.paimon.shaded.dlf.org.apache.hive.spark.counter.SparkCounters;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/shaded/dlf/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.class */
/**
 * {@link HiveSparkClient} implementation that submits Hive Spark work through a
 * {@link JavaSparkContext} owned by this object.
 *
 * <p>A single instance is shared per class loader: {@link #getInstance(SparkConf)} creates it
 * lazily and {@link #close()} discards it and stops the underlying context.
 */
public class LocalHiveSparkClient implements HiveSparkClient {
    private static final long serialVersionUID = 1;

    /** Configuration key under which works publish the extra jars they need. */
    private static final String MR_JAR_PROPERTY = "tmpjars";

    protected static final transient Logger LOG = LoggerFactory.getLogger(LocalHiveSparkClient.class);

    /** Splits comma-separated resource lists, dropping empty entries. */
    private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();

    /** The shared instance; guarded by the {@code LocalHiveSparkClient.class} monitor. */
    private static LocalHiveSparkClient client;

    private final JavaSparkContext sc;

    /** Jars already handed to {@link JavaSparkContext#addJar}, so each one is added only once. */
    private final List<String> localJars = new ArrayList<>();

    /** Files/archives already handed to {@link JavaSparkContext#addFile}, so each is added only once. */
    private final List<String> localFiles = new ArrayList<>();

    private final JobMetricsListener jobMetricsListener = new JobMetricsListener();

    /**
     * Returns the shared client, creating it from {@code sparkConf} on first use.
     *
     * <p>When an instance already exists, {@code sparkConf} is ignored and the existing instance
     * is returned.
     *
     * @param sparkConf configuration used to build the Spark context if no client exists yet
     * @return the shared client instance
     */
    public static synchronized LocalHiveSparkClient getInstance(SparkConf sparkConf) {
        if (client == null) {
            client = new LocalHiveSparkClient(sparkConf);
        }
        return client;
    }

    /** Creates the Spark context and registers the job metrics listener on its listener bus. */
    private LocalHiveSparkClient(SparkConf sparkConf) {
        this.sc = new JavaSparkContext(sparkConf);
        this.sc.sc().listenerBus().addListener(this.jobMetricsListener);
    }

    /** @return the configuration of the underlying Spark context */
    @Override // org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient
    public SparkConf getSparkConf() {
        return this.sc.sc().conf();
    }

    /** @return the number of entries in the Spark context's executor memory status map */
    @Override // org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient
    public int getExecutorCount() {
        return this.sc.sc().getExecutorMemoryStatus().size();
    }

    /** @return the default parallelism reported by the underlying Spark context */
    @Override // org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient
    public int getDefaultParallelism() throws Exception {
        return this.sc.sc().defaultParallelism();
    }

    /**
     * Builds a Spark plan for {@code sparkWork} and submits it asynchronously.
     *
     * <p>Before submission this registers the jars/files the work needs with the Spark context,
     * creates the MR scratch directory, and pre-creates every counter the work declares.
     *
     * @param driverContext supplies the query {@link Context} and the shutdown flag
     * @param sparkWork the work to run
     * @return a reference to the submitted job, identified by its first Spark job id
     * @throws HiveException if the driver was shut down after the plan was generated
     * @throws Exception on any failure while preparing or submitting the job
     */
    @Override // org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient
    public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception {
        Context ctx = driverContext.getCtx();
        HiveConf hiveConf = (HiveConf) ctx.getConf();
        refreshLocalResources(sparkWork, hiveConf);

        JobConf jobConf = new JobConf(hiveConf);

        // Make sure the scratch directory exists before the plan is generated against it.
        Path scratchDir = ctx.getMRTmpPath();
        scratchDir.getFileSystem(jobConf).mkdirs(scratchDir);

        HiveConfUtil.updateJobCredentialProviders(jobConf);

        // Pre-create every (group, counter) pair the work declares.
        SparkCounters sparkCounters = new SparkCounters(this.sc);
        Map<String, List<String>> requiredCounterPrefix = sparkWork.getRequiredCounterPrefix();
        if (requiredCounterPrefix != null) {
            for (Map.Entry<String, List<String>> group : requiredCounterPrefix.entrySet()) {
                for (String counterName : group.getValue()) {
                    sparkCounters.createCounter(group.getKey(), counterName);
                }
            }
        }

        SparkPlan plan =
                new SparkPlanGenerator(this.sc, ctx, jobConf, scratchDir, new SparkReporter(sparkCounters))
                        .generate(sparkWork);

        // Do not submit anything if the query was cancelled while the plan was being built.
        if (driverContext.isShutdown()) {
            throw new HiveException("Operation is cancelled.");
        }

        JavaFutureAction future = plan.generateGraph().foreachAsync(HiveVoidFunction.getInstance());
        int jobId = ((Integer) future.jobIds().get(0)).intValue();
        LocalSparkJobStatus status =
                new LocalSparkJobStatus(
                        this.sc, jobId, this.jobMetricsListener, sparkCounters, plan.getCachedRDDIds(), future);
        return new LocalSparkJobRef(Integer.toString(jobId), hiveConf, status, this.sc);
    }

    /**
     * Registers with the Spark context every jar, file and archive the given work needs.
     *
     * <p>Sources, in order: the jar containing this class, the configured aux jars, the session's
     * reloadable aux jars, jars added as session resources, jars the individual works request via
     * {@code tmpjars}, then files and archives added as session resources. The added
     * jars/files/archives lists are also written back into {@code hiveConf}.
     *
     * @param sparkWork the work whose {@link BaseWork} items are asked to configure a job conf
     * @param hiveConf the query configuration; updated with the resolved resource lists
     */
    private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf hiveConf) {
        addJars(new JobConf(getClass()).getJar());
        addJars(hiveConf.getAuxJars());
        addJars(SessionState.get() == null ? null : SessionState.get().getReloadableAuxJars());

        String addedJars = Utilities.getResourceFiles(hiveConf, SessionState.ResourceType.JAR);
        HiveConf.setVar(hiveConf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
        addJars(addedJars);

        // Let each work publish the jars it needs into a scratch JobConf. tmpjars is cleared
        // first so that only what the works add is picked up below.
        JobConf jobConf = new JobConf(hiveConf);
        jobConf.set(MR_JAR_PROPERTY, "");
        for (BaseWork work : sparkWork.getAllWork()) {
            work.configureJobConf(jobConf);
        }
        // Read the result from the JobConf the works just populated. Reading it from hiveConf
        // (as was done previously) never sees those entries because jobConf is a separate copy.
        addJars(jobConf.get(MR_JAR_PROPERTY));

        String addedFiles = Utilities.getResourceFiles(hiveConf, SessionState.ResourceType.FILE);
        HiveConf.setVar(hiveConf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
        addResources(addedFiles);

        String addedArchives = Utilities.getResourceFiles(hiveConf, SessionState.ResourceType.ARCHIVE);
        HiveConf.setVar(hiveConf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
        addResources(addedArchives);
    }

    /**
     * Adds each not-yet-seen entry of a comma-separated list as a file on the Spark context.
     *
     * @param str comma-separated paths; {@code null} is treated as empty
     */
    private void addResources(String str) {
        for (String file : CSV_SPLITTER.split(Strings.nullToEmpty(str))) {
            if (!this.localFiles.contains(file)) {
                this.localFiles.add(file);
                this.sc.addFile(file);
            }
        }
    }

    /**
     * Adds each not-yet-seen entry of a comma-separated list as a jar on the Spark context.
     *
     * @param str comma-separated jar paths; {@code null} is treated as empty
     */
    private void addJars(String str) {
        for (String jar : CSV_SPLITTER.split(Strings.nullToEmpty(str))) {
            if (!this.localJars.contains(jar)) {
                this.localJars.add(jar);
                this.sc.addJar(jar);
            }
        }
    }

    /**
     * Drops the shared instance so the next {@link #getInstance(SparkConf)} call creates a new
     * one, then stops the Spark context.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        synchronized (LocalHiveSparkClient.class) {
            client = null;
        }
        if (this.sc != null) {
            this.sc.stop();
        }
    }
}
