package eu.radoop.spark;

import com.google.common.base.Joiner;
import eu.radoop.transfer.model.KMeansMTO;
import eu.radoop.transfer.parameter.ParameterTransferObject;
import eu.radoop.transfer.parameter.SparkKMeansParameter;
import java.util.Collections;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.rdd.RDD;

/* loaded from: input_file:lib/radoop-spark3.jar:eu/radoop/spark/SparkKMeansRunner.class */
public class SparkKMeansRunner extends AbstractSparkRunner {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/radoop-spark3.jar:eu/radoop/spark/SparkKMeansRunner$ResultTupleToOutputLineFunction.class */
    public static final class ResultTupleToOutputLineFunction implements Function<scala.Tuple2<String[], Integer>, String> {
        private static final long serialVersionUID = -8143101654555056888L;
        private static final String CLUSTER_ID_PREFIX = "cluster_";
        private final String fieldSeparator;

        private ResultTupleToOutputLineFunction(String str) {
            this.fieldSeparator = str;
        }

        public String call(scala.Tuple2<String[], Integer> tuple2) throws Exception {
            Joiner on = Joiner.on(this.fieldSeparator);
            return on.join(on.join((Object[]) tuple2._1), "cluster_" + tuple2._2, new Object[0]);
        }
    }

    public static void main(String[] strArr) throws SparkException {
        try {
            trainModelAndClusterData(init(strArr));
        } catch (Exception e) {
            processException(e);
        } finally {
            close();
        }
    }

    public static void trainModelAndClusterData(String str) {
        ParameterTransferObject parameterTransferObject = new ParameterTransferObject(str, SparkKMeansParameter.class);
        System.out.println("pto:\n" + parameterTransferObject);
        JavaRDD cache = getInputAsRDD().cache();
        JavaRDD cache2 = getFeaturesAsVectorRDD(cache).cache();
        KMeansModel trainModel = trainModel(JavaRDD.toRDD(cache2), parameterTransferObject);
        createClusteredRDD(cache, cache2, trainModel).saveAsTextFile(exampleSetOutputDirectory);
        persistModel(convertKMeansModelToMTO(trainModel));
    }

    private static JavaRDD<String> createClusteredRDD(JavaRDD<String[]> javaRDD, JavaRDD<Vector> javaRDD2, KMeansModel kMeansModel) {
        return javaRDD.zip(kMeansModel.predict(javaRDD2)).map(new ResultTupleToOutputLineFunction(fieldSeparator));
    }

    private static KMeansModel trainModel(RDD<Vector> rdd, ParameterTransferObject<SparkKMeansParameter> parameterTransferObject) {
        int intValue = parameterTransferObject.getParameterAsInteger(SparkKMeansParameter.K).intValue();
        int intValue2 = parameterTransferObject.getParameterAsInteger(SparkKMeansParameter.MAX_ITERATIONS).intValue();
        return new KMeans().setK(intValue).setMaxIterations(intValue2).setInitializationMode(parameterTransferObject.getParameterAsString(SparkKMeansParameter.INITIALIZATION_MODE)).setSeed(parameterTransferObject.getParameterAsInteger(SparkKMeansParameter.RANDOM_SEED).intValue()).setEpsilon(parameterTransferObject.getParameterAsDouble(SparkKMeansParameter.EPSILON).doubleValue()).run(rdd);
    }

    /* JADX WARN: Type inference failed for: r0v4, types: [double[], double[][]] */
    private static KMeansMTO convertKMeansModelToMTO(KMeansModel kMeansModel) {
        Vector[] clusterCenters = kMeansModel.clusterCenters();
        ?? r0 = new double[clusterCenters.length];
        for (int i = 0; i < clusterCenters.length; i++) {
            r0[i] = clusterCenters[i].toArray();
        }
        return new KMeansMTO(Collections.emptyMap(), r0);
    }
}
