/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.dataframe.process;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults;
import org.elasticsearch.xpack.ml.utils.persistence.LimitAwareBulkIndexer;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;

class DataFrameRowsJoiner
implements AutoCloseable {
    private static final Logger LOGGER = LogManager.getLogger(DataFrameRowsJoiner.class);
    private static final int RESULTS_BATCH_SIZE = 1000;
    private final String analyticsId;
    private final Settings settings;
    private final TaskId parentTaskId;
    private final DataFrameDataExtractor dataExtractor;
    private final ResultsPersisterService resultsPersisterService;
    private final Iterator<DataFrameDataExtractor.Row> dataFrameRowsIterator;
    private LinkedList<RowResults> currentResults;
    private volatile String failure;
    private volatile boolean isCancelled;

    DataFrameRowsJoiner(String analyticsId, Settings settings, TaskId parentTaskId, DataFrameDataExtractor dataExtractor, ResultsPersisterService resultsPersisterService) {
        this.analyticsId = Objects.requireNonNull(analyticsId);
        this.settings = Objects.requireNonNull(settings);
        this.parentTaskId = Objects.requireNonNull(parentTaskId);
        this.dataExtractor = Objects.requireNonNull(dataExtractor);
        this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService);
        this.dataFrameRowsIterator = new ResultMatchingDataFrameRows();
        this.currentResults = new LinkedList();
    }

    @Nullable
    String getFailure() {
        return this.failure;
    }

    void processRowResults(RowResults rowResults) {
        if (this.failure != null) {
            return;
        }
        try {
            this.addResultAndJoinIfEndOfBatch(rowResults);
        }
        catch (Exception e) {
            LOGGER.error(() -> "[" + this.analyticsId + "] Failed to join results ", (Throwable)e);
            this.failure = "[" + this.analyticsId + "] Failed to join results: " + e.getMessage();
        }
    }

    void cancel() {
        this.isCancelled = true;
    }

    private void addResultAndJoinIfEndOfBatch(RowResults rowResults) {
        this.currentResults.add(rowResults);
        if (this.currentResults.size() == 1000) {
            this.joinCurrentResults();
        }
    }

    private void joinCurrentResults() {
        try (LimitAwareBulkIndexer bulkIndexer = new LimitAwareBulkIndexer(this.settings, this::executeBulkRequest);){
            while (!this.currentResults.isEmpty()) {
                RowResults result = this.currentResults.pop();
                DataFrameDataExtractor.Row row = this.dataFrameRowsIterator.next();
                this.checkChecksumsMatch(row, result);
                bulkIndexer.addAndExecuteIfNeeded(this.createIndexRequest(result, row.getHit()));
            }
        }
        this.currentResults = new LinkedList();
    }

    private void executeBulkRequest(BulkRequest bulkRequest) {
        bulkRequest.setParentTask(this.parentTaskId);
        this.resultsPersisterService.bulkIndexWithHeadersWithRetry(this.dataExtractor.getHeaders(), bulkRequest, this.analyticsId, () -> !this.isCancelled, retryMessage -> {});
    }

    private void checkChecksumsMatch(DataFrameDataExtractor.Row row, RowResults result) {
        if (row.getChecksum() != result.getChecksum()) {
            String msg = "Detected checksum mismatch for document with id [" + row.getHit().getId() + "]; ";
            msg = msg + "expected [" + row.getChecksum() + "] but result had [" + result.getChecksum() + "]; ";
            msg = msg + "this implies the data frame index [" + row.getHit().getIndex() + "] was modified while the analysis was running. ";
            msg = msg + "We rely on this index being immutable during a running analysis and so the results will be unreliable.";
            throw ExceptionsHelper.serverError((String)msg);
        }
    }

    private IndexRequest createIndexRequest(RowResults result, SearchHit hit) {
        LinkedHashMap<String, Object> source = new LinkedHashMap<String, Object>(hit.getSourceAsMap());
        source.putAll(result.getResults());
        IndexRequest indexRequest = new IndexRequest(hit.getIndex());
        indexRequest.id(hit.getId());
        indexRequest.source(source);
        indexRequest.opType(DocWriteRequest.OpType.INDEX);
        indexRequest.setParentTask(this.parentTaskId);
        return indexRequest;
    }

    @Override
    public void close() {
        try {
            this.joinCurrentResults();
        }
        catch (Exception e) {
            LOGGER.error(() -> "[" + this.analyticsId + "] Failed to join results", (Throwable)e);
            this.failure = "[" + this.analyticsId + "] Failed to join results: " + e.getMessage();
        }
        finally {
            try {
                this.consumeDataExtractor();
            }
            catch (Exception e) {
                LOGGER.error(() -> "[" + this.analyticsId + "] Failed to consume data extractor", (Throwable)e);
            }
        }
    }

    private void consumeDataExtractor() throws IOException {
        this.dataExtractor.cancel();
        while (this.dataExtractor.hasNext()) {
            this.dataExtractor.next();
        }
    }

    private class ResultMatchingDataFrameRows
    implements Iterator<DataFrameDataExtractor.Row> {
        private List<DataFrameDataExtractor.Row> currentDataFrameRows = Collections.emptyList();
        private int currentDataFrameRowsIndex;

        private ResultMatchingDataFrameRows() {
        }

        @Override
        public boolean hasNext() {
            return DataFrameRowsJoiner.this.dataExtractor.hasNext() || this.currentDataFrameRowsIndex < this.currentDataFrameRows.size();
        }

        @Override
        public DataFrameDataExtractor.Row next() {
            DataFrameDataExtractor.Row row = null;
            while (this.hasNoMatch(row) && this.hasNext()) {
                this.advanceToNextBatchIfNecessary();
                row = this.currentDataFrameRows.get(this.currentDataFrameRowsIndex++);
            }
            if (this.hasNoMatch(row)) {
                throw ExceptionsHelper.serverError((String)"no more data frame rows could be found while joining results");
            }
            return row;
        }

        private boolean hasNoMatch(DataFrameDataExtractor.Row row) {
            return row == null || row.shouldSkip() || !row.isTraining();
        }

        private void advanceToNextBatchIfNecessary() {
            if (this.currentDataFrameRowsIndex >= this.currentDataFrameRows.size()) {
                this.currentDataFrameRows = this.getNextDataRowsBatch().orElse(Collections.emptyList());
                this.currentDataFrameRowsIndex = 0;
            }
        }

        private Optional<List<DataFrameDataExtractor.Row>> getNextDataRowsBatch() {
            try {
                return DataFrameRowsJoiner.this.dataExtractor.next();
            }
            catch (IOException e) {
                throw ExceptionsHelper.serverError((String)("error reading next batch of data frame rows [" + e.getMessage() + "]"));
            }
        }
    }
}

