package com.rapidminer.extension.kafka_connector.operator;

import com.rapidminer.connection.util.ConnectionInformationSelector;
import com.rapidminer.example.Attribute;
import com.rapidminer.example.Example;
import com.rapidminer.example.ExampleSet;
import com.rapidminer.extension.kafka_connector.connections.KafkaConnectionHandler;
import com.rapidminer.operator.Operator;
import com.rapidminer.operator.OperatorDescription;
import com.rapidminer.operator.OperatorException;
import com.rapidminer.operator.ports.InputPort;
import com.rapidminer.operator.ports.OutputPort;
import com.rapidminer.parameter.ParameterType;
import com.rapidminer.parameter.ParameterTypeBoolean;
import com.rapidminer.parameter.ParameterTypeCategory;
import com.rapidminer.parameter.ParameterTypeInt;
import com.rapidminer.parameter.ParameterTypeString;
import com.rapidminer.parameter.conditions.BooleanParameterCondition;
import com.rapidminer.parameter.conditions.EqualTypeCondition;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:com/rapidminer/extension/kafka_connector/operator/WriteKafkaTopic.class */
public class WriteKafkaTopic extends Operator {
    static final String PARAMETER_TOPIC = "kafka_topic";
    static final String PARAMETER_BULK = "bulk_sending";
    static final String PARAMETER_MESSAGE_INTERVAL = "message_interval";
    static final String PARAMETER_MESSAGE_FORMAT = "message_format";
    static final String PARAMETER_ATTRIBUTE_SEPARATOR = "attribute_separator";
    private static final String[] FORMAT_TYPES = {"String", "JSON"};
    private static final int FORMAT_STRING = 0;
    private static final int FORMAT_JSON = 1;
    private final ConnectionInformationSelector connectionSelector;
    private final InputPort inputPort;
    private final OutputPort outputPort;

    public WriteKafkaTopic(OperatorDescription operatorDescription) {
        super(operatorDescription);
        this.connectionSelector = new ConnectionInformationSelector(this, KafkaConnectionHandler.TYPE);
        this.inputPort = getInputPorts().createPort("input");
        this.outputPort = getOutputPorts().createPort("throughput");
        getTransformer().addPassThroughRule(this.inputPort, this.outputPort);
    }

    public void doWork() throws OperatorException {
        Properties buildClusterConfiguration = KafkaConnectionHandler.getINSTANCE().buildClusterConfiguration(this.connectionSelector.getConnection().getConfiguration());
        String parameterAsString = getParameterAsString(PARAMETER_TOPIC);
        boolean parameterAsBoolean = getParameterAsBoolean(PARAMETER_BULK);
        int parameterAsInt = getParameterAsInt(PARAMETER_MESSAGE_INTERVAL);
        String parameterAsString2 = getParameterAsString(PARAMETER_ATTRIBUTE_SEPARATOR);
        String parameterAsString3 = getParameterAsString(PARAMETER_MESSAGE_FORMAT);
        ExampleSet<Example> copy = this.inputPort.getData(ExampleSet.class).copy();
        this.outputPort.deliver(copy);
        Thread.currentThread().setContextClassLoader(null);
        buildClusterConfiguration.put(ProducerConfig.ACKS_CONFIG, "all");
        buildClusterConfiguration.put("retries", 0);
        buildClusterConfiguration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        buildClusterConfiguration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer kafkaProducer = new KafkaProducer(buildClusterConfiguration);
        Throwable th = null;
        try {
            try {
                for (Example example : copy) {
                    StringBuilder sb = new StringBuilder();
                    Iterator allAttributes = example.getAttributes().allAttributes();
                    while (allAttributes.hasNext()) {
                        if (parameterAsString3.equals(FORMAT_TYPES[1])) {
                            Attribute attribute = (Attribute) allAttributes.next();
                            sb.append("\"" + attribute.getName() + "\"").append(":").append(example.getValueAsString(attribute, -1, true)).append(",");
                        } else {
                            sb.append(example.getValueAsString((Attribute) allAttributes.next())).append(parameterAsString2);
                        }
                    }
                    StringBuilder sb2 = new StringBuilder(sb.substring(0, sb.length() - parameterAsString2.length()));
                    if (parameterAsString3.equals(FORMAT_TYPES[1])) {
                        sb2 = new StringBuilder("{" + ((Object) sb2) + "}");
                    }
                    kafkaProducer.send(new ProducerRecord(parameterAsString, "", sb2.toString()));
                    checkForStop();
                    if (!parameterAsBoolean) {
                        try {
                            TimeUnit.MILLISECONDS.sleep(parameterAsInt);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
                if (kafkaProducer != null) {
                    if (0 == 0) {
                        kafkaProducer.close();
                        return;
                    }
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (kafkaProducer != null) {
                if (th != null) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    kafkaProducer.close();
                }
            }
            throw th4;
        }
    }

    public List<ParameterType> getParameterTypes() {
        List<ParameterType> parameterTypes = super.getParameterTypes();
        parameterTypes.add(new ParameterTypeString(PARAMETER_TOPIC, "Kafka topic to write to.", false));
        ParameterTypeString parameterTypeString = new ParameterTypeString(PARAMETER_ATTRIBUTE_SEPARATOR, "string separator when writing attributes as string message.", ";");
        parameterTypeString.registerDependencyCondition(new EqualTypeCondition(this, PARAMETER_MESSAGE_FORMAT, FORMAT_TYPES, true, new int[]{0}));
        parameterTypes.add(parameterTypeString);
        parameterTypes.add(new ParameterTypeBoolean(PARAMETER_BULK, "sending message in one bulk", false));
        ParameterTypeInt parameterTypeInt = new ParameterTypeInt(PARAMETER_MESSAGE_INTERVAL, "interval between two messages in miliseconds", 1, FetchRequest.DEFAULT_RESPONSE_MAX_BYTES, 1);
        parameterTypeInt.registerDependencyCondition(new BooleanParameterCondition(this, PARAMETER_BULK, true, false));
        parameterTypes.add(parameterTypeInt);
        parameterTypes.add(new ParameterTypeCategory(PARAMETER_MESSAGE_FORMAT, "format of the send messages", FORMAT_TYPES, 1));
        return parameterTypes;
    }
}
