package eu.radoop.spark;

import com.hortonworks.spark.sql.hive.llap.HiveWarehouseSession;
import eu.radoop.RadoopConf;
import eu.radoop.hive.udf.GenericUDTFApplyModel;
import eu.radoop.transfer.parameter.ParameterTransferObject;
import eu.radoop.transfer.parameter.SparkModelApplyParameter;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.shell.Display;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.spark.SparkException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/radoop-spark3.jar:eu/radoop/spark/ModelApplyRunner.class */
/**
 * Spark driver entry point that applies a model to a Hive table.
 *
 * <p>The runner reads its parameters from an argument file, exposes the input table to Spark as
 * the temporary view {@value #APPLY_MODEL_INPUT_TABLE}, registers the apply-model UDTF as a
 * temporary function, executes the supplied query and stores the result as a new table in the
 * requested file format.
 */
public class ModelApplyRunner {
    public static final String APPLY_MODEL_INPUT_TABLE = "spark_apply_model_input";
    private static final String APPNAME = "Radoop Spark Model Apply";
    private static final String APPLY_MODEL_UDF_NAME = "spark_apply_model";
    private static final String APPLY_MODEL_UDF_CLASS = GenericUDTFApplyModel.class.getName();
    private static final String LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
    /** When set, progress markers are echoed to stdout by {@link #debug(Object)}. */
    private static final boolean DEBUG_ENABLED = true;
    private static final Logger log = LoggerFactory.getLogger(ModelApplyRunner.class);

    /**
     * Runs the model application job.
     *
     * <p>If the job fails after the Spark session was created and an exception directory is
     * configured, the failure message is written there as a text dataset before the original
     * exception is rethrown. Both sessions are released in every case.
     *
     * @param strArr command line arguments; the first element is passed to
     *               {@code RunnerTools.readFromArgFile} to obtain the job parameters
     * @throws SparkException if the job fails with a Spark error
     * @throws IllegalArgumentException if the configured output table format is missing or not
     *                                  one of the supported formats
     */
    public static void main(String[] strArr) throws SparkException {
        debug("runner start");
        log.info("Spark Apply Model starting");
        SparkSession sparkSession = null;
        HiveWarehouseSession hiveWarehouseSession = null;
        String exceptionDir = null;
        try {
            debug("args:");
            debug(Arrays.deepToString(strArr));
            ParameterTransferObject parameterTransferObject = new ParameterTransferObject(RunnerTools.readFromArgFile(strArr[0]), SparkModelApplyParameter.class);
            debug("pto:");
            debug(parameterTransferObject);
            exceptionDir = parameterTransferObject.getParameterAsString(SparkModelApplyParameter.EXCEPTION_DIRECTORY);
            String defaultDb = parameterTransferObject.getParameterAsString(SparkModelApplyParameter.DEFAULT_DB);

            log.info("Creating Spark Session");
            sparkSession = SparkSession.builder()
                    .appName(APPNAME)
                    .enableHiveSupport()
                    .config(RadoopConf.SPARK_EXECUTOR_USERCLASSPATHFIRST, false)
                    .getOrCreate();
            sparkSession.sql("use " + defaultDb);
            debug("hive: " + defaultDb);

            log.info("Creating Hive Session");
            hiveWarehouseSession = com.hortonworks.hwc.HiveWarehouseSession.session(sparkSession).defaultDB(defaultDb).build();

            String query = parameterTransferObject.getParameterAsString(SparkModelApplyParameter.QUERY);
            String inputTable = parameterTransferObject.getParameterAsString(SparkModelApplyParameter.INPUT_TABLE);
            String outputTable = parameterTransferObject.getParameterAsString(SparkModelApplyParameter.OUTPUT_TABLE);
            String outputFormat = parameterTransferObject.getParameterAsString(SparkModelApplyParameter.OUTPUT_TABLE_FORMAT);

            debug("register udf:");
            log.info("Registering Spark udf '{}' for class '{}'", APPLY_MODEL_UDF_NAME, APPLY_MODEL_UDF_CLASS);
            sparkSession.sql(String.format("CREATE TEMPORARY FUNCTION %s AS '%s'", APPLY_MODEL_UDF_NAME, APPLY_MODEL_UDF_CLASS));

            debug("read input ");
            log.info("Reading input {}", inputTable);
            // The input is read through the Hive session and exposed to Spark under a fixed view name.
            hiveWarehouseSession.sql("SELECT * FROM " + inputTable).createOrReplaceTempView(APPLY_MODEL_INPUT_TABLE);

            debug("execute apply ");
            log.info("Executing query {}", query);
            Dataset<Row> result = sparkSession.sql(query);

            debug("write output");
            log.info("Writing output table '{}' in '{}' format", outputTable, outputFormat);
            writeOutput(result, outputTable, outputFormat);

            debug("apply finish");
            log.info("Spark Apply Model successful");
        } catch (Exception e) {
            debug("exception");
            log.error("Spark Apply Model Exception", e);
            debug(e.getMessage());
            if (sparkSession != null && exceptionDir != null) {
                writeExceptionReport(sparkSession, exceptionDir, e);
            }
            throw e;
        } finally {
            debug("finally");
            // Nested so that the Spark session is stopped even when closing the Hive session fails.
            try {
                if (hiveWarehouseSession != null) {
                    hiveWarehouseSession.close();
                }
            } finally {
                if (sparkSession != null) {
                    sparkSession.stop();
                }
            }
        }
        log.info("Spark Apply Model finished");
        debug("runner finish");
    }

