package org.apache.hadoop.fs.azure;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.util.Time;
import org.mortbay.jetty.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:lib/hadoop-azure-3.1.1.7.1.7.2000-305.jar:org/apache/hadoop/fs/azure/AzureFileSystemThreadPoolExecutor.class */
/**
 * Runs one {@link AzureFileSystemThreadTask} over an array of {@link FileMetadata}
 * entries, using a short-lived thread pool when more than one thread is usable and
 * falling back to a plain sequential loop otherwise.
 *
 * <p>The {@code operation}, {@code key} and {@code config} strings supplied at
 * construction are only used to build log and exception messages.
 */
public class AzureFileSystemThreadPoolExecutor {
    public static final Logger LOG = LoggerFactory.getLogger(AzureFileSystemThreadPoolExecutor.class);

    /** Upper bound on the number of worker threads; capped by the file count per call. */
    private final int threadCount;
    /** Prefix handed to {@link AzureFileSystemThreadFactory} for worker thread names. */
    private final String threadNamePrefix;
    /** Operation name used in log and exception messages. */
    private final String operation;
    /** Blob key used in log messages. */
    private final String key;
    /** Name of the configuration setting mentioned in log messages about thread count. */
    private final String config;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:lib/hadoop-azure-3.1.1.7.1.7.2000-305.jar:org/apache/hadoop/fs/azure/AzureFileSystemThreadPoolExecutor$AzureFileSystemThreadFactory.class */
    /**
     * Thread factory that names each new thread
     * {@code <prefix>-<name of the creating thread>-<sequence number>}.
     */
    public static class AzureFileSystemThreadFactory implements ThreadFactory {
        private final String threadIdPrefix;
        /** Per-factory counter; the first thread created gets sequence number 0. */
        private final AtomicInteger threadSequenceNumber = new AtomicInteger(0);

        /**
         * @param prefix leading component of every thread name produced by this factory
         */
        public AzureFileSystemThreadFactory(String prefix) {
            this.threadIdPrefix = prefix;
        }

        /**
         * Creates a thread for {@code runnable} and assigns it the next sequenced name.
         *
         * @param runnable the work the new thread will run
         * @return the newly created, not yet started, thread
         */
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            String creatorName = Thread.currentThread().getName();
            int sequence = this.threadSequenceNumber.getAndIncrement();
            thread.setName(String.format("%s-%s-%d", this.threadIdPrefix, creatorName, sequence));
            return thread;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:lib/hadoop-azure-3.1.1.7.1.7.2000-305.jar:org/apache/hadoop/fs/azure/AzureFileSystemThreadPoolExecutor$AzureFileSystemThreadRunnable.class */
    /**
     * Work item shared by all pool threads: the same instance is submitted several
     * times, and every running copy claims the next unprocessed index from
     * {@code fileIndex} until the array is exhausted or a failure has been recorded.
     */
    public static class AzureFileSystemThreadRunnable implements Runnable {
        /** Most recent exception raised by the task, wrapped in an IOException. */
        private volatile IOException lastException = null;
        /** Set to false once the task returns false for any file. */
        private volatile boolean operationStatus = true;
        /** Next array index to hand out. */
        private final AtomicInteger fileIndex = new AtomicInteger(0);
        /** Number of files for which the task returned true. */
        private final AtomicInteger filesProcessed = new AtomicInteger(0);
        /** Number of run() invocations that claimed at least one file. */
        private final AtomicInteger threadsUsed = new AtomicInteger(0);
        private final String operation;
        private final FileMetadata[] files;
        private final AzureFileSystemThreadTask task;

        /**
         * @param files     entries to process
         * @param task      task to execute against each entry
         * @param operation operation name used in log and exception messages
         */
        public AzureFileSystemThreadRunnable(FileMetadata[] files, AzureFileSystemThreadTask task, String operation) {
            this.operation = operation;
            this.files = files;
            this.task = task;
        }

        /**
         * Claims and processes files until none are left, or until this or any other
         * invocation sharing this instance has recorded an exception or a failed task.
         */
        @Override // java.lang.Runnable
        public void run() {
            long start = Time.monotonicNow();
            int claimedCount = 0;
            int currentIndex;
            while ((currentIndex = this.fileIndex.getAndIncrement()) < this.files.length) {
                claimedCount++;
                FileMetadata file = this.files[currentIndex];
                try {
                    if (this.task.execute(file)) {
                        this.filesProcessed.getAndIncrement();
                    } else {
                        LOG.error("{} operation failed for file {}", this.operation, file.getKey());
                        this.operationStatus = false;
                    }
                } catch (Exception e) {
                    LOG.error("Encountered Exception for {} operation for file {}", this.operation, file.getKey());
                    this.lastException = new IOException("Encountered Exception for " + this.operation + " operation for file " + file.getKey(), e);
                }
                // Both flags are shared, so a failure recorded by any invocation stops this one.
                // The warning belongs here only: on normal exhaustion of the array there is
                // nothing to warn about and no current file to refer to.
                if (this.lastException != null || !this.operationStatus) {
                    LOG.warn("Terminating execution of {} operation now as some other thread already got exception or operation failed", this.operation);
                    break;
                }
            }
            LOG.debug("Time taken to process {} files count for {} operation: {} ms", claimedCount, this.operation, Time.monotonicNow() - start);
            if (claimedCount > 0) {
                this.threadsUsed.getAndIncrement();
            }
        }
    }

