package com.starrocks.connector.flink.manager;

import com.starrocks.data.load.stream.StreamLoadStrategy;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/flink/manager/FlushAndCommitStrategy.class */
/**
 * Strategy that decides which {@link TableRegion}s should be flushed.
 *
 * <p>Two triggers are implemented by {@link #selectFlushRegions(Queue, long)}:
 * <ul>
 *   <li><b>age trigger</b> - when auto commit is enabled and a region's age exceeds
 *       {@code expectDelayTime / scanFrequency};</li>
 *   <li><b>force trigger</b> - when no region was chosen by age and the supplied byte count
 *       has reached {@code maxCacheBytes}, the region with the largest cache is chosen.</li>
 * </ul>
 *
 * <p>The number of times each trigger fired is tracked in a counter and exposed through
 * {@link #toString()}.
 */
public class FlushAndCommitStrategy implements StreamLoadStrategy {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(FlushAndCommitStrategy.class);

    private final long expectDelayTime;
    private final long scanFrequency;
    /** {@code expectDelayTime / scanFrequency} (integer division); a region older than this should commit. */
    private final long ageThreshold;
    private final long maxCacheBytes;
    private final boolean enableAutoCommit;
    /** Incremented once per region selected by the age trigger. */
    private final AtomicLong numAgeTriggerFlush = new AtomicLong(0);
    /** Incremented once per region selected by the force trigger. */
    private final AtomicLong numForceTriggerFlush = new AtomicLong(0);

    /**
     * Builds the strategy from the stream load configuration and logs the resulting settings.
     *
     * @param streamLoadProperties source of the expected delay time, scanning frequency and
     *                             maximum cache bytes
     * @param enableAutoCommit     whether the age trigger is active; when {@code false},
     *                             {@link #shouldCommit(TableRegion)} always returns {@code false}
     * @throws ArithmeticException if the configured scanning frequency is {@code 0}
     */
    public FlushAndCommitStrategy(StreamLoadProperties streamLoadProperties, boolean enableAutoCommit) {
        this.expectDelayTime = streamLoadProperties.getExpectDelayTime();
        this.scanFrequency = streamLoadProperties.getScanningFrequency();
        // Integer division: any remainder of expectDelayTime / scanFrequency is discarded.
        this.ageThreshold = this.expectDelayTime / this.scanFrequency;
        this.maxCacheBytes = streamLoadProperties.getMaxCacheBytes();
        this.enableAutoCommit = enableAutoCommit;
        LOG.info("{}", this);
    }

    /**
     * Not supported by this strategy; use {@link #selectFlushRegions(Queue, long)} instead.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // com.starrocks.data.load.stream.StreamLoadStrategy
    public List<TableRegion> select(Iterable<TableRegion> iterable) {
        throw new UnsupportedOperationException();
    }

    /**
     * Chooses the regions that should be flushed now.
     *
     * <p>Every region for which {@link #shouldCommit(TableRegion)} holds is selected. If none
     * qualifies and {@code cacheBytes >= maxCacheBytes}, the single region reporting the largest
     * {@code getCacheBytes()} is selected instead (nothing is selected if the queue is empty).
     *
     * @param queue      candidate regions; iterated but not modified by this method
     * @param cacheBytes byte count compared against {@code maxCacheBytes}
     *                   (presumably the total bytes currently cached - confirm against the caller)
     * @return a new mutable list of the selected regions, possibly empty, never {@code null}
     */
    public List<TableRegion> selectFlushRegions(Queue<TableRegion> queue, long cacheBytes) {
        List<TableRegion> selected = new ArrayList<>();
        for (TableRegion region : queue) {
            if (shouldCommit(region)) {
                this.numAgeTriggerFlush.getAndIncrement();
                selected.add(region);
                LOG.debug("Choose region {} to flush because the region should commit, age: {}, threshold: {}, scanFreq: {}, expectDelayTime: {}",
                        region.getUniqueKey(), region.getAge(), this.ageThreshold, this.scanFrequency, this.expectDelayTime);
            }
        }

        // Force flush only when the age trigger selected nothing and the cache limit is reached.
        if (selected.isEmpty() && cacheBytes >= this.maxCacheBytes) {
            Optional<TableRegion> largest = queue.stream()
                    .max(Comparator.comparingLong(TableRegion::getCacheBytes));
            largest.ifPresent(region -> {
                selected.add(region);
                this.numForceTriggerFlush.getAndIncrement();
                LOG.debug("Choose region {} to flush because it's force flush, age: {}, threshold: {}, scanFreq: {}, expectDelayTime: {}",
                        region.getUniqueKey(), region.getAge(), this.ageThreshold, this.scanFrequency, this.expectDelayTime);
            });
        }
        return selected;
    }

    /**
     * Tells whether a region has aged past the threshold.
     *
     * @param tableRegion region to inspect
     * @return {@code true} only if auto commit is enabled and the region's age is strictly
     *         greater than {@code expectDelayTime / scanFrequency}
     */
    public boolean shouldCommit(TableRegion tableRegion) {
        return this.enableAutoCommit && tableRegion.getAge() > this.ageThreshold;
    }

    /** Returns the configuration values and the current trigger counters in a single line. */
    @Override
    public String toString() {
        return "FlushAndCommitStrategy{expectDelayTime=" + this.expectDelayTime + ", scanFrequency=" + this.scanFrequency + ", ageThreshold=" + this.ageThreshold + ", maxCacheBytes=" + this.maxCacheBytes + ", enableAutoCommit=" + this.enableAutoCommit + ", numAgeTriggerFlush=" + this.numAgeTriggerFlush + ", numForceTriggerFlush=" + this.numForceTriggerFlush + '}';
    }
}
