/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.utils.persistence;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ClientHelper;

public class ResultsPersisterService {
    public static final Set<RestStatus> IRRECOVERABLE_REST_STATUSES = Collections.unmodifiableSet(new HashSet<RestStatus>(Arrays.asList(RestStatus.GONE, RestStatus.NOT_IMPLEMENTED, RestStatus.NOT_FOUND, RestStatus.BAD_REQUEST, RestStatus.UNAUTHORIZED, RestStatus.FORBIDDEN, RestStatus.METHOD_NOT_ALLOWED, RestStatus.NOT_ACCEPTABLE)));
    private static final Logger LOGGER = LogManager.getLogger(ResultsPersisterService.class);
    public static final Setting<Integer> PERSIST_RESULTS_MAX_RETRIES = Setting.intSetting((String)"xpack.ml.persist_results_max_retries", (int)20, (int)0, (int)50, (Setting.Property[])new Setting.Property[]{Setting.Property.Dynamic, Setting.Property.NodeScope});
    private static final int MAX_RETRY_SLEEP_MILLIS = (int)Duration.ofMinutes(15L).toMillis();
    private static final int MIN_RETRY_SLEEP_MILLIS = 50;
    private static final int MAX_RETRY_EXPONENT = 24;
    private final CheckedConsumer<Integer, InterruptedException> sleeper;
    private final OriginSettingClient client;
    private volatile int maxFailureRetries;

    public ResultsPersisterService(OriginSettingClient client, ClusterService clusterService, Settings settings) {
        this((CheckedConsumer<Integer, InterruptedException>)((CheckedConsumer)Thread::sleep), client, clusterService, settings);
    }

    ResultsPersisterService(CheckedConsumer<Integer, InterruptedException> sleeper, OriginSettingClient client, ClusterService clusterService, Settings settings) {
        this.sleeper = sleeper;
        this.client = client;
        this.maxFailureRetries = (Integer)PERSIST_RESULTS_MAX_RETRIES.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(PERSIST_RESULTS_MAX_RETRIES, this::setMaxFailureRetries);
    }

    void setMaxFailureRetries(int value) {
        this.maxFailureRetries = value;
    }

