/*
 * Decompiled with CFR 0.152.
 */
package org.logstash.execution;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.config.ir.CompiledPipeline;
import org.logstash.execution.QueueBatch;
import org.logstash.execution.QueueReadClient;

public final class WorkerLoop
implements Runnable {
    private static final Logger LOGGER = LogManager.getLogger(WorkerLoop.class);
    private final CompiledPipeline.CompiledExecution execution;
    private final QueueReadClient readClient;
    private final AtomicBoolean flushRequested;
    private final AtomicBoolean flushing;
    private final AtomicBoolean shutdownRequested;
    private final LongAdder consumedCounter;
    private final LongAdder filteredCounter;
    private final boolean drainQueue;
    private final boolean preserveEventOrder;

    public WorkerLoop(CompiledPipeline pipeline, QueueReadClient readClient, LongAdder filteredCounter, LongAdder consumedCounter, AtomicBoolean flushRequested, AtomicBoolean flushing, AtomicBoolean shutdownRequested, boolean drainQueue, boolean preserveEventOrder) {
        this.consumedCounter = consumedCounter;
        this.filteredCounter = filteredCounter;
        this.execution = pipeline.buildExecution(preserveEventOrder);
        this.drainQueue = drainQueue;
        this.readClient = readClient;
        this.flushRequested = flushRequested;
        this.flushing = flushing;
        this.shutdownRequested = shutdownRequested;
        this.preserveEventOrder = preserveEventOrder;
    }

    @Override
    public void run() {
        try {
            QueueBatch batch;
            boolean isShutdown = false;
            do {
                isShutdown = isShutdown || this.shutdownRequested.get();
                batch = this.readClient.readBatch();
                boolean isFlush = this.flushRequested.compareAndSet(true, false);
                if (batch.filteredSize() <= 0 && !isFlush) continue;
                this.consumedCounter.add(batch.filteredSize());
                this.readClient.startMetrics(batch);
                this.execution.compute(batch, isFlush, false);
                int filteredCount = batch.filteredSize();
                this.filteredCounter.add(filteredCount);
                this.readClient.addOutputMetrics(filteredCount);
                this.readClient.addFilteredMetrics(filteredCount);
                this.readClient.closeBatch(batch);
                if (!isFlush) continue;
                this.flushing.set(false);
            } while (!isShutdown || this.isDraining());
            batch = this.readClient.newBatch();
            this.readClient.startMetrics(batch);
            this.execution.compute(batch, true, true);
            this.readClient.closeBatch(batch);
        }
        catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
    }

    private boolean isDraining() {
        return this.drainQueue && !this.readClient.isEmpty();
    }
}

