package eu.radoop.spark.hdfs.loop;

import com.rapidminer.MacroHandler;
import com.rapidminer.RapidMiner;
import com.rapidminer.license.LicenseConstants;
import com.rapidminer.license.LicenseManagerRegistry;
import com.rapidminer.license.internal.DefaultLicenseManager;
import com.rapidminer.license.location.LicenseLoadingException;
import com.rapidminer.license.location.LicenseLocation;
import com.rapidminer.license.location.LicenseStoringException;
import com.rapidminer.license.product.Constraint;
import com.rapidminer.license.product.DefaultProduct;
import com.rapidminer.license.product.Product;
import com.rapidminer.operator.ExecutionUnit;
import com.rapidminer.operator.IOObject;
import com.rapidminer.operator.Operator;
import com.rapidminer.operator.ProcessRootOperator;
import com.rapidminer.operator.nio.file.SimpleFileObject;
import com.rapidminer.operator.ports.OutputPort;
import com.rapidminer.settings.Settings;
import com.rapidminer.tools.LogService;
import com.rapidminer.tools.ParameterService;
import com.rapidminer.tools.plugin.Plugin;
import eu.radoop.manipulation.HiveWindowing;
import eu.radoop.spark.DebugLogger;
import eu.radoop.spark.HandleErrorType;
import eu.radoop.spark.InputParams;
import eu.radoop.spark.LogRetrievalType;
import eu.radoop.spark.PartitionLogHandler;
import eu.radoop.spark.PushdownTools;
import eu.radoop.spark.SerializableOperator;
import eu.radoop.spark.SerializationConverter;
import eu.radoop.spark.Tuple2;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.Serializable;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jodd.util.SystemUtil;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.TaskContext;

/* loaded from: input_file:lib/radoop-spark3.jar:eu/radoop/spark/hdfs/loop/LoopHdfsExecutor.class */
public class LoopHdfsExecutor implements Serializable {
    /**
     * License text served by the in-memory {@code LicenseLocation} that {@code initRapidMiner}
     * passes to {@code RapidMiner.init}; it is the only license that location returns.
     * NOTE(review): opaque encoded blob (not decoded here) - never re-wrap or reformat it.
     */
    private static final String LICENSE = "H4sIAAAAAAAAAF2RXXOiMBSG/0qHW2pVRIHOdGYBP7GDUvCjrnuRQIAACQhB0I7/fSl1t7N7+7zn5D3P5INLsItogQx7ZXLP3MeRy/LUK1228I7c85HLQYY9ginKO0UG8jgri9BLK3rkHv+OblFe4JQWzcLPIyc9CfyR+/UdTzzMmrh9rqQJJpgh776PXOQh6qImlHoNuV+zRJeG0DJJGlYwkLMxYOgboTrDOfh89V9+X98UKG9gI0MBQW3xG/DSNHuwPx0e1neJh0mN3JKleXsNIgAn7bCHzihJs0brx7f/k5uSP9ZNxDD6FP643RrkNvYsB5iy4qv3P9AO3Y9bjL86REUSJdTv+EN50BFB3+tA2JM7sowUGUB3NBj02zpAacpa1+IueuMeuQIHFLAyR82nmVXUXS/fWUj8teOer9lYT9TAg3lX1OZbPTeE2VazL4IkBKIa9cEqll/3MF4MapuC00mTzYAPT0x3J7WQ6M488y29PHhZObIHWaHtMmW4X7+b+pva3S4mhkXkjNgkhTLfs1EvmV6FDTxPe/VhNti55epiDfmFIWf8NPZ5p5pWo5hE8RKLCkns8PAuyjvBoh7vu8omEpCdbspZpBmR6khwihL3VPWWB8lXdOaJfl9Fm0koVg6UrYjXX0tanvXQNy6rYAjhIhG00UoXJ45zokqtzrtvqmoKQU1MGBmKv8ktjKPz9TQ05zFTxrsAi9Q34I7NtWsZnfv2jN/tYTS+OoZGLP3AL40Q0v3UIdXLC3f7DT7DuOIfAwAA";
    /** Signature string handed to the {@code DefaultProduct} constructor when building {@code PRODUCT}. */
    private static final String PRODUCT_SIGNATURE = "xShcKqhWGDdeTdmNc89hwaSi9aALtF2CPl09GKR4KsMSeIpqu6k+Ng5qptZzPIwZf2gq85PFtnZqDCst2RwoaOO7KL06t3514gMM63fb8C4u4TgT/X5iIDLb4IxpmI7k8oknf++T2W4uQELPL5G0gX0I+YpRe99sTQ/wfgo3KO8bIMPuF6RoI41ps8F4De90bzrKMTLHXG1LzfBDFHbiBtCTOZLAOIxnhBV2QH8zxY+mVnMJGtIgsdNK0GLCazsWGbryNohlAz3FPJq+QArRSGljiDqtzCG+1H9FLMa5Duz2/at14TWmD4UawdbEQK6dRru+fVYbJsJ5HxXPC8Rq7Q==";
    /**
     * Product descriptor ("rapidminer-sparkpushdown", version pattern "7.2+") that
     * {@code initRapidMiner} passes to {@code RapidMiner.init}. It lists the data-row,
     * logical-processor, memory-limit and web-service-limit license constraints.
     */
    private static final Product PRODUCT = new DefaultProduct("rapidminer-sparkpushdown", "7.2+", false, PRODUCT_SIGNATURE, new Constraint[]{LicenseConstants.DATA_ROW_CONSTRAINT, LicenseConstants.LOGICAL_PROCESSOR_CONSTRAINT, LicenseConstants.MEMORY_LIMIT_CONSTRAINT, LicenseConstants.WEB_SERVICE_LIMIT_CONSTRAINT});
    /** Monitor that serialises the RapidMiner initialisation performed in {@code initRapidMiner}. */
    private static final Object initLock = new Object();
    /**
     * Identifier of the partition the current thread is working on. It is set at the start of
     * {@code runProcess} and read by the supplier given to the {@code PartitionLogHandler}.
     * Being an {@link InheritableThreadLocal}, threads spawned by the task thread start out
     * with the same value.
     */
    public static final ThreadLocal<String> partitionID = new InheritableThreadLocal<>();
    /** Sink for this executor's own diagnostic messages. */
    private final DebugLogger debugLogger;
    /** Converts process outputs before they are stored in the partition result. */
    private final SerializationConverter serializationConverter;

