package org.apache.paimon.shaded.dlf.org.apache.hive.spark.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import jodd.util.StringPool;
import org.apache.commons.io.FileUtils;
import org.apache.paimon.shaded.dlf.com.google.common.base.Joiner;
import org.apache.paimon.shaded.dlf.com.google.common.base.Preconditions;
import org.apache.paimon.shaded.dlf.com.google.common.base.Throwables;
import org.apache.paimon.shaded.dlf.com.google.common.collect.Lists;
import org.apache.paimon.shaded.dlf.com.google.common.collect.Maps;
import org.apache.paimon.shaded.dlf.com.google.common.io.Files;
import org.apache.paimon.shaded.dlf.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.paimon.shaded.dlf.org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.paimon.shaded.dlf.org.apache.hive.spark.client.BaseProtocol;
import org.apache.paimon.shaded.dlf.org.apache.hive.spark.client.metrics.Metrics;
import org.apache.paimon.shaded.dlf.org.apache.hive.spark.client.rpc.Rpc;
import org.apache.paimon.shaded.dlf.org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.paimon.shaded.dlf.org.apache.hive.spark.counter.SparkCounters;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.Success$;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/paimon/shaded/dlf/org/apache/hive/spark/client/RemoteDriver.class */
public class RemoteDriver {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteDriver.class);
    private final ExecutorService executor;
    private final NioEventLoopGroup egroup;
    private final Rpc clientRpc;
    private final DriverProtocol protocol;
    private volatile JobContextImpl jc;
    private volatile boolean running;
    private final List<JobWrapper<?>> jobQueue = Lists.newLinkedList();
    private final Map<String, JobWrapper<?>> activeJobs = Maps.newConcurrentMap();
    private final Object jcLock = new Object();
    private final Object shutdownLock = new Object();
    private final File localTmpDir = Files.createTempDir();

    /* loaded from: input_file:org/apache/paimon/shaded/dlf/org/apache/hive/spark/client/RemoteDriver$ClientListener.class */
    private class ClientListener extends SparkListener {
        private final Map<Integer, Integer> stageToJobId;

        private ClientListener() {
            this.stageToJobId = Maps.newHashMap();
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void onJobStart(SparkListenerJobStart sparkListenerJobStart) {
            synchronized (this.stageToJobId) {
                for (int i = 0; i < sparkListenerJobStart.stageIds().length(); i++) {
                    this.stageToJobId.put((Integer) sparkListenerJobStart.stageIds().mo14155apply(i), Integer.valueOf(sparkListenerJobStart.jobId()));
                }
            }
        }

        public void onJobEnd(SparkListenerJobEnd sparkListenerJobEnd) {
            synchronized (this.stageToJobId) {
                Iterator<Map.Entry<Integer, Integer>> it = this.stageToJobId.entrySet().iterator();
                while (it.hasNext()) {
                    if (it.next().getValue().intValue() == sparkListenerJobEnd.jobId()) {
                        it.remove();
                    }
                }
            }
            String clientId = getClientId(Integer.valueOf(sparkListenerJobEnd.jobId()));
            if (clientId != null) {
                ((JobWrapper) RemoteDriver.this.activeJobs.get(clientId)).jobDone();
            }
        }

        public void onTaskEnd(SparkListenerTaskEnd sparkListenerTaskEnd) {
            Integer num;
            if (!(sparkListenerTaskEnd.reason() instanceof Success$) || sparkListenerTaskEnd.taskInfo().speculative()) {
                return;
            }
            Metrics metrics = new Metrics(sparkListenerTaskEnd.taskMetrics());
            synchronized (this.stageToJobId) {
                num = this.stageToJobId.get(Integer.valueOf(sparkListenerTaskEnd.stageId()));
            }
            String clientId = getClientId(num);
            if (clientId != null) {
                RemoteDriver.this.protocol.sendMetrics(clientId, num.intValue(), sparkListenerTaskEnd.stageId(), sparkListenerTaskEnd.taskInfo().taskId(), metrics);
            }
        }

        private String getClientId(Integer num) {
            for (Map.Entry entry : RemoteDriver.this.activeJobs.entrySet()) {
                Iterator it = ((JobWrapper) entry.getValue()).jobs.iterator();
                while (it.hasNext()) {
                    if (((JavaFutureAction) it.next()).jobIds().contains(num)) {
                        return (String) entry.getKey();
                    }
                }
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/shaded/dlf/org/apache/hive/spark/client/RemoteDriver$DriverProtocol.class */
    public class DriverProtocol extends BaseProtocol {
        private DriverProtocol() {
        }

        void sendError(Throwable th) {
            RemoteDriver.LOG.debug("Send error to Client: {}", Throwables.getStackTraceAsString(th));
            RemoteDriver.this.clientRpc.call(new BaseProtocol.Error(th));
        }

        <T extends Serializable> void jobFinished(String str, T t, Throwable th, SparkCounters sparkCounters) {
            RemoteDriver.LOG.debug("Send job({}) result to Client.", str);
            RemoteDriver.this.clientRpc.call(new BaseProtocol.JobResult(str, t, th, sparkCounters));
        }

        void jobStarted(String str) {
            RemoteDriver.this.clientRpc.call(new BaseProtocol.JobStarted(str));
        }

        void jobSubmitted(String str, int i) {
            RemoteDriver.LOG.debug("Send job({}/{}) submitted to Client.", str, Integer.valueOf(i));
            RemoteDriver.this.clientRpc.call(new BaseProtocol.JobSubmitted(str, i));
        }

        void sendMetrics(String str, int i, int i2, long j, Metrics metrics) {
            RemoteDriver.LOG.debug("Send task({}/{}/{}/{}) metric to Client.", new Object[]{str, Integer.valueOf(i), Integer.valueOf(i2), Long.valueOf(j)});
            RemoteDriver.this.clientRpc.call(new BaseProtocol.JobMetrics(str, i, i2, j, metrics));
        }

        private void handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.CancelJob cancelJob) {
            JobWrapper jobWrapper = (JobWrapper) RemoteDriver.this.activeJobs.get(cancelJob.id);
            if (jobWrapper == null || !RemoteDriver.this.cancelJob(jobWrapper)) {
                RemoteDriver.LOG.info("Requested to cancel an already finished job.");
            }
        }

        private void handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.EndSession endSession) {
            RemoteDriver.LOG.debug("Shutting down due to EndSession request.");
            RemoteDriver.this.shutdown(null);
        }

        private void handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.JobRequest jobRequest) {
            RemoteDriver.LOG.info("Received job request {}", jobRequest.id);
            JobWrapper jobWrapper = new JobWrapper(jobRequest);
            RemoteDriver.this.activeJobs.put(jobRequest.id, jobWrapper);
            RemoteDriver.this.submit(jobWrapper);
        }

        private Object handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.SyncJobRequest syncJobRequest) throws Exception {
            if (RemoteDriver.this.jc == null) {
                synchronized (RemoteDriver.this.jcLock) {
                    while (RemoteDriver.this.jc == null) {
                        RemoteDriver.this.jcLock.wait();
                        if (!RemoteDriver.this.running) {
                            throw new IllegalStateException("Remote context is shutting down.");
                        }
                    }
                }
            }
            RemoteDriver.this.jc.setMonitorCb(new MonitorCallback() { // from class: org.apache.paimon.shaded.dlf.org.apache.hive.spark.client.RemoteDriver.DriverProtocol.1
                @Override // org.apache.paimon.shaded.dlf.org.apache.hive.spark.client.MonitorCallback
                public void call(JavaFutureAction<?> javaFutureAction, SparkCounters sparkCounters, Set<Integer> set) {
                    throw new IllegalStateException("JobContext.monitor() is not available for synchronous jobs.");
                }
            });
            try {
                Serializable call = syncJobRequest.job.call(RemoteDriver.this.jc);
                RemoteDriver.this.jc.setMonitorCb(null);
                return call;
            } catch (Throwable th) {
                RemoteDriver.this.jc.setMonitorCb(null);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/shaded/dlf/org/apache/hive/spark/client/RemoteDriver$JobWrapper.class */
    public class JobWrapper<T extends Serializable> implements Callable<Void> {
        private final BaseProtocol.JobRequest<T> req;
        private Future<?> future;
        private final List<JavaFutureAction<?>> jobs = Lists.newArrayList();
        private int completed = 0;
        private final AtomicInteger jobEndReceived = new AtomicInteger(0);
        private SparkCounters sparkCounters = null;
        private Set<Integer> cachedRDDIds = null;
        private Integer sparkJobId = null;

        JobWrapper(BaseProtocol.JobRequest<T> jobRequest) {
            this.req = jobRequest;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            SparkJobInfo jobInfo;
            RemoteDriver.this.protocol.jobStarted(this.req.id);
            try {
                try {
                    RemoteDriver.this.jc.setMonitorCb(new MonitorCallback() { // from class: org.apache.paimon.shaded.dlf.org.apache.hive.spark.client.RemoteDriver.JobWrapper.1
                        @Override // org.apache.paimon.shaded.dlf.org.apache.hive.spark.client.MonitorCallback
                        public void call(JavaFutureAction<?> javaFutureAction, SparkCounters sparkCounters, Set<Integer> set) {
                            JobWrapper.this.monitorJob(javaFutureAction, sparkCounters, set);
                        }
                    });
                    T call = this.req.job.call(RemoteDriver.this.jc);
                    Iterator<JavaFutureAction<?>> it = this.jobs.iterator();
                    while (it.hasNext()) {
                        it.next().get();
                        this.completed++;
                        RemoteDriver.LOG.debug("Client job {}: {} of {} Spark jobs finished.", new Object[]{this.req.id, Integer.valueOf(this.completed), Integer.valueOf(this.jobs.size())});
                    }
                    if (this.sparkJobId != null && (jobInfo = RemoteDriver.this.jc.sc().statusTracker().getJobInfo(this.sparkJobId.intValue())) != null && jobInfo.stageIds() != null && jobInfo.stageIds().length > 0) {
                        synchronized (this.jobEndReceived) {
                            while (this.jobEndReceived.get() < this.jobs.size()) {
                                this.jobEndReceived.wait();
                            }
                        }
                    }
                    SparkCounters sparkCounters = null;
                    if (this.sparkCounters != null) {
                        sparkCounters = this.sparkCounters.snapshot();
                    }
                    RemoteDriver.this.protocol.jobFinished(this.req.id, call, null, sparkCounters);
                    RemoteDriver.this.jc.setMonitorCb(null);
                    RemoteDriver.this.activeJobs.remove(this.req.id);
                    releaseCache();
                    return null;
                } catch (Throwable th) {
                    RemoteDriver.LOG.info("Failed to run job " + this.req.id, th);
                    RemoteDriver.this.protocol.jobFinished(this.req.id, null, th, this.sparkCounters != null ? this.sparkCounters.snapshot() : null);
                    throw new ExecutionException(th);
                }
            } catch (Throwable th2) {
                RemoteDriver.this.jc.setMonitorCb(null);
                RemoteDriver.this.activeJobs.remove(this.req.id);
                releaseCache();
                throw th2;
            }
        }

        void submit() {
            this.future = RemoteDriver.this.executor.submit(this);
        }

        void jobDone() {
            synchronized (this.jobEndReceived) {
                this.jobEndReceived.incrementAndGet();
                this.jobEndReceived.notifyAll();
            }
        }

        void releaseCache() {
            if (this.cachedRDDIds != null) {
                Iterator<Integer> it = this.cachedRDDIds.iterator();
                while (it.hasNext()) {
                    RemoteDriver.this.jc.sc().sc().unpersistRDD(it.next().intValue(), false);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void monitorJob(JavaFutureAction<?> javaFutureAction, SparkCounters sparkCounters, Set<Integer> set) {
            this.jobs.add(javaFutureAction);
            if (!RemoteDriver.this.jc.getMonitoredJobs().containsKey(this.req.id)) {
                RemoteDriver.this.jc.getMonitoredJobs().put(this.req.id, new CopyOnWriteArrayList());
            }
            RemoteDriver.this.jc.getMonitoredJobs().get(this.req.id).add(javaFutureAction);
            this.sparkCounters = sparkCounters;
            this.cachedRDDIds = set;
            this.sparkJobId = (Integer) javaFutureAction.jobIds().get(0);
            RemoteDriver.this.protocol.jobSubmitted(this.req.id, this.sparkJobId.intValue());
        }
    }

    private RemoteDriver(String[] strArr) throws Exception {
        SparkConf sparkConf = new SparkConf();
        String str = null;
        int i = -1;
        for (int i2 = 0; i2 < strArr.length; i2 += 2) {
            String str2 = strArr[i2];
            if (str2.equals("--remote-host")) {
                str = getArg(strArr, i2);
            } else if (str2.equals("--remote-port")) {
                i = Integer.parseInt(getArg(strArr, i2));
            } else if (str2.equals("--client-id")) {
                sparkConf.set("spark.client.authentication.client_id", getArg(strArr, i2));
            } else if (str2.equals("--secret")) {
                sparkConf.set("spark.client.authentication.secret", getArg(strArr, i2));
            } else {
                if (!str2.equals("--conf")) {
                    throw new IllegalArgumentException("Invalid command line: " + Joiner.on(" ").join((Object[]) strArr));
                }
                String[] split = getArg(strArr, i2).split("[=]", 2);
                sparkConf.set(split[0], split[1]);
            }
        }
        this.executor = Executors.newCachedThreadPool();
        LOG.info("Connecting to: {}:{}", str, Integer.valueOf(i));
        HashMap newHashMap = Maps.newHashMap();
        for (Tuple2 tuple2 : sparkConf.getAll()) {
            newHashMap.put(tuple2.mo14083_1(), tuple2.mo14082_2());
            LOG.debug("Remote Driver configured with: " + ((String) tuple2.mo14083_1()) + StringPool.EQUALS + ((String) tuple2.mo14082_2()));
        }
        String str3 = (String) newHashMap.get("spark.client.authentication.client_id");
        Preconditions.checkArgument(str3 != null, "No client ID provided.");
        String str4 = (String) newHashMap.get("spark.client.authentication.secret");
        Preconditions.checkArgument(str4 != null, "No secret provided.");
        this.egroup = new NioEventLoopGroup(new RpcConfiguration(newHashMap).getRpcThreadCount(), new ThreadFactoryBuilder().setNameFormat("Driver-RPC-Handler-%d").setDaemon(true).build());
        this.protocol = new DriverProtocol();
        this.clientRpc = (Rpc) Rpc.createClient(newHashMap, this.egroup, str, i, str3, str4, this.protocol).get();
        this.running = true;
        this.clientRpc.addListener(new Rpc.Listener() { // from class: org.apache.paimon.shaded.dlf.org.apache.hive.spark.client.RemoteDriver.1
            @Override // org.apache.paimon.shaded.dlf.org.apache.hive.spark.client.rpc.Rpc.Listener
            public void rpcClosed(Rpc rpc) {
                RemoteDriver.LOG.warn("Shutting down driver because RPC channel was closed.");
                RemoteDriver.this.shutdown(null);
            }
        });
        try {
            JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
            javaSparkContext.sc().addSparkListener(new ClientListener());
            synchronized (this.jcLock) {
                this.jc = new JobContextImpl(javaSparkContext, this.localTmpDir);
                this.jcLock.notifyAll();
            }
            synchronized (this.jcLock) {
                Iterator<JobWrapper<?>> it = this.jobQueue.iterator();
                while (it.hasNext()) {
                    it.next().submit();
                }
            }
        } catch (Exception e) {
            LOG.error("Failed to start SparkContext: " + e, e);
            shutdown(e);
            synchronized (this.jcLock) {
                this.jcLock.notifyAll();
                throw e;
            }
        }
    }

    private void run() throws InterruptedException {
        synchronized (this.shutdownLock) {
            while (this.running) {
                this.shutdownLock.wait();
            }
        }
        this.executor.shutdownNow();
        try {
            FileUtils.deleteDirectory(this.localTmpDir);
        } catch (IOException e) {
            LOG.warn("Failed to delete local tmp dir: " + this.localTmpDir, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void submit(JobWrapper<?> jobWrapper) {
        synchronized (this.jcLock) {
            if (this.jc != null) {
                jobWrapper.submit();
            } else {
                LOG.info("SparkContext not yet up, queueing job request.");
                this.jobQueue.add(jobWrapper);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void shutdown(Throwable th) {
        if (this.running) {
            if (th == null) {
                LOG.info("Shutting down remote driver.");
            } else {
                LOG.error("Shutting down remote driver due to error: " + th, th);
            }
            this.running = false;
            Iterator<JobWrapper<?>> it = this.activeJobs.values().iterator();
            while (it.hasNext()) {
                cancelJob(it.next());
            }
            if (th != null) {
                this.protocol.sendError(th);
            }
            if (this.jc != null) {
                this.jc.stop();
            }
            this.clientRpc.close();
            this.egroup.shutdownGracefully();
            synchronized (this.shutdownLock) {
                this.shutdownLock.notifyAll();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean cancelJob(JobWrapper<?> jobWrapper) {
        boolean z = false;
        Iterator it = ((JobWrapper) jobWrapper).jobs.iterator();
        while (it.hasNext()) {
            z |= ((JavaFutureAction) it.next()).cancel(true);
        }
        return z | (((JobWrapper) jobWrapper).future != null && ((JobWrapper) jobWrapper).future.cancel(true));
    }

    private String getArg(String[] strArr, int i) {
        int i2 = i + 1;
        if (strArr.length <= i2) {
            throw new IllegalArgumentException("Invalid command line: " + Joiner.on(" ").join((Object[]) strArr));
        }
        return strArr[i2];
    }

    public static void main(String[] strArr) throws Exception {
        new RemoteDriver(strArr).run();
    }
}
