/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.share;

import com.yammer.metrics.core.Meter;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import kafka.cluster.Partition;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.share.PendingRemoteFetches;
import kafka.server.share.ShareFetchUtils;
import kafka.server.share.SharePartition;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.fetch.ShareFetchPartitionData;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
import org.apache.kafka.storage.internals.log.LogReadResult;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;
import scala.runtime.BoxedUnit;

public class DelayedShareFetch
extends DelayedOperation {
    private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class);
    private static final String EXPIRES_PER_SEC = "ExpiresPerSec";
    private final ShareFetch shareFetch;
    private final ReplicaManager replicaManager;
    private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
    private final PartitionMaxBytesStrategy partitionMaxBytesStrategy;
    private final ShareGroupMetrics shareGroupMetrics;
    private final Time time;
    private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
    private final Meter expiredRequestMeter;
    private final Uuid fetchId;
    private long acquireStartTimeMs;
    private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired;
    private LinkedHashMap<TopicIdPartition, LogReadResult> localPartitionsAlreadyFetched;
    private Optional<PendingRemoteFetches> pendingRemoteFetchesOpt;
    private Optional<Exception> remoteStorageFetchException;
    private final AtomicBoolean outsidePurgatoryCallbackLock;
    private final long remoteFetchMaxWaitMs;

    public DelayedShareFetch(ShareFetch shareFetch, ReplicaManager replicaManager, BiConsumer<SharePartitionKey, Throwable> exceptionHandler, LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions, ShareGroupMetrics shareGroupMetrics, Time time, long remoteFetchMaxWaitMs) {
        this(shareFetch, replicaManager, exceptionHandler, sharePartitions, PartitionMaxBytesStrategy.type((PartitionMaxBytesStrategy.StrategyType)PartitionMaxBytesStrategy.StrategyType.UNIFORM), shareGroupMetrics, time, Optional.empty(), Uuid.randomUuid(), remoteFetchMaxWaitMs);
    }

    DelayedShareFetch(ShareFetch shareFetch, ReplicaManager replicaManager, BiConsumer<SharePartitionKey, Throwable> exceptionHandler, LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions, PartitionMaxBytesStrategy partitionMaxBytesStrategy, ShareGroupMetrics shareGroupMetrics, Time time, Optional<PendingRemoteFetches> pendingRemoteFetchesOpt, Uuid fetchId, long remoteFetchMaxWaitMs) {
        super(shareFetch.fetchParams().maxWaitMs);
        this.shareFetch = shareFetch;
        this.replicaManager = replicaManager;
        this.partitionsAcquired = new LinkedHashMap();
        this.localPartitionsAlreadyFetched = new LinkedHashMap();
        this.exceptionHandler = exceptionHandler;
        this.sharePartitions = sharePartitions;
        this.partitionMaxBytesStrategy = partitionMaxBytesStrategy;
        this.shareGroupMetrics = shareGroupMetrics;
        this.time = time;
        this.acquireStartTimeMs = time.hiResClockMs();
        this.pendingRemoteFetchesOpt = pendingRemoteFetchesOpt;
        this.remoteStorageFetchException = Optional.empty();
        this.fetchId = fetchId;
        this.outsidePurgatoryCallbackLock = new AtomicBoolean(false);
        this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
        KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedShareFetchMetrics");
        this.expiredRequestMeter = metricsGroup.newMeter(EXPIRES_PER_SEC, "requests", TimeUnit.SECONDS);
    }

    public void onExpiration() {
        this.expiredRequestMeter.mark();
    }

    public void onComplete() {
        log.trace("Completing the delayed share fetch request for group {}, member {}, topic partitions {}", new Object[]{this.shareFetch.groupId(), this.shareFetch.memberId(), this.partitionsAcquired.keySet()});
        if (this.remoteStorageFetchException.isPresent()) {
            this.completeErroneousRemoteShareFetchRequest();
        } else if (this.pendingRemoteFetchesOpt.isPresent()) {
            if (this.maybeRegisterCallbackPendingRemoteFetch()) {
                log.trace("Registered remote storage fetch callback for group {}, member {}, topic partitions {}", new Object[]{this.shareFetch.groupId(), this.shareFetch.memberId(), this.partitionsAcquired.keySet()});
                return;
            }
            this.completeRemoteStorageShareFetchRequest();
        } else {
            this.completeLocalLogShareFetchRequest();
        }
    }

    private void completeLocalLogShareFetchRequest() {
        LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
        if (this.partitionsAcquired.isEmpty()) {
            topicPartitionData = this.acquirablePartitions(this.sharePartitions);
            this.updateAcquireElapsedTimeMetric();
        } else {
            topicPartitionData = this.partitionsAcquired;
        }
        if (topicPartitionData.isEmpty()) {
            this.shareGroupMetrics.recordTopicPartitionsFetchRatio(this.shareFetch.groupId(), 0L);
            this.shareFetch.maybeComplete(Map.of());
            return;
        }
        double requestTopicToAcquired = (double)topicPartitionData.size() / (double)this.shareFetch.topicIdPartitions().size();
        this.shareGroupMetrics.recordTopicPartitionsFetchRatio(this.shareFetch.groupId(), (long)((int)(requestTopicToAcquired * 100.0)));
        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", new Object[]{topicPartitionData, this.shareFetch.groupId(), this.shareFetch.fetchParams()});
        this.processAcquiredTopicPartitionsForLocalLogFetch(topicPartitionData);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
        try {
            LinkedHashMap<TopicIdPartition, LogReadResult> responseData = this.localPartitionsAlreadyFetched.isEmpty() ? this.readFromLog(topicPartitionData, this.partitionMaxBytesStrategy.maxBytes(this.shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size())) : this.combineLogReadResponse(topicPartitionData, this.localPartitionsAlreadyFetched);
            this.resetFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData, responseData);
            ArrayList<ShareFetchPartitionData> shareFetchPartitionDataList = new ArrayList<ShareFetchPartitionData>();
            responseData.forEach((topicIdPartition, logReadResult) -> {
                if (logReadResult.info().delayedRemoteStorageFetch.isEmpty()) {
                    shareFetchPartitionDataList.add(new ShareFetchPartitionData(topicIdPartition, ((Long)topicPartitionData.get(topicIdPartition)).longValue(), logReadResult.toFetchPartitionData(false)));
                }
            });
            this.shareFetch.maybeComplete(ShareFetchUtils.processFetchResponse(this.shareFetch, shareFetchPartitionDataList, this.sharePartitions, this.replicaManager, this.exceptionHandler));
        }
        catch (Exception e) {
            log.error("Error processing delayed share fetch request", (Throwable)e);
            this.handleFetchException(this.shareFetch, topicPartitionData.keySet(), e);
        }
        finally {
            this.releasePartitionLocksAndAddToActionQueue(topicPartitionData.keySet());
        }
    }

    private void resetFetchOffsetMetadataForRemoteFetchPartitions(LinkedHashMap<TopicIdPartition, Long> topicPartitionData, LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse) {
        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> {
            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
                SharePartition sharePartition = this.sharePartitions.get(topicIdPartition);
                sharePartition.updateFetchOffsetMetadata((Long)topicPartitionData.get(topicIdPartition), null);
            }
        });
    }

    public boolean tryComplete() {
        if (this.pendingRemoteFetchesOpt.isPresent()) {
            return this.maybeCompletePendingRemoteFetch();
        }
        LinkedHashMap<TopicIdPartition, Long> topicPartitionData = this.acquirablePartitions(this.sharePartitions);
        try {
            if (!topicPartitionData.isEmpty()) {
                this.updateAcquireElapsedTimeMetric();
                LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = this.maybeReadFromLog(topicPartitionData);
                LinkedHashMap<TopicIdPartition, LogReadResult> remoteStorageFetchInfoMap = this.maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);
                if (!remoteStorageFetchInfoMap.isEmpty()) {
                    return this.maybeProcessRemoteFetch(topicPartitionData, remoteStorageFetchInfoMap);
                }
                this.maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse);
                if (this.anyPartitionHasLogReadError(replicaManagerReadResponse) || this.isMinBytesSatisfied(topicPartitionData, this.partitionMaxBytesStrategy.maxBytes(this.shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) {
                    this.partitionsAcquired = topicPartitionData;
                    this.localPartitionsAlreadyFetched = replicaManagerReadResponse;
                    return this.forceComplete();
                }
                log.debug("minBytes is not satisfied for the share fetch request for group {}, member {}, topic partitions {}", new Object[]{this.shareFetch.groupId(), this.shareFetch.memberId(), this.sharePartitions.keySet()});
                this.releasePartitionLocks(topicPartitionData.keySet());
            } else {
                log.trace("Can't acquire any partitions in the share fetch request for group {}, member {}, topic partitions {}", new Object[]{this.shareFetch.groupId(), this.shareFetch.memberId(), this.sharePartitions.keySet()});
            }
            return false;
        }
        catch (Exception e) {
            log.error("Error processing delayed share fetch request", (Throwable)e);
            if (this.remoteStorageFetchException.isEmpty()) {
                this.releasePartitionLocks(topicPartitionData.keySet());
                this.partitionsAcquired.clear();
                this.localPartitionsAlreadyFetched.clear();
            }
            return this.forceComplete();
        }
    }

    LinkedHashMap<TopicIdPartition, Long> acquirablePartitions(LinkedHashMap<TopicIdPartition, SharePartition> sharePartitionsForAcquire) {
        LinkedHashMap<TopicIdPartition, Long> topicPartitionData = new LinkedHashMap<TopicIdPartition, Long>();
        sharePartitionsForAcquire.forEach((topicIdPartition, sharePartition) -> {
            if (sharePartition.maybeAcquireFetchLock(this.fetchId)) {
                try {
                    log.trace("Fetch lock for share partition {}-{} has been acquired by {}", new Object[]{this.shareFetch.groupId(), topicIdPartition, this.fetchId});
                    if (sharePartition.canAcquireRecords()) {
                        topicPartitionData.put((TopicIdPartition)topicIdPartition, sharePartition.nextFetchOffset());
                    } else {
                        sharePartition.releaseFetchLock(this.fetchId);
                        log.trace("Record lock partition limit exceeded for SharePartition {}-{}, cannot acquire more records. Releasing the fetch lock by {}", new Object[]{this.shareFetch.groupId(), topicIdPartition, this.fetchId});
                    }
                }
                catch (Exception e) {
                    log.error("Error checking condition for SharePartition: {}-{}", new Object[]{this.shareFetch.groupId(), topicIdPartition, e});
                    sharePartition.releaseFetchLock(this.fetchId);
                    log.trace("Fetch lock for share partition {}-{} is being released by {}", new Object[]{this.shareFetch.groupId(), topicIdPartition, this.fetchId});
                }
            }
        });
        return topicPartitionData;
    }

    private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
        LinkedHashMap<TopicIdPartition, Long> partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<TopicIdPartition, Long>();
        topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
            SharePartition sharePartition = this.sharePartitions.get(topicIdPartition);
            if (sharePartition.fetchOffsetMetadata((long)fetchOffset).isEmpty()) {
                partitionsNotMatchingFetchOffsetMetadata.put((TopicIdPartition)topicIdPartition, (Long)fetchOffset);
            }
        });
        if (partitionsNotMatchingFetchOffsetMetadata.isEmpty()) {
            return new LinkedHashMap<TopicIdPartition, LogReadResult>();
        }
        return this.readFromLog(partitionsNotMatchingFetchOffsetMetadata, this.partitionMaxBytesStrategy.maxBytes(this.shareFetch.fetchParams().maxBytes, partitionsNotMatchingFetchOffsetMetadata.keySet(), topicPartitionData.size()));
    }

    private void maybeUpdateFetchOffsetMetadata(LinkedHashMap<TopicIdPartition, Long> topicPartitionData, LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
            TopicIdPartition topicIdPartition = entry.getKey();
            SharePartition sharePartition = this.sharePartitions.get(topicIdPartition);
            LogReadResult replicaManagerLogReadResult = entry.getValue();
            if (replicaManagerLogReadResult.error().code() != Errors.NONE.code()) {
                log.debug("Replica manager read log result {} errored out for topic partition {}", (Object)replicaManagerLogReadResult, (Object)topicIdPartition);
                continue;
            }
            sharePartition.updateFetchOffsetMetadata(topicPartitionData.get(topicIdPartition), replicaManagerLogReadResult.info().fetchOffsetMetadata);
        }
    }

    private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, Long> topicPartitionData, LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
        long accumulatedSize = 0L;
        for (Map.Entry<TopicIdPartition, Long> entry : topicPartitionData.entrySet()) {
            SharePartition sharePartition;
            Optional<LogOffsetMetadata> optionalFetchOffsetMetadata;
            LogOffsetMetadata endOffsetMetadata;
            TopicIdPartition topicIdPartition = entry.getKey();
            long fetchOffset = entry.getValue();
            try {
                endOffsetMetadata = this.endOffsetMetadataForTopicPartition(topicIdPartition);
            }
            catch (Exception e) {
                this.shareFetch.addErroneous(topicIdPartition, (Throwable)e);
                this.exceptionHandler.accept(new SharePartitionKey(this.shareFetch.groupId(), topicIdPartition), e);
                continue;
            }
            if (endOffsetMetadata == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA || (optionalFetchOffsetMetadata = (sharePartition = this.sharePartitions.get(topicIdPartition)).fetchOffsetMetadata(fetchOffset)).isEmpty() || optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) continue;
            LogOffsetMetadata fetchOffsetMetadata = optionalFetchOffsetMetadata.get();
            if (fetchOffsetMetadata.messageOffset > endOffsetMetadata.messageOffset) {
                log.debug("Satisfying delayed share fetch request for group {}, member {} since it is fetching later segments of topicIdPartition {}", new Object[]{this.shareFetch.groupId(), this.shareFetch.memberId(), topicIdPartition});
                return true;
            }
            if (fetchOffsetMetadata.messageOffset >= endOffsetMetadata.messageOffset) continue;
            if (fetchOffsetMetadata.onOlderSegment(endOffsetMetadata)) {
                log.debug("Satisfying delayed share fetch request for group {}, member {} immediately since it is fetching older segments of topicIdPartition {}", new Object[]{this.shareFetch.groupId(), this.shareFetch.memberId(), topicIdPartition});
                return true;
            }
            if (!fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) continue;
            long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionMaxBytes.get(topicIdPartition));
            accumulatedSize += bytesAvailable;
        }
        return accumulatedSize >= (long)this.shareFetch.fetchParams().minBytes;
    }

    private LogOffsetMetadata endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
        Partition partition = ShareFetchUtils.partition(this.replicaManager, topicIdPartition.topicPartition());
        LogOffsetSnapshot offsetSnapshot = partition.fetchOffsetSnapshot(Optional.empty(), true);
        FetchIsolation isolationType = this.shareFetch.fetchParams().isolation;
        if (isolationType == FetchIsolation.LOG_END) {
            return offsetSnapshot.logEndOffset();
        }
        if (isolationType == FetchIsolation.HIGH_WATERMARK) {
            return offsetSnapshot.highWatermark();
        }
        return offsetSnapshot.lastStableOffset();
    }

    private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, Long> topicPartitionFetchOffsets, LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
        Set partitionsToFetch = this.shareFetch.filterErroneousTopicPartitions(topicPartitionFetchOffsets.keySet());
        if (partitionsToFetch.isEmpty()) {
            return new LinkedHashMap<TopicIdPartition, LogReadResult>();
        }
        LinkedHashMap topicPartitionData = new LinkedHashMap();
        topicPartitionFetchOffsets.forEach((topicIdPartition, fetchOffset) -> topicPartitionData.put(topicIdPartition, new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset.longValue(), 0L, ((Integer)partitionMaxBytes.get(topicIdPartition)).intValue(), Optional.empty())));
        Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = this.replicaManager.readFromLog(this.shareFetch.fetchParams(), (Seq<Tuple2<TopicIdPartition, FetchRequest.PartitionData>>)CollectionConverters.asScala(partitionsToFetch.stream().map(topicIdPartition -> new Tuple2(topicIdPartition, (Object)((FetchRequest.PartitionData)topicPartitionData.get(topicIdPartition)))).collect(Collectors.toList())), QuotaFactory.UNBOUNDED_QUOTA, true);
        LinkedHashMap<TopicIdPartition, LogReadResult> responseData = new LinkedHashMap<TopicIdPartition, LogReadResult>();
        responseLogResult.foreach(tpLogResult -> {
            responseData.put((TopicIdPartition)tpLogResult._1(), (LogReadResult)tpLogResult._2());
            return BoxedUnit.UNIT;
        });
        log.trace("Data successfully retrieved by replica manager: {}", responseData);
        return responseData;
    }

    private boolean anyPartitionHasLogReadError(LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse) {
        return replicaManagerReadResponse.values().stream().anyMatch(logReadResult -> logReadResult.error().code() != Errors.NONE.code());
    }

    private void handleFetchException(ShareFetch shareFetch, Set<TopicIdPartition> topicIdPartitions, Throwable throwable) {
        topicIdPartitions.forEach(topicIdPartition -> this.exceptionHandler.accept(new SharePartitionKey(shareFetch.groupId(), topicIdPartition), throwable));
        shareFetch.maybeCompleteWithException(topicIdPartitions, throwable);
    }

    private void updateAcquireElapsedTimeMetric() {
        long currentTimeMs = this.time.hiResClockMs();
        this.shareGroupMetrics.recordTopicPartitionsAcquireTimeMs(this.shareFetch.groupId(), currentTimeMs - this.acquireStartTimeMs);
        this.acquireStartTimeMs = currentTimeMs;
    }

    LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData, LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
        LinkedHashMap<TopicIdPartition, Long> missingLogReadTopicPartitions = new LinkedHashMap<TopicIdPartition, Long>();
        topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
            if (!existingFetchedData.containsKey(topicIdPartition)) {
                missingLogReadTopicPartitions.put((TopicIdPartition)topicIdPartition, (Long)fetchOffset);
            }
        });
        if (missingLogReadTopicPartitions.isEmpty()) {
            return existingFetchedData;
        }
        LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = this.readFromLog(missingLogReadTopicPartitions, this.partitionMaxBytesStrategy.maxBytes(this.shareFetch.fetchParams().maxBytes, missingLogReadTopicPartitions.keySet(), topicPartitionData.size()));
        missingTopicPartitionsLogReadResponse.putAll(existingFetchedData);
        return missingTopicPartitionsLogReadResponse;
    }

    void releasePartitionLocks(Set<TopicIdPartition> topicIdPartitions) {
        topicIdPartitions.forEach(tp -> {
            SharePartition sharePartition = this.sharePartitions.get(tp);
            sharePartition.releaseFetchLock(this.fetchId);
            log.trace("Fetch lock for share partition {}-{} is being released by {}", new Object[]{this.shareFetch.groupId(), tp, this.fetchId});
        });
    }

    Lock lock() {
        return this.lock;
    }

    PendingRemoteFetches pendingRemoteFetches() {
        return this.pendingRemoteFetchesOpt.orElse(null);
    }

    boolean outsidePurgatoryCallbackLock() {
        return this.outsidePurgatoryCallbackLock.get();
    }

    void updatePartitionsAcquired(LinkedHashMap<TopicIdPartition, Long> partitionsAcquired) {
        this.partitionsAcquired = partitionsAcquired;
    }

    Meter expiredRequestMeter() {
        return this.expiredRequestMeter;
    }

    private LinkedHashMap<TopicIdPartition, LogReadResult> maybePrepareRemoteStorageFetchInfo(LinkedHashMap<TopicIdPartition, Long> topicPartitionData, LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse) {
        LinkedHashMap<TopicIdPartition, LogReadResult> remoteStorageFetchInfoMap = new LinkedHashMap<TopicIdPartition, LogReadResult>();
        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
            TopicIdPartition topicIdPartition = entry.getKey();
            LogReadResult logReadResult = entry.getValue();
            if (!logReadResult.info().delayedRemoteStorageFetch.isPresent()) continue;
            remoteStorageFetchInfoMap.put(topicIdPartition, logReadResult);
            this.partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
        }
        return remoteStorageFetchInfoMap;
    }

    private boolean maybeProcessRemoteFetch(LinkedHashMap<TopicIdPartition, Long> topicPartitionData, LinkedHashMap<TopicIdPartition, LogReadResult> remoteStorageFetchInfoMap) {
        LinkedHashSet<TopicIdPartition> nonRemoteFetchTopicPartitions = new LinkedHashSet<TopicIdPartition>();
        topicPartitionData.keySet().forEach(topicIdPartition -> {
            if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
                nonRemoteFetchTopicPartitions.add((TopicIdPartition)topicIdPartition);
            }
        });
        this.releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions);
        this.processRemoteFetchOrException(remoteStorageFetchInfoMap);
        return this.maybeCompletePendingRemoteFetch();
    }

    private boolean maybeRegisterCallbackPendingRemoteFetch() {
        log.trace("Registering callback pending remote fetch");
        PendingRemoteFetches pendingFetch = this.pendingRemoteFetchesOpt.get();
        if (!pendingFetch.isDone() && this.shareFetch.fetchParams().maxWaitMs < this.remoteFetchMaxWaitMs) {
            PendingRemoteFetchTimerTask timerTask = new PendingRemoteFetchTimerTask();
            pendingFetch.invokeCallbackOnCompletion((ignored, throwable) -> {
                timerTask.cancel();
                log.trace("Invoked remote storage fetch callback for group {}, member {}, topic partitions {}", new Object[]{this.shareFetch.groupId(), this.shareFetch.memberId(), this.partitionsAcquired.keySet()});
                if (throwable != null) {
                    log.error("Remote storage fetch failed for group {}, member {}, topic partitions {}", new Object[]{this.shareFetch.groupId(), this.shareFetch.memberId(), this.sharePartitions.keySet(), throwable});
                }
                this.completeRemoteShareFetchRequestOutsidePurgatory();
            });
            this.replicaManager.addShareFetchTimerRequest(timerTask);
            return true;
        }
        return false;
    }

    private void processRemoteFetchOrException(LinkedHashMap<TopicIdPartition, LogReadResult> remoteStorageFetchInfoMap) {
        LinkedHashMap<TopicIdPartition, LogOffsetMetadata> fetchOffsetMetadataMap = new LinkedHashMap<TopicIdPartition, LogOffsetMetadata>();
        remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> fetchOffsetMetadataMap.put((TopicIdPartition)topicIdPartition, logReadResult.info().fetchOffsetMetadata));
        ArrayList<PendingRemoteFetches.RemoteFetch> remoteFetches = new ArrayList<PendingRemoteFetches.RemoteFetch>();
        for (Map.Entry<TopicIdPartition, LogReadResult> entry : remoteStorageFetchInfoMap.entrySet()) {
            Future remoteFetchTask;
            TopicIdPartition remoteFetchTopicIdPartition = entry.getKey();
            RemoteStorageFetchInfo remoteStorageFetchInfo = (RemoteStorageFetchInfo)entry.getValue().info().delayedRemoteStorageFetch.get();
            CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<RemoteLogReadResult>();
            try {
                remoteFetchTask = ((RemoteLogManager)this.replicaManager.remoteLogManager().get()).asyncRead(remoteStorageFetchInfo, result -> {
                    remoteFetchResult.complete((RemoteLogReadResult)result);
                    this.replicaManager.completeDelayedShareFetchRequest((DelayedShareFetchKey)new DelayedShareFetchGroupKey(this.shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
                });
            }
            catch (Exception e) {
                remoteFetches.forEach(this::cancelRemoteFetchTask);
                this.remoteStorageFetchException = Optional.of(e);
                throw e;
            }
            remoteFetches.add(new PendingRemoteFetches.RemoteFetch(remoteFetchTopicIdPartition, entry.getValue(), remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
        }
        this.pendingRemoteFetchesOpt = Optional.of(new PendingRemoteFetches(remoteFetches, fetchOffsetMetadataMap));
    }

    private boolean maybeCompletePendingRemoteFetch() {
        boolean canComplete = false;
        for (TopicIdPartition topicIdPartition : this.pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) {
            try {
                Partition partition = this.replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
                if (!partition.isLeader()) {
                    throw new NotLeaderException("Broker is no longer the leader of topicPartition: " + String.valueOf(topicIdPartition));
                }
            }
            catch (KafkaStorageException e) {
                log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", (Object)topicIdPartition, (Object)this.shareFetch.fetchParams());
                canComplete = true;
            }
            catch (UnknownTopicOrPartitionException e) {
                log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", (Object)topicIdPartition, (Object)this.shareFetch.fetchParams());
                canComplete = true;
            }
            catch (NotLeaderException e) {
                log.debug("Broker is no longer the leader of topicPartition {}, satisfy {} immediately", (Object)topicIdPartition, (Object)this.shareFetch.fetchParams());
                canComplete = true;
            }
            catch (NotLeaderOrFollowerException e) {
                log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", (Object)topicIdPartition, (Object)this.shareFetch.fetchParams());
                canComplete = true;
            }
            if (!canComplete) continue;
            break;
        }
        if (canComplete || this.pendingRemoteFetchesOpt.get().isDone()) {
            return this.forceComplete();
        }
        return false;
    }

    private void completeErroneousRemoteShareFetchRequest() {
        try {
            this.handleFetchException(this.shareFetch, this.partitionsAcquired.keySet(), this.remoteStorageFetchException.get());
        }
        finally {
            this.releasePartitionLocksAndAddToActionQueue(this.partitionsAcquired.keySet());
        }
    }

    private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topicIdPartitions) {
        if (topicIdPartitions.isEmpty()) {
            return;
        }
        this.releasePartitionLocks(topicIdPartitions);
        this.replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {
            this.replicaManager.completeDelayedShareFetchRequest((DelayedShareFetchKey)new DelayedShareFetchGroupKey(this.shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
            this.replicaManager.completeDelayedShareFetchRequest((DelayedShareFetchKey)new DelayedShareFetchPartitionKey(topicIdPartition));
        }));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void completeRemoteStorageShareFetchRequest() {
        LinkedHashMap<Object, Object> acquiredNonRemoteFetchTopicPartitionData = new LinkedHashMap();
        try {
            double acquiredRatio;
            ArrayList<ShareFetchPartitionData> shareFetchPartitionData = new ArrayList<ShareFetchPartitionData>();
            int readableBytes = 0;
            for (PendingRemoteFetches.RemoteFetch remoteFetch : this.pendingRemoteFetchesOpt.get().remoteFetches()) {
                if (remoteFetch.remoteFetchResult().isDone()) {
                    RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
                    if (remoteLogReadResult.error().isPresent()) {
                        shareFetchPartitionData.add(new ShareFetchPartitionData(remoteFetch.topicIdPartition(), this.partitionsAcquired.get(remoteFetch.topicIdPartition()).longValue(), new LogReadResult(Errors.forException((Throwable)((Throwable)remoteLogReadResult.error().get()))).toFetchPartitionData(false)));
                        continue;
                    }
                    FetchDataInfo fetchDataInfo = (FetchDataInfo)remoteLogReadResult.fetchDataInfo().get();
                    TopicIdPartition topicIdPartition2 = remoteFetch.topicIdPartition();
                    LogReadResult logReadResult = remoteFetch.logReadResult();
                    shareFetchPartitionData.add(new ShareFetchPartitionData(topicIdPartition2, this.partitionsAcquired.get(remoteFetch.topicIdPartition()).longValue(), new FetchPartitionData(logReadResult.error(), logReadResult.highWatermark(), logReadResult.leaderLogStartOffset(), fetchDataInfo.records, Optional.empty(), logReadResult.lastStableOffset().isPresent() ? OptionalLong.of(logReadResult.lastStableOffset().getAsLong()) : OptionalLong.empty(), fetchDataInfo.abortedTransactions, logReadResult.preferredReadReplica().isPresent() ? OptionalInt.of(logReadResult.preferredReadReplica().getAsInt()) : OptionalInt.empty(), false)));
                    readableBytes += fetchDataInfo.records.sizeInBytes();
                    continue;
                }
                this.cancelRemoteFetchTask(remoteFetch);
            }
            if (readableBytes < this.shareFetch.fetchParams().maxBytes) {
                LinkedHashMap<TopicIdPartition, SharePartition> nonRemoteFetchSharePartitions = new LinkedHashMap<TopicIdPartition, SharePartition>();
                this.sharePartitions.forEach((topicIdPartition, sharePartition) -> {
                    if (!this.partitionsAcquired.containsKey(topicIdPartition)) {
                        nonRemoteFetchSharePartitions.put((TopicIdPartition)topicIdPartition, (SharePartition)sharePartition);
                    }
                });
                acquiredNonRemoteFetchTopicPartitionData = this.acquirablePartitions(nonRemoteFetchSharePartitions);
                if (!acquiredNonRemoteFetchTopicPartitionData.isEmpty()) {
                    log.trace("Fetchable local share partitions for a remote share fetch request data: {} with groupId: {} fetch params: {}", new Object[]{acquiredNonRemoteFetchTopicPartitionData, this.shareFetch.groupId(), this.shareFetch.fetchParams()});
                    LinkedHashMap<TopicIdPartition, LogReadResult> responseData = this.readFromLog(acquiredNonRemoteFetchTopicPartitionData, this.partitionMaxBytesStrategy.maxBytes(this.shareFetch.fetchParams().maxBytes - readableBytes, acquiredNonRemoteFetchTopicPartitionData.keySet(), acquiredNonRemoteFetchTopicPartitionData.size()));
                    this.resetFetchOffsetMetadataForRemoteFetchPartitions(acquiredNonRemoteFetchTopicPartitionData, responseData);
                    for (Map.Entry entry : responseData.entrySet()) {
                        if (!((LogReadResult)entry.getValue()).info().delayedRemoteStorageFetch.isEmpty()) continue;
                        shareFetchPartitionData.add(new ShareFetchPartitionData((TopicIdPartition)entry.getKey(), ((Long)acquiredNonRemoteFetchTopicPartitionData.get(entry.getKey())).longValue(), ((LogReadResult)entry.getValue()).toFetchPartitionData(false)));
                    }
                }
            }
            if ((acquiredRatio = (double)(this.partitionsAcquired.size() + acquiredNonRemoteFetchTopicPartitionData.size()) / (double)this.shareFetch.topicIdPartitions().size()) > 0.0) {
                this.shareGroupMetrics.recordTopicPartitionsFetchRatio(this.shareFetch.groupId(), (long)((int)(acquiredRatio * 100.0)));
            }
            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> remoteFetchResponse = ShareFetchUtils.processFetchResponse(this.shareFetch, shareFetchPartitionData, this.sharePartitions, this.replicaManager, this.exceptionHandler);
            this.shareFetch.maybeComplete(remoteFetchResponse);
            log.trace("Remote share fetch request completed successfully, response: {}", remoteFetchResponse);
        }
        catch (InterruptedException | ExecutionException e) {
            log.error("Exception occurred in completing remote fetch {} for delayed share fetch request {}", (Object)this.pendingRemoteFetchesOpt.get(), (Object)e);
            this.handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
        }
        catch (Exception e) {
            log.error("Unexpected error in processing delayed share fetch request", (Throwable)e);
            this.handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
        }
        finally {
            LinkedHashSet<TopicIdPartition> topicIdPartitions = new LinkedHashSet<TopicIdPartition>(this.partitionsAcquired.keySet());
            topicIdPartitions.addAll(acquiredNonRemoteFetchTopicPartitionData.keySet());
            this.releasePartitionLocksAndAddToActionQueue(topicIdPartitions);
        }
    }

    private void handleExceptionInCompletingRemoteStorageShareFetchRequest(Set<TopicIdPartition> acquiredNonRemoteFetchTopicPartitions, Exception e) {
        LinkedHashSet<TopicIdPartition> topicIdPartitions = new LinkedHashSet<TopicIdPartition>(this.partitionsAcquired.keySet());
        topicIdPartitions.addAll(acquiredNonRemoteFetchTopicPartitions);
        this.handleFetchException(this.shareFetch, topicIdPartitions, e);
    }

    private void cancelRemoteFetchTask(PendingRemoteFetches.RemoteFetch remoteFetch) {
        boolean cancelled = remoteFetch.remoteFetchTask().cancel(false);
        if (!cancelled) {
            log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}", (Object)remoteFetch.remoteFetchInfo(), (Object)remoteFetch.remoteFetchTask().isDone());
        }
    }

    private void completeRemoteShareFetchRequestOutsidePurgatory() {
        if (this.outsidePurgatoryCallbackLock.compareAndSet(false, true)) {
            this.completeRemoteStorageShareFetchRequest();
        }
    }

    private class PendingRemoteFetchTimerTask
    extends TimerTask {
        public PendingRemoteFetchTimerTask() {
            super(DelayedShareFetch.this.remoteFetchMaxWaitMs - DelayedShareFetch.this.shareFetch.fetchParams().maxWaitMs);
        }

        public void run() {
            log.trace("Expired remote storage fetch callback for group {}, member {}, topic partitions {}", new Object[]{DelayedShareFetch.this.shareFetch.groupId(), DelayedShareFetch.this.shareFetch.memberId(), DelayedShareFetch.this.partitionsAcquired.keySet()});
            DelayedShareFetch.this.expiredRequestMeter.mark();
            DelayedShareFetch.this.completeRemoteShareFetchRequestOutsidePurgatory();
        }
    }
}