    /**
     * Creates an executor with its two collaborators.
     *
     * @param debugLogger            receives every message passed to this class's {@code debug} helpers
     * @param serializationConverter applied to each process output before it is added to the
     *                               partition result
     */
    public LoopHdfsExecutor(DebugLogger debugLogger, SerializationConverter serializationConverter) {
        this.debugLogger = debugLogger;
        this.serializationConverter = serializationConverter;
    }

    /**
     * Runs the serialized process against a single partition and packages the outcome.
     *
     * <p>Before the run, the partition macro (when enabled) and a {@code "task"} macro describing
     * the current Spark stage and task attempt are written into {@code inputParams.macros}.
     * Log records are captured by a dedicated {@code PartitionLogHandler} whose limit is
     * {@code inputParams.logLimit / i}.
     *
     * @param str             identifier of the partition; also used as the log handler's key
     * @param inputParams     process definition, inputs, macros and logging options
     * @param i               divisor applied to the log limit (integer division, so {@code 0}
     *                        raises {@link ArithmeticException}); presumably the number of
     *                        partitions sharing the limit - TODO confirm with the caller
     * @param handleErrorType decides whether a failure is returned as a result or rethrown
     * @return the process result, or a {@code PartitionError} when the failure is ignored
     */
    public PartitionResult runOnPartition(String str, InputParams inputParams, int i, HandleErrorType handleErrorType) {
        debug("executor start: " + str);
        if (inputParams.setPartitionMacro.booleanValue()) {
            inputParams.macros.put(inputParams.partitionMacroName, str);
        }
        TaskContext taskContext = TaskContext.get();
        String taskLabel = String.format("stage: %s attempt: %s", taskContext.stageId(), taskContext.taskAttemptId());
        inputParams.macros.put("task", taskLabel);

        int perPartitionLogLimit = inputParams.logLimit.intValue() / i;
        PartitionLogHandler logHandler = new PartitionLogHandler(str, perPartitionLogLimit, partitionID::get);
        try {
            debug("workOnPartition start");
            PartitionResult result = runProcess(
                    inputParams.processXML,
                    inputParams.ioObjectList,
                    logHandler,
                    inputParams.processConfiguration,
                    inputParams.macros,
                    inputParams.extensionsUsed,
                    str);
            if (inputParams.logRetrieveType == LogRetrievalType.ALL) {
                result.addLogs(logHandler.getLogRecords());
            }
            return result;
        } catch (Throwable failure) {
            debug("partition error " + str + ": " + failure);
            switch (handleErrorType) {
                case IGNORE_ERROR:
                    return ignoredPartitionError(str, failure, inputParams, logHandler);
                default:
                    // Ship the captured records along with the failure, keyed by partition.
                    HashMap logsByPartition = new HashMap();
                    logsByPartition.put(str, logHandler.getLogRecords());
                    throw PushdownTools.createExecutorException(failure, logsByPartition);
            }
        }
    }