    public BulkResponse indexWithRetry(String jobId, String indexName, ToXContent object, ToXContent.Params params, WriteRequest.RefreshPolicy refreshPolicy, String id, boolean requireAlias, Supplier<Boolean> shouldRetry, Consumer<String> msgHandler) throws IOException {
        BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy);
        try (XContentBuilder content = object.toXContent(XContentFactory.jsonBuilder(), params);){
            bulkRequest.add(new IndexRequest(indexName).id(id).source(content).setRequireAlias(requireAlias));
        }
        return this.bulkIndexWithRetry(bulkRequest, jobId, shouldRetry, msgHandler);
    }

    public BulkResponse bulkIndexWithRetry(BulkRequest bulkRequest, String jobId, Supplier<Boolean> shouldRetry, Consumer<String> msgHandler) {
        return this.bulkIndexWithRetry(bulkRequest, jobId, shouldRetry, msgHandler, providedBulkRequest -> (BulkResponse)this.client.bulk(providedBulkRequest).actionGet());
    }

    public BulkResponse bulkIndexWithHeadersWithRetry(Map<String, String> headers, BulkRequest bulkRequest, String jobId, Supplier<Boolean> shouldRetry, Consumer<String> msgHandler) {
        return this.bulkIndexWithRetry(bulkRequest, jobId, shouldRetry, msgHandler, providedBulkRequest -> (BulkResponse)ClientHelper.executeWithHeaders((Map)headers, (String)"ml", (Client)this.client, () -> (BulkResponse)this.client.execute((ActionType)BulkAction.INSTANCE, (ActionRequest)bulkRequest).actionGet()));
    }

    private BulkResponse bulkIndexWithRetry(BulkRequest bulkRequest, String jobId, Supplier<Boolean> shouldRetry, Consumer<String> msgHandler, Function<BulkRequest, BulkResponse> actionExecutor) {
        RetryContext retryContext = new RetryContext(jobId, shouldRetry, msgHandler);
        BulkResponse bulkResponse;
        while ((bulkResponse = actionExecutor.apply(bulkRequest)).hasFailures()) {
            for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
                if (!itemResponse.isFailed() || !ResultsPersisterService.isIrrecoverable(itemResponse.getFailure().getCause())) continue;
                Throwable unwrappedParticular = ExceptionsHelper.unwrapCause((Throwable)itemResponse.getFailure().getCause());
                LOGGER.warn((Message)new ParameterizedMessage("[{}] experienced failure that cannot be automatically retried. Bulk failure message [{}]", (Object)jobId, (Object)bulkResponse.buildFailureMessage()), unwrappedParticular);
                throw new ElasticsearchStatusException("{} experienced failure that cannot be automatically retried. See logs for bulk failures", ExceptionsHelper.status((Throwable)unwrappedParticular), unwrappedParticular, new Object[]{jobId});
            }
            retryContext.nextIteration("index", bulkResponse.buildFailureMessage());
            bulkRequest = this.buildNewRequestFromFailures(bulkRequest, bulkResponse);
        }
        return bulkResponse;
    }

    public SearchResponse searchWithRetry(SearchRequest searchRequest, String jobId, Supplier<Boolean> shouldRetry, Consumer<String> msgHandler) {
        RetryContext retryContext = new RetryContext(jobId, shouldRetry, msgHandler);
        while (true) {
            String failureMessage;
            block4: {
                try {
                    SearchResponse searchResponse = (SearchResponse)this.client.search(searchRequest).actionGet();
                    if (RestStatus.OK.equals((Object)searchResponse.status())) {
                        return searchResponse;
                    }
                    failureMessage = searchResponse.status().toString();
                }
                catch (ElasticsearchException e) {
                    LOGGER.warn("[" + jobId + "] Exception while executing search action", (Throwable)e);
                    failureMessage = e.getDetailedMessage();
                    if (!ResultsPersisterService.isIrrecoverable((Exception)((Object)e))) break block4;
                    LOGGER.warn((Message)new ParameterizedMessage("[{}] experienced failure that cannot be automatically retried", (Object)jobId), (Throwable)e);
                    throw new ElasticsearchStatusException("{} experienced failure that cannot be automatically retried", ExceptionsHelper.status((Throwable)e), (Throwable)e, new Object[]{jobId});
                }
            }
            retryContext.nextIteration("search", failureMessage);
        }
    }

    private static boolean isIrrecoverable(Exception ex) {
        Throwable t = ExceptionsHelper.unwrapCause((Throwable)ex);
        return IRRECOVERABLE_REST_STATUSES.contains(ExceptionsHelper.status((Throwable)t));
    }

    private BulkRequest buildNewRequestFromFailures(BulkRequest bulkRequest, BulkResponse bulkResponse) {
        BulkRequest bulkRequestOfFailures = new BulkRequest();
        Set failedDocIds = Arrays.stream(bulkResponse.getItems()).filter(BulkItemResponse::isFailed).map(BulkItemResponse::getId).collect(Collectors.toSet());
        bulkRequest.requests().forEach(docWriteRequest -> {
            if (failedDocIds.contains(docWriteRequest.id())) {
                bulkRequestOfFailures.add(docWriteRequest);
            }
        });
        return bulkRequestOfFailures;
    }

    private class RetryContext {
        final String jobId;
        final Supplier<Boolean> shouldRetry;
        final Consumer<String> msgHandler;
        final Random random = Randomness.get();
        int currentAttempt = 0;
        int currentMin = 50;
        int currentMax = 50;

        RetryContext(String jobId, Supplier<Boolean> shouldRetry, Consumer<String> msgHandler) {
            this.jobId = jobId;
            this.shouldRetry = shouldRetry;
            this.msgHandler = msgHandler;
        }

        void nextIteration(String actionName, String failureMessage) {
            ++this.currentAttempt;
            if (!this.shouldRetry.get().booleanValue()) {
                String msg = new ParameterizedMessage("[{}] should not retry {} after [{}] attempts. {}", new Object[]{this.jobId, actionName, this.currentAttempt, failureMessage}).getFormattedMessage();
                LOGGER.info(msg);
                throw new ElasticsearchException(msg, new Object[0]);
            }
            if (this.currentAttempt > ResultsPersisterService.this.maxFailureRetries) {
                String msg = new ParameterizedMessage("[{}] failed to {} after [{}] attempts. {}", new Object[]{this.jobId, actionName, this.currentAttempt, failureMessage}).getFormattedMessage();
                LOGGER.warn(msg);
                throw new ElasticsearchException(msg, new Object[0]);
            }
            if (this.currentMax < MAX_RETRY_SLEEP_MILLIS) {
                this.currentMin = this.currentMax;
            }
            int uncappedBackoff = ((1 << Math.min(this.currentAttempt, 24)) - 1) * 50;
            this.currentMax = Math.min(uncappedBackoff, MAX_RETRY_SLEEP_MILLIS);
            int randBound = 1 + (this.currentMax - this.currentMin);
            int randSleep = this.currentMin + this.random.nextInt(randBound);
            String msg = new ParameterizedMessage("failed to {} after [{}] attempts. Will attempt again in [{}].", new Object[]{actionName, this.currentAttempt, TimeValue.timeValueMillis((long)randSleep).getStringRep()}).getFormattedMessage();
            LOGGER.warn(() -> new ParameterizedMessage("[{}] {}", (Object)this.jobId, (Object)msg));
            this.msgHandler.accept(msg);
            try {
                ResultsPersisterService.this.sleeper.accept((Object)randSleep);
            }
            catch (InterruptedException interruptedException) {
                LOGGER.warn((Message)new ParameterizedMessage("[{}] failed to {} after [{}] attempts due to interruption", new Object[]{this.jobId, actionName, this.currentAttempt}), (Throwable)interruptedException);
                Thread.currentThread().interrupt();
            }
        }
    }
}