    /**
     * Chooses the writer for the requested output format.
     *
     * <p>{@code orc}, {@code parquet} and {@code avro} are written by {@link #writeSpark};
     * {@code text}, {@code rcfile} and {@code sequencefile} by {@link #writeHiveFormat}.
     * Anything else is rejected rather than silently producing no output table.
     */
    private static void writeOutput(Dataset<Row> dataset, String table, String format) {
        if (format == null) {
            throw new IllegalArgumentException("Output table format is not set for table " + table);
        }
        switch (format) {
            case "orc":
            case "parquet":
            case "avro":
                writeSpark(dataset, table, format);
                break;
            case "text":
            case "rcfile":
            case "sequencefile":
                writeHiveFormat(dataset, table, format);
                break;
            default:
                throw new IllegalArgumentException("Unsupported output table format: " + format);
        }
    }

    /**
     * Best-effort persistence of the failure message into the exception directory.
     *
     * <p>A failure while writing the report must not replace the job's real failure, so it is
     * logged and attached to {@code cause} as a suppressed exception instead of being thrown.
     */
    private static void writeExceptionReport(SparkSession sparkSession, String directory, Exception cause) {
        // List.of rejects null elements, and getMessage() may legitimately return null.
        String message = cause.getMessage() != null ? cause.getMessage() : cause.toString();
        try {
            sparkSession.createDataset(List.of(message), Encoders.STRING()).write().text(directory);
        } catch (Exception reportFailure) {
            log.warn("Could not write exception report to {}", directory, reportFailure);
            cause.addSuppressed(reportFailure);
        }
    }

    /**
     * Saves the dataset as a new table using the given format as the Spark data source format.
     * Fails if the table already exists.
     */
    private static void writeSpark(Dataset<Row> dataset, String str, String str2) {
        log.info("Writing {} in '{}' format", str, str2);
        dataset.write().format(str2).mode(SaveMode.ErrorIfExists).saveAsTable(str);
    }

    /**
     * Saves the dataset through the Hive Warehouse Connector data source, passing the target
     * table and file format as options. Not called from within this class.
     */
    private static void writeHwc(Dataset<Row> dataset, String str, String str2) {
        log.info("Writing {} in '{}' format with HWC", str, str2);
        dataset.write().format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").option("table", str).option("fileformat", str2).mode(SaveMode.ErrorIfExists).save();
    }

    /**
     * Saves the dataset as a new table in Spark's {@code hive} format with explicit input format,
     * output format and serde classes selected by {@code str2}. Unknown formats fall back to the
     * ORC settings with a warning. Fails if the table already exists.
     */
    private static void writeHiveFormat(Dataset<Row> dataset, String str, String str2) {
        String inputFormat;
        String outputFormat;
        String serde;
        log.info("Writing {} in 'hive' format using {} serde", str, str2);
        switch (str2) {
            case "parquet":
                inputFormat = RadoopConf.CLASS_PARQUET_INPUT_FORMAT;
                outputFormat = RadoopConf.CLASS_PARQUET_OUTPUT_FORMAT;
                serde = RadoopConf.CLASS_PARQUET_SERDE;
                break;
            case "text":
                inputFormat = RadoopConf.CLASS_TEXTFILE_INPUT_FORMAT;
                outputFormat = RadoopConf.CLASS_TEXTFILE_OUTPUT_FORMAT;
                serde = LAZY_SIMPLE_SERDE;
                break;
            case "rcfile":
                inputFormat = RadoopConf.CLASS_RC_INPUT_FORMAT;
                outputFormat = RadoopConf.CLASS_RC_OUTPUT_FORMAT;
                serde = RadoopConf.CLASS_RC_SERDE;
                break;
            case "sequencefile":
                inputFormat = RadoopConf.CLASS_SEQUENCEFILE_INPUT_FORMAT;
                outputFormat = RadoopConf.CLASS_SEQUENCEFILE_OUTPUT_FORMAT;
                serde = LAZY_SIMPLE_SERDE;
                break;
            case "orc":
                // NOTE(review): ORC is paired with LazySimpleSerDe here - confirm whether an
                // ORC-specific serde was intended.
                inputFormat = RadoopConf.CLASS_ORC_INPUT_FORMAT;
                outputFormat = RadoopConf.CLASS_ORC_OUTPUT_FORMAT;
                serde = LAZY_SIMPLE_SERDE;
                break;
            default:
                log.warn("Unknown output format {}, using ORC", str2);
                inputFormat = RadoopConf.CLASS_ORC_INPUT_FORMAT;
                outputFormat = RadoopConf.CLASS_ORC_OUTPUT_FORMAT;
                serde = LAZY_SIMPLE_SERDE;
                break;
        }
        log.info("inputFormat: {}", inputFormat);
        log.info("outputFormat: {}", outputFormat);
        log.info("serde: {}", serde);
        dataset.write().format("hive").option("inputFormat", inputFormat).option("outputFormat", outputFormat).option(hive_metastoreConstants.META_TABLE_SERDE, serde).mode(SaveMode.ErrorIfExists).saveAsTable(str);
    }

    /** Prints the value to stdout with the {@code radoop_out} prefix when debugging is enabled. */
    private static void debug(Object obj) {
        if (DEBUG_ENABLED) {
            System.out.println("radoop_out " + String.valueOf(obj));
        }
    }
}