    /** Builds the result returned for a partition whose failure is ignored rather than rethrown. */
    private PartitionError ignoredPartitionError(String partition, Throwable failure, InputParams inputParams, PartitionLogHandler logHandler) {
        debug("partition error " + partition + " ignored");
        PartitionError error = new PartitionError(partition, failure.toString());
        if (inputParams.logRetrieveType != LogRetrievalType.NONE) {
            error.addLogs(logHandler.getLogRecords());
            error.addLog(new LogRecord(Level.WARNING, "Ignored error: " + failure.getMessage()));
        }
        return error;
    }

    /**
     * Restores the serialized process, feeds it the partition file and extra inputs, executes its
     * first subprocess and collects outputs and macros.
     *
     * <p>The given log handler is attached to the RapidMiner root logger for the duration of the
     * call and is always detached again - also when the enclosing operator is disabled or when
     * initialisation, restoring or execution throws - so handlers cannot pile up on the root
     * logger across calls.
     *
     * @param str                 serialized process handed to {@code SerializableOperator}
     * @param list                additional inputs, delivered in order to the connected inner
     *                            sources after the first one
     * @param partitionLogHandler handler that captures the log records of this run
     * @param arrayList           parameter key/value pairs forwarded to {@code initRapidMiner}
     * @param map                 macros to define on the restored process before execution
     * @param list2               extension file names forwarded to {@code initRapidMiner}
     * @param str2                partition identifier; its path is copied locally and delivered
     *                            to the first connected inner source
     * @return the outputs and macros of the run, or an empty result when the operator is disabled
     * @throws Exception whatever initialisation, restoring or execution throws
     */
    private PartitionResult runProcess(String str, List<IOObject> list, PartitionLogHandler partitionLogHandler, ArrayList<Tuple2<String, String>> arrayList, Map<String, String> map, List<String> list2, String str2) throws Exception {
        debug("runProcess start " + str2);
        Settings.setSetting("rapidminer.logging.resource-file-jar-path", "com.rapidminer.resources.i18n.LogMessages");
        Logger root = LogService.getRoot();
        Level defaultLevel = Level.INFO;
        partitionID.set(str2);
        root.addHandler(partitionLogHandler);
        PartitionResult partitionResult;
        try {
            partitionLogHandler.setLevel(defaultLevel);
            initRapidMiner(partitionLogHandler, arrayList, list2, str2, root, defaultLevel);
            ProcessRootOperator restored = new SerializableOperator(str).restore();
            ExecutionUnit subprocess = restored.getSubprocess(0);
            if (!isOperatorEnabled(subprocess)) {
                return new PartitionResult(str2);
            }
            // Breakpoints would block the unattended run, so clear both kinds on every operator.
            for (Operator operator : restored.getAllInnerOperators()) {
                operator.setBreakpoint(0, false);
                operator.setBreakpoint(1, false);
            }

            List<OutputPort> connectedSources = subprocess.getInnerSources().getAllPorts().stream()
                    .filter(OutputPort::isConnected)
                    .collect(Collectors.toList());
            if (!connectedSources.isEmpty()) {
                connectedSources.get(0).deliver(new SimpleFileObject(copyToLocalPath(str2)));
            }
            // Remaining connected sources receive the extra inputs until either side runs out.
            Iterator<IOObject> extraInputs = list.iterator();
            for (int port = 1; port < connectedSources.size() && extraInputs.hasNext(); port++) {
                connectedSources.get(port).deliver(extraInputs.next());
            }

            MacroHandler macroHandler = restored.getProcess().getMacroHandler();
            for (Map.Entry<String, String> entry : map.entrySet()) {
                debug("Setting macro: " + entry.getKey() + ", " + entry.getValue());
                macroHandler.addMacro(entry.getKey(), entry.getValue());
            }

            debug("RapidMiner process start");
            long startedAt = System.currentTimeMillis();
            subprocess.execute();
            debug("RapidMiner process finished in " + (System.currentTimeMillis() - startedAt) + "ms");

            partitionResult = new PartitionResult(str2);
            debug("subproc output");
            // Raw cast kept: the element type of getOutputs() is declared outside this file.
            partitionResult.getOutputs().addAll((List) subprocess.getInnerSinks().createIOContainer(true).asList().stream()
                    .peek(output -> debug(() -> "resultObject: " + output.getClass().getCanonicalName()))
                    .map(this.serializationConverter::convertForSparkSerialization)
                    .peek(converted -> converted.getAnnotations().put(HiveWindowing.ROLE_PARTITION, str2))
                    .collect(Collectors.toList()));

            Iterator<String> macroNames = macroHandler.getDefinedMacroNames();
            while (macroNames.hasNext()) {
                String macroName = macroNames.next();
                partitionResult.getMacros().put(macroName, macroHandler.getMacro(macroName));
            }
        } finally {
            // Runs on success, early return and failure alike; removing an absent handler is a no-op.
            root.removeHandler(partitionLogHandler);
        }
        debug(String.format("runprocess end '%s' with %d outputs", str2, partitionResult.getOutputs().size()));
        return partitionResult;
    }