    /**
     * @param threadCount      maximum number of threads to use for one {@link #executeParallel} call
     * @param threadNamePrefix prefix for the names of the worker threads
     * @param operation        operation name used in log and exception messages
     * @param key              blob key used in log messages
     * @param config           configuration setting name quoted in thread-count log messages
     */
    public AzureFileSystemThreadPoolExecutor(int threadCount, String threadNamePrefix, String operation, String key, String config) {
        this.threadCount = threadCount;
        this.threadNamePrefix = threadNamePrefix;
        this.operation = operation;
        this.key = key;
        this.config = config;
    }

    /**
     * Builds the pool used by {@link #executeParallel}: core and maximum size both
     * equal to {@code poolSize}, a 2 second keep-alive, a {@link LinkedBlockingQueue}
     * created without a capacity argument, and threads named via
     * {@link AzureFileSystemThreadFactory}.
     *
     * @param poolSize core and maximum pool size
     * @return a new thread pool
     * @throws Exception declared so that callers handle pool-creation failure
     */
    @VisibleForTesting
    ThreadPoolExecutor getThreadPool(int poolSize) throws Exception {
        return new ThreadPoolExecutor(poolSize, poolSize, 2L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new AzureFileSystemThreadFactory(this.threadNamePrefix));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Executes {@code threadOperation} once per entry of {@code contents}.
     *
     * <p>A pool of {@code min(contents.length, threadCount)} threads is used when that
     * number exceeds 1 and the pool can be created; if no work could be submitted to
     * the pool, or threads are not used at all, the entries are processed sequentially
     * on the calling thread.
     *
     * @param contents        entries to process
     * @param threadOperation task to run against each entry
     * @return {@code true} if the task succeeded for every entry; {@code false} if the
     *         task returned {@code false} for some entry
     * @throws IOException if the task threw while running on the pool, if the pool
     *                     finished without failures but processed fewer entries than
     *                     supplied, or if the task throws during sequential execution
     */
    public boolean executeParallel(FileMetadata[] contents, AzureFileSystemThreadTask threadOperation) throws IOException {
        boolean succeeded = false;
        boolean threadsEnabled = false;
        ThreadPoolExecutor ioThreadPool = null;
        long start = Time.monotonicNow();
        // Never create more threads than there are files to process.
        int threads = Math.min(contents.length, this.threadCount);

        if (threads > 1) {
            try {
                ioThreadPool = getThreadPool(threads);
                threadsEnabled = true;
            } catch (Exception e) {
                // Trailing throwable makes SLF4J record the cause; the sequential path below still runs.
                LOG.warn("Failed to create thread pool with threads {} for operation {} on blob {}. Use config {} to set less number of threads. Setting config value to <= 1 will disable threads.", threads, this.operation, this.key, this.config, e);
            }
        } else {
            LOG.warn("Disabling threads for {} operation as thread count {} is <= 1", this.operation, threads);
        }

        if (threadsEnabled) {
            LOG.debug("Using thread pool for {} operation with threads {}", this.operation, threads);
            boolean started = false;
            AzureFileSystemThreadRunnable runnable = new AzureFileSystemThreadRunnable(contents, threadOperation, this.operation);

            // Stop submitting further copies as soon as an already-running copy has failed.
            for (int submitted = 0; submitted < threads && runnable.lastException == null && runnable.operationStatus; submitted++) {
                try {
                    ioThreadPool.execute(runnable);
                    started = true;
                } catch (RejectedExecutionException rejected) {
                    // Not fatal: copies that were accepted keep draining the file list.
                    LOG.error("Rejected execution of thread for {} operation on blob {}. Continuing with existing threads. Use config {} to set less number of threads to avoid this error", this.operation, this.key, this.config);
                }
            }

            // No more submissions; wait for the accepted copies to finish.
            ioThreadPool.shutdown();
            try {
                ioThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
            } catch (InterruptedException interrupted) {
                ioThreadPool.shutdownNow();
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                LOG.error("Threads got interrupted {} blob operation for {} ", this.operation, this.key);
            }

            int threadsNotUsed = threads - runnable.threadsUsed.get();
            if (threadsNotUsed > 0) {
                LOG.warn("{} threads not used for {} operation on blob {}", threadsNotUsed, this.operation, this.key);
            }

            if (started) {
                IOException failure = runnable.lastException;
                // No recorded failure, yet not every file was processed: report it as an error.
                if (failure == null && runnable.operationStatus && runnable.filesProcessed.get() < contents.length) {
                    LOG.error("{} failed as operation on subfolders and files failed.", this.operation);
                    failure = new IOException(this.operation + " failed as operation on subfolders and files failed.");
                }
                if (failure != null) {
                    throw failure;
                }
                succeeded = runnable.operationStatus;
            } else {
                // Nothing was accepted by the pool, so do the work on this thread instead.
                threadsEnabled = false;
                LOG.info("Not able to schedule threads to {} blob {}. Fall back to {} blob serially.", this.operation, this.key, this.operation);
            }
        }

        if (!threadsEnabled) {
            LOG.debug("Serializing the {} operation", this.operation);
            for (FileMetadata file : contents) {
                if (!threadOperation.execute(file)) {
                    LOG.warn("Failed to {} file {}", this.operation, file);
                    return false;
                }
            }
            succeeded = true;
        }

        LOG.info("Time taken for {} operation is: {} ms with threads: {}", this.operation, Time.monotonicNow() - start, threads);
        return succeeded;
    }
}
