/*
 * Decompiled with CFR 0.152.
 */
package com.rapidminer.extension.insightshub.operator;

import com.rapidminer.connection.util.ConnectionInformationSelector;
import com.rapidminer.extension.insightshub.client.DataLakeClient;
import com.rapidminer.extension.insightshub.client.TimeSeriesClient;
import com.rapidminer.extension.insightshub.generated.api.datalake.model.ImportJobResponse;
import com.rapidminer.extension.insightshub.ioobject.DataLakeStreamFileObject;
import com.rapidminer.extension.insightshub.operator.InsightsOperator;
import com.rapidminer.extension.insightshub.operator.parameter.ParameterUtils;
import com.rapidminer.operator.IOObject;
import com.rapidminer.operator.IOObjectCollection;
import com.rapidminer.operator.Operator;
import com.rapidminer.operator.OperatorDescription;
import com.rapidminer.operator.OperatorException;
import com.rapidminer.operator.UserError;
import com.rapidminer.operator.ports.OutputPort;
import com.rapidminer.parameter.ParameterType;
import com.rapidminer.parameter.ParameterTypeDateTimeString;
import com.rapidminer.parameter.ParameterTypeInt;
import com.rapidminer.parameter.ParameterTypeString;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TimeSeriesBulkImportOperator
extends InsightsOperator {
    private static final Logger log = Logger.getLogger(TimeSeriesBulkImportOperator.class.getName());
    public static final String PARAMETER_JOB_NAME = "job_name";
    public static final String PARAMETER_DESTINATION = "destination";
    public static final String PARAMETER_ASPECT_NAMES = "aspect_names";
    public static final String PARAMETER_ASSET_IDS = "asset_ids";
    public static final String PARAMETER_FROM = "from";
    public static final String PARAMETER_TO = "to";
    public static final String PARAMETER_TIMEOUT = "timeout";
    private final OutputPort outputPort = (OutputPort)this.getOutputPorts().createPort("output");

    public TimeSeriesBulkImportOperator(OperatorDescription description) {
        super(description);
        this.initConnectionSelector();
    }

    public List<ParameterType> getParameterTypes() {
        List types = super.getParameterTypes();
        types.addAll(ConnectionInformationSelector.createParameterTypes((ConnectionInformationSelector)this.getConnectionSelector()));
        types.add(new ParameterTypeString(PARAMETER_JOB_NAME, "Name of the time series bulk import job (optional - auto-generated if empty)", true));
        types.add(new ParameterTypeString(PARAMETER_DESTINATION, "User specified destination folder in the data lake", false));
        types.add(new ParameterTypeString(PARAMETER_ASPECT_NAMES, "Aspect ID to import", false));
        types.add(new ParameterTypeString(PARAMETER_ASSET_IDS, "Asset ID to import", false));
        types.add(new ParameterTypeDateTimeString(PARAMETER_FROM, "Beginning of the time range to read (format: YYYY-MM-DD HH:mm:ss or YYYY-MM-DD HH:mm:ss.SSS)", false, false, ParameterUtils.DATETIME_FORMATTER));
        types.add(new ParameterTypeDateTimeString(PARAMETER_TO, "End of the time range to read (format: YYYY-MM-DD HH:mm:ss or YYYY-MM-DD HH:mm:ss.SSS)", false, false, ParameterUtils.DATETIME_FORMATTER));
        ParameterTypeInt timeoutParam = new ParameterTypeInt(PARAMETER_TIMEOUT, "Timeout in seconds for import job completion polling", 1, Integer.MAX_VALUE, 300);
        types.add(timeoutParam);
        return types;
    }

    @Override
    public void doWork() throws OperatorException {
        LocalDate toLocalDate;
        super.doWork();
        String name = this.getParameterAsString(PARAMETER_JOB_NAME);
        if (name == null || name.trim().isEmpty()) {
            name = String.format("iot-import-%s", UUID.randomUUID().toString());
        }
        String destination = this.getParameterAsString(PARAMETER_DESTINATION);
        String aspectId = this.getParameterAsString(PARAMETER_ASPECT_NAMES);
        String assetId = this.getParameterAsString(PARAMETER_ASSET_IDS);
        int timeout2 = this.getParameterAsInt(PARAMETER_TIMEOUT);
        ZonedDateTime fromDateTime = this.getZonedDateTimeParameter(PARAMETER_FROM);
        ZonedDateTime toDateTime = this.getZonedDateTimeParameter(PARAMETER_TO);
        String fromIso = ParameterUtils.toIsoString(fromDateTime);
        String toIso = ParameterUtils.toIsoString(toDateTime);
        LocalDate fromLocalDate = fromDateTime != null ? fromDateTime.toLocalDate() : null;
        LocalDate localDate = toLocalDate = toDateTime != null ? toDateTime.toLocalDate() : null;
        if (fromLocalDate != null && toLocalDate != null && (fromLocalDate.isAfter(toLocalDate) || fromLocalDate.isEqual(toLocalDate))) {
            throw new UserError((Operator)this, "insights.invalid_date_range", new Object[]{fromLocalDate.toString(), toLocalDate.toString()});
        }
        try {
            List<DataLakeStreamFileObject> files;
            TimeSeriesClient client = this.createTimeSeriesClient();
            List<String> assetIds = Arrays.asList(assetId);
            List<String> aspectIds = Arrays.asList(aspectId);
            ImportJobResponse jobResponse = client.createImportJob(name, destination, null, aspectIds, assetIds, fromIso, toIso);
            String jobId = jobResponse.getId();
            log.info(() -> "Created import job with ID: " + jobId);
            ImportJobResponse.StatusEnum finalStatus = client.pollJobStatus(jobId, timeout2);
            if (finalStatus == ImportJobResponse.StatusEnum.SUCCESS) {
                DataLakeClient dataLakeClient = this.createDataLakeRestClient();
                files = dataLakeClient.discoverImportJobFiles(destination, assetId, aspectId, fromLocalDate, toLocalDate);
                if (files.isEmpty()) {
                    throw new UserError((Operator)this, "insights.no_files_found", new Object[]{jobId});
                }
            } else {
                if (finalStatus == ImportJobResponse.StatusEnum.FAILED) {
                    throw new UserError((Operator)this, "insights.import_job_failed", new Object[]{jobId});
                }
                throw new UserError((Operator)this, "insights.import_job_timeout", new Object[]{jobId, timeout2});
            }
            IOObjectCollection fileCollection = new IOObjectCollection(files);
            this.outputPort.deliver((IOObject)fileCollection);
            log.info(() -> String.format("Import job completed successfully. Found %d daily parquet files for job %s", files.size(), jobId));
        }
        catch (UserError ue) {
            throw ue;
        }
        catch (Exception e) {
            log.log(Level.WARNING, e, () -> "Failed to create import job: " + e.getMessage());
            throw this.mapException(e);
        }
    }
}