    /**
     * Initialises RapidMiner unless it already is, then applies the configured log level to the
     * given partition handler. Everything after the first debug line runs while holding
     * {@code initLock}.
     *
     * <p>A first-time initialisation switches to command-line execution mode, registers the
     * extension directory, installs a license manager, initialises RapidMiner with the embedded
     * product and license, applies the supplied parameters, sets the logger's level and strips
     * every handler from it that is not a {@code PartitionLogHandler}.
     *
     * @param partitionLogHandler handler whose level is set from {@code rapidminer.gui.log_level}
     * @param arrayList           parameter key/value pairs to apply after initialisation
     * @param list                extension file names passed to {@code setupExtensionDir}
     * @param str                 partition identifier, used in debug messages only
     * @param logger              logger to configure during a first-time initialisation
     * @param level               fallback level when no valid custom level is configured
     * @throws IOException if the extension directory cannot be prepared
     */
    private void initRapidMiner(PartitionLogHandler partitionLogHandler, ArrayList<Tuple2<String, String>> arrayList, List<String> list, String str, Logger logger, Level level) throws IOException {
        final String logLevelKey = "rapidminer.gui.log_level";
        debug("RapidMiner init called " + str);
        synchronized (initLock) {
            if (!RapidMiner.isInitialized()) {
                debug("RapidMiner init SYNCHRONIZED " + str);
                final long startedAt = System.currentTimeMillis();
                RapidMiner.setExecutionMode(RapidMiner.ExecutionMode.COMMAND_LINE);
                Plugin.addAdditionalExtensionDir(setupExtensionDir(list));
                Plugin.setInitPlugins(true);
                try {
                    LicenseManagerRegistry.INSTANCE.set(new DefaultLicenseManager());
                } catch (IllegalStateException ignored) {
                    // Deliberately ignored; presumably signals that a license manager is
                    // already registered - TODO confirm against LicenseManagerRegistry.
                }
                ParameterService.setParameterValue("rapidminer.proxy.mode", "Direct (no proxy)");
                RapidMiner.init(PRODUCT, new LicenseLocation() {
                    // Nothing is persisted: the only license is the embedded one.
                    public void storeLicense(String str2, String str3, String str4, LocalDate localDate, LocalDate localDate2, String str5) throws LicenseStoringException {
                    }

                    public List<String> loadLicenses(String str2) throws LicenseLoadingException {
                        return Collections.singletonList(LoopHdfsExecutor.LICENSE);
                    }
                });
                for (Tuple2<String, String> setting : arrayList) {
                    ParameterService.setParameterValue(setting._1, setting._2);
                }
                Level rootLevel = parseCustomLogLevel(ParameterService.getParameterValue(logLevelKey), level);
                logger.setLevel(rootLevel);
                logger.setUseParentHandlers(false);
                for (Handler attached : logger.getHandlers()) {
                    if (attached instanceof PartitionLogHandler) {
                        continue;
                    }
                    logger.removeHandler(attached);
                }
                debug("RapidMiner init end " + str);
                debug("RapidMiner init took " + (System.currentTimeMillis() - startedAt) + "ms");
            } else {
                debug("RapidMiner already initialized " + str);
            }
            Level handlerLevel = parseCustomLogLevel(ParameterService.getParameterValue(logLevelKey), level);
            partitionLogHandler.setLevel(handlerLevel);
        }
    }

    /**
     * Resolves a configured log level name, falling back to a default.
     *
     * @param str   level name or integer string understood by {@link Level#parse(String)};
     *              may be {@code null}
     * @param level value returned when {@code str} is {@code null} or cannot be parsed
     * @return the parsed level, or {@code level}; an unparsable value is additionally reported
     *         as a warning on the RapidMiner root logger
     */
    private static Level parseCustomLogLevel(String str, Level level) {
        if (str == null) {
            return level;
        }
        try {
            return Level.parse(str);
        } catch (IllegalArgumentException unparsable) {
            LogService.getRoot().warning("Could not set log level to: " + str);
            return level;
        }
    }

    /**
     * Ensures the extension directory {@code <user.dir>/tmp/extensions} exists and returns its path.
     *
     * <p>When the directory already exists it is returned untouched. Otherwise it is created
     * (including a missing {@code tmp} parent) and every file in the current working directory
     * whose name appears in {@code list} is copied into it.
     *
     * @param list names of the extension files to pick up from the working directory
     * @return path of the extension directory
     * @throws IOException if the directory cannot be created, the working directory cannot be
     *                     listed, or a file cannot be copied
     */
    private static String setupExtensionDir(List<String> list) throws IOException {
        File workingDir = new File(".");
        String extensionPath = System.getProperty(SystemUtil.USER_DIR) + "/tmp/extensions";
        File extensionDir = new File(extensionPath);
        if (extensionDir.exists()) {
            return extensionPath;
        }
        // mkdirs() rather than mkdir(): the "tmp" parent is not guaranteed to exist. The second
        // exists() check tolerates another creator winning the race.
        if (!extensionDir.mkdirs() && !extensionDir.exists()) {
            throw new IOException("Could not setup extensions, could not create directory: " + extensionPath);
        }
        final HashSet<String> wantedNames = new HashSet<>(list);
        File[] matches = workingDir.listFiles((dir, name) -> wantedNames.contains(name));
        if (matches == null) {
            // Report the directory that was actually listed, not the copy target.
            throw new IOException("Could not setup extensions, failed to list directory: " + workingDir.getAbsolutePath());
        }
        for (File extensionFile : matches) {
            FileUtils.copyFile(extensionFile, new File(extensionDir, extensionFile.getName()));
        }
        return extensionPath;
    }

    /**
     * Copies the given Hadoop path to a uniquely named file under {@code java.io.tmpdir}.
     *
     * <p>The local name is {@code <stageId>-<partitionId>-<random UUID>-<source file name>}, built
     * from the current Spark {@code TaskContext}. The filesystem is resolved from the source
     * path itself, so a fully qualified path on a non-default filesystem works too; a path
     * without a scheme still resolves to the configured default filesystem.
     *
     * @param str source path, as understood by Hadoop's {@code Path}
     * @return the local copy
     * @throws IOException if the filesystem cannot be obtained or the copy fails
     */
    private static File copyToLocalPath(String str) throws IOException {
        Path source = new Path(str);
        TaskContext task = TaskContext.get();
        String localName = task.stageId() + "-" + task.partitionId() + "-" + UUID.randomUUID() + "-" + source.getName();
        Path target = new Path("file://" + System.getProperty("java.io.tmpdir") + File.separator + localName);
        source.getFileSystem(new Configuration()).copyToLocalFile(source, target);
        return new File(target.toUri());
    }

    /**
     * Tells whether the operator enclosing the given execution unit is enabled.
     *
     * @param executionUnit subprocess whose enclosing operator is inspected
     * @return the enclosing operator's {@code isEnabled()} value
     */
    private static boolean isOperatorEnabled(ExecutionUnit executionUnit) {
        return executionUnit.getEnclosingOperator().isEnabled();
    }

    /**
     * Forwards a ready-made message to this executor's {@code DebugLogger}.
     *
     * @param obj message object to log
     */
    private void debug(Object obj) {
        this.debugLogger.debug(obj);
    }

    /**
     * Forwards a message supplier to this executor's {@code DebugLogger}.
     * NOTE(review): presumably the supplier is only evaluated when debug output is enabled -
     * confirm in {@code DebugLogger}.
     *
     * @param supplier produces the message text
     */
    private void debug(Supplier<String> supplier) {
        this.debugLogger.debug(supplier);
    }
}
