/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.snapshots;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexCommit;
import org.opensearch.Version;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.SnapshotsInProgress;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.index.snapshots.IndexShardSnapshotFailedException;
import org.opensearch.index.IndexService;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.snapshots.AbortedSnapshotException;
import org.opensearch.snapshots.Snapshot;
import org.opensearch.snapshots.UpdateIndexShardSnapshotStatusRequest;
import org.opensearch.snapshots.UpdateIndexShardSnapshotStatusResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestDeduplicator;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

public class SnapshotShardsService
extends AbstractLifecycleComponent
implements ClusterStateListener,
IndexEventListener {
    private static final Logger logger = LogManager.getLogger(SnapshotShardsService.class);
    private final ClusterService clusterService;
    private final IndicesService indicesService;
    private final RepositoriesService repositoriesService;
    private final TransportService transportService;
    private final ThreadPool threadPool;
    private final Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> shardSnapshots = new HashMap<Snapshot, Map<ShardId, IndexShardSnapshotStatus>>();
    private final TransportRequestDeduplicator<UpdateIndexShardSnapshotStatusRequest> remoteFailedRequestDeduplicator = new TransportRequestDeduplicator();

    public SnapshotShardsService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, TransportService transportService, IndicesService indicesService) {
        this.indicesService = indicesService;
        this.repositoriesService = repositoriesService;
        this.transportService = transportService;
        this.clusterService = clusterService;
        this.threadPool = transportService.getThreadPool();
        if (DiscoveryNode.isDataNode(settings)) {
            clusterService.addListener(this);
        }
    }

    protected void doStart() {
    }

    protected void doStop() {
    }

    protected void doClose() {
        this.clusterService.removeListener(this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void clusterChanged(ClusterChangedEvent event) {
        try {
            SnapshotsInProgress previousSnapshots = event.previousState().custom("snapshots", SnapshotsInProgress.EMPTY);
            SnapshotsInProgress currentSnapshots = event.state().custom("snapshots", SnapshotsInProgress.EMPTY);
            if (!previousSnapshots.equals(currentSnapshots)) {
                Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> map = this.shardSnapshots;
                synchronized (map) {
                    this.cancelRemoved(currentSnapshots);
                    this.startNewSnapshots(currentSnapshots);
                }
            }
            String previousClusterManagerNodeId = event.previousState().nodes().getClusterManagerNodeId();
            String currentMasterNodeId = event.state().nodes().getClusterManagerNodeId();
            if (currentMasterNodeId != null && !currentMasterNodeId.equals(previousClusterManagerNodeId)) {
                this.syncShardStatsOnNewMaster(event);
            }
        }
        catch (Exception e) {
            assert (false) : new AssertionError((Object)e);
            logger.warn("Failed to update snapshot state ", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
        Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> map = this.shardSnapshots;
        synchronized (map) {
            for (Map.Entry<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> snapshotShards : this.shardSnapshots.entrySet()) {
                Map<ShardId, IndexShardSnapshotStatus> shards = snapshotShards.getValue();
                if (!shards.containsKey(shardId)) continue;
                logger.debug("[{}] shard closing, abort snapshotting for snapshot [{}]", (Object)shardId, (Object)snapshotShards.getKey().getSnapshotId());
                shards.get(shardId).abortIfNotCompleted("shard is closing, aborting");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Map<ShardId, IndexShardSnapshotStatus> currentSnapshotShards(Snapshot snapshot) {
        Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> map = this.shardSnapshots;
        synchronized (map) {
            Map<ShardId, IndexShardSnapshotStatus> current = this.shardSnapshots.get(snapshot);
            return current == null ? null : new HashMap<ShardId, IndexShardSnapshotStatus>(current);
        }
    }

    private void cancelRemoved(SnapshotsInProgress snapshotsInProgress) {
        Iterator<Map.Entry<Snapshot, Map<ShardId, IndexShardSnapshotStatus>>> it = this.shardSnapshots.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> entry = it.next();
            Snapshot snapshot = entry.getKey();
            if (snapshotsInProgress.snapshot(snapshot) != null) continue;
            it.remove();
            for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().values()) {
                snapshotStatus.abortIfNotCompleted("snapshot has been removed in cluster state, aborting");
            }
        }
    }

    private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
        String localNodeId = this.clusterService.localNode().getId();
        for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
            SnapshotsInProgress.State entryState = entry.state();
            if (entry.isClone()) continue;
            if (entryState == SnapshotsInProgress.State.STARTED) {
                HashMap<ShardId, IndexShardSnapshotStatus> startedShards = null;
                Snapshot snapshot = entry.snapshot();
                Map snapshotShards = this.shardSnapshots.getOrDefault(snapshot, Collections.emptyMap());
                for (Map.Entry<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
                    ShardId shardId = shard.getKey();
                    SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = shard.getValue();
                    if (shardSnapshotStatus.state() != SnapshotsInProgress.ShardState.INIT || !localNodeId.equals(shardSnapshotStatus.nodeId()) || snapshotShards.containsKey(shardId)) continue;
                    logger.trace("[{}] - Adding shard to the queue", (Object)shardId);
                    if (startedShards == null) {
                        startedShards = new HashMap<ShardId, IndexShardSnapshotStatus>();
                    }
                    startedShards.put(shardId, IndexShardSnapshotStatus.newInitializing(shardSnapshotStatus.generation()));
                }
                if (startedShards == null || startedShards.isEmpty()) continue;
                this.shardSnapshots.computeIfAbsent(snapshot, s -> new HashMap()).putAll(startedShards);
                this.startNewShards(entry, (Map<ShardId, IndexShardSnapshotStatus>)startedShards);
                continue;
            }
            if (entryState != SnapshotsInProgress.State.ABORTED) continue;
            Snapshot snapshot = entry.snapshot();
            Map snapshotShards = this.shardSnapshots.getOrDefault(snapshot, Collections.emptyMap());
            for (Map.Entry<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
                IndexShardSnapshotStatus snapshotStatus = (IndexShardSnapshotStatus)snapshotShards.get(shard.getKey());
                if (snapshotStatus == null) {
                    if (shard.getValue().state() != SnapshotsInProgress.ShardState.ABORTED || !localNodeId.equals(shard.getValue().nodeId())) continue;
                    this.notifyFailedSnapshotShard(snapshot, shard.getKey(), shard.getValue().reason());
                    continue;
                }
                snapshotStatus.abortIfNotCompleted("snapshot has been aborted");
            }
        }
    }

    private void startNewShards(SnapshotsInProgress.Entry entry, Map<ShardId, IndexShardSnapshotStatus> startedShards) {
        this.threadPool.executor("snapshot").execute(() -> {
            final Snapshot snapshot = entry.snapshot();
            Map indicesMap = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()));
            for (Map.Entry shardEntry : startedShards.entrySet()) {
                final ShardId shardId = (ShardId)shardEntry.getKey();
                final IndexShardSnapshotStatus snapshotStatus = (IndexShardSnapshotStatus)shardEntry.getValue();
                IndexId indexId = (IndexId)indicesMap.get(shardId.getIndexName());
                assert (indexId != null);
                this.snapshot(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.version(), entry.remoteStoreIndexShallowCopy(), new ActionListener<String>(){

                    public void onResponse(String newGeneration) {
                        assert (newGeneration != null);
                        assert (newGeneration.equals(snapshotStatus.generation()));
                        if (logger.isDebugEnabled()) {
                            IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
                            logger.debug("snapshot [{}] completed to [{}] with [{}] at generation [{}]", (Object)snapshot, (Object)snapshot.getRepository(), (Object)lastSnapshotStatus, (Object)snapshotStatus.generation());
                        }
                        SnapshotShardsService.this.notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
                    }

                    public void onFailure(Exception e) {
                        String failure;
                        if (e instanceof AbortedSnapshotException) {
                            failure = "aborted";
                            logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", (Object)shardId, (Object)snapshot), (Throwable)e);
                        } else {
                            failure = SnapshotShardsService.summarizeFailure(e);
                            logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", (Object)shardId, (Object)snapshot), (Throwable)e);
                        }
                        snapshotStatus.moveToFailed(SnapshotShardsService.this.threadPool.absoluteTimeInMillis(), failure);
                        SnapshotShardsService.this.notifyFailedSnapshotShard(snapshot, shardId, failure);
                    }
                });
            }
        });
    }

    private boolean isRemoteSnapshot(ShardId shardId) {
        IndexShard shard;
        IndexService indexService = this.indicesService.indexService(shardId.getIndex());
        if (indexService != null && (shard = indexService.getShardOrNull(shardId.id())) != null) {
            return shard.isRemoteSnapshot();
        }
        return false;
    }

    static String summarizeFailure(Throwable t) {
        if (t.getCause() == null) {
            return t.getClass().getSimpleName() + "[" + t.getMessage() + "]";
        }
        StringBuilder sb = new StringBuilder();
        while (t != null) {
            sb.append(t.getClass().getSimpleName());
            if (t.getMessage() != null) {
                sb.append("[");
                sb.append(t.getMessage());
                sb.append("]");
            }
            if ((t = t.getCause()) == null) continue;
            sb.append("; nested: ");
        }
        return sb.toString();
    }

    private void snapshot(ShardId shardId, Snapshot snapshot, IndexId indexId, Map<String, Object> userMetadata, IndexShardSnapshotStatus snapshotStatus, Version version, boolean remoteStoreIndexShallowCopy, ActionListener<String> listener) {
        block16: {
            try {
                boolean closedIndex;
                IndexService indexService = this.indicesService.indexServiceSafe(shardId.getIndex());
                IndexShard indexShard = indexService.getShardOrNull(shardId.id());
                boolean bl = closedIndex = indexService.getMetadata().getState() == IndexMetadata.State.CLOSE;
                if (!indexShard.routingEntry().primary()) {
                    throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
                }
                if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() && !indexShard.isPrimaryMode()) {
                    throw new IndexShardSnapshotFailedException(shardId, "snapshot triggered on a new primary following failover and cannot proceed until promotion is complete");
                }
                if (indexShard.routingEntry().relocating()) {
                    throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating");
                }
                IndexShardState indexShardState = indexShard.state();
                if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) {
                    throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet");
                }
                Repository repository = this.repositoriesService.repository(snapshot.getRepository());
                GatedCloseable<IndexCommit> wrappedSnapshot = null;
                try {
                    if (remoteStoreIndexShallowCopy && indexShard.indexSettings().isRemoteStoreEnabled()) {
                        long startTime = this.threadPool.relativeTimeInMillis();
                        long primaryTerm = indexShard.getOperationPrimaryTerm();
                        long commitGeneration = 0L;
                        Map<String, Long> indexFilesToFileLengthMap = null;
                        IndexCommit snapshotIndexCommit = null;
                        try {
                            if (closedIndex) {
                                RemoteSegmentMetadata lastRemoteUploadedIndexCommit = indexShard.fetchLastRemoteUploadedSegmentMetadata();
                                indexFilesToFileLengthMap = lastRemoteUploadedIndexCommit.getMetadata().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> ((RemoteSegmentStoreDirectory.UploadedSegmentMetadata)entry.getValue()).getLength()));
                                primaryTerm = lastRemoteUploadedIndexCommit.getPrimaryTerm();
                                commitGeneration = lastRemoteUploadedIndexCommit.getGeneration();
                            } else {
                                wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true);
                                snapshotIndexCommit = wrappedSnapshot.get();
                                commitGeneration = snapshotIndexCommit.getGeneration();
                            }
                            indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
                        }
                        catch (IOException e) {
                            if (closedIndex) {
                                logger.warn("Exception while reading latest metadata file from remote store");
                                listener.onFailure((Exception)e);
                            }
                            wrappedSnapshot.close();
                            logger.warn("Exception while acquiring lock on primaryTerm = {} and generation = {}", (Object)primaryTerm, (Object)commitGeneration);
                            indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true));
                            wrappedSnapshot = indexShard.acquireLastIndexCommit(false);
                            snapshotIndexCommit = wrappedSnapshot.get();
                            commitGeneration = snapshotIndexCommit.getGeneration();
                            indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
                        }
                        try {
                            repository.snapshotRemoteStoreIndexShard(indexShard.store(), snapshot.getSnapshotId(), indexId, snapshotIndexCommit, null, snapshotStatus, primaryTerm, commitGeneration, startTime, indexFilesToFileLengthMap, closedIndex ? listener : ActionListener.runBefore(listener, wrappedSnapshot::close));
                        }
                        catch (IndexShardSnapshotFailedException e) {
                            logger.error("Shallow Copy Snapshot Failed for Shard [" + indexId.getName() + "][" + shardId.getId() + "] for snapshot " + String.valueOf(snapshot.getSnapshotId()) + ", releasing acquired lock from remote store");
                            indexShard.releaseLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
                            throw e;
                        }
                        long endTime = this.threadPool.relativeTimeInMillis();
                        logger.debug("Time taken (in milliseconds) to complete shallow copy snapshot, for index " + indexId.getName() + ", shard " + shardId.getId() + " and snapshot " + String.valueOf(snapshot.getSnapshotId()) + " is " + (endTime - startTime));
                        break block16;
                    }
                    wrappedSnapshot = indexShard.acquireLastIndexCommit(true);
                    IndexCommit snapshotIndexCommit = wrappedSnapshot.get();
                    repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId, wrappedSnapshot.get(), SnapshotShardsService.getShardStateId(indexShard, snapshotIndexCommit), snapshotStatus, version, userMetadata, (ActionListener<String>)ActionListener.runBefore(listener, wrappedSnapshot::close));
                }
                catch (Exception e) {
                    IOUtils.close(wrappedSnapshot);
                    throw e;
                }
            }
            catch (Exception e) {
                listener.onFailure(e);
            }
        }
    }

    @Nullable
    private static String getShardStateId(IndexShard indexShard, IndexCommit snapshotIndexCommit) throws IOException {
        Map userCommitData = snapshotIndexCommit.getUserData();
        SequenceNumbers.CommitInfo seqNumInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userCommitData.entrySet());
        long maxSeqNo = seqNumInfo.maxSeqNo;
        if (maxSeqNo != seqNumInfo.localCheckpoint || maxSeqNo != indexShard.getLastSyncedGlobalCheckpoint()) {
            return null;
        }
        return (String)userCommitData.get("history_uuid") + "-" + userCommitData.getOrDefault("force_merge_uuid", "na") + "-" + maxSeqNo;
    }

    private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
        SnapshotsInProgress snapshotsInProgress = (SnapshotsInProgress)event.state().custom("snapshots");
        if (snapshotsInProgress == null) {
            return;
        }
        this.remoteFailedRequestDeduplicator.clear();
        for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) {
            Map<ShardId, IndexShardSnapshotStatus> localShards;
            if (snapshot.state() != SnapshotsInProgress.State.STARTED && snapshot.state() != SnapshotsInProgress.State.ABORTED || (localShards = this.currentSnapshotShards(snapshot.snapshot())) == null) continue;
            Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> masterShards = snapshot.shards();
            for (Map.Entry<ShardId, IndexShardSnapshotStatus> localShard : localShards.entrySet()) {
                ShardId shardId = localShard.getKey();
                SnapshotsInProgress.ShardSnapshotStatus masterShard = masterShards.get(shardId);
                if (masterShard == null || masterShard.state().completed()) continue;
                IndexShardSnapshotStatus.Copy indexShardSnapshotStatus = localShard.getValue().asCopy();
                IndexShardSnapshotStatus.Stage stage = indexShardSnapshotStatus.getStage();
                if (stage == IndexShardSnapshotStatus.Stage.DONE) {
                    logger.debug("[{}] new cluster-manager thinks the shard [{}] is not completed but the shard is done locally, updating status on the master", (Object)snapshot.snapshot(), (Object)shardId);
                    this.notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localShard.getValue().generation());
                    continue;
                }
                if (stage != IndexShardSnapshotStatus.Stage.FAILURE) continue;
                logger.debug("[{}] new cluster-manager thinks the shard [{}] is not completed but the shard failed locally, updating status on master", (Object)snapshot.snapshot(), (Object)shardId);
                this.notifyFailedSnapshotShard(snapshot.snapshot(), shardId, indexShardSnapshotStatus.getFailure());
            }
        }
    }

    private void notifySuccessfulSnapshotShard(Snapshot snapshot, ShardId shardId, String generation) {
        assert (generation != null);
        this.sendSnapshotShardUpdate(snapshot, shardId, new SnapshotsInProgress.ShardSnapshotStatus(this.clusterService.localNode().getId(), SnapshotsInProgress.ShardState.SUCCESS, generation));
    }

    private void notifyFailedSnapshotShard(Snapshot snapshot, ShardId shardId, String failure) {
        this.sendSnapshotShardUpdate(snapshot, shardId, new SnapshotsInProgress.ShardSnapshotStatus(this.clusterService.localNode().getId(), SnapshotsInProgress.ShardState.FAILED, failure, null));
    }

    private void sendSnapshotShardUpdate(final Snapshot snapshot, ShardId shardId, final SnapshotsInProgress.ShardSnapshotStatus status) {
        this.remoteFailedRequestDeduplicator.executeOnce(new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status), new ActionListener<Void>(){

            public void onResponse(Void aVoid) {
                logger.trace("[{}] [{}] updated snapshot state", (Object)snapshot, (Object)status);
            }

            public void onFailure(Exception e) {
                logger.warn(() -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", (Object)snapshot, (Object)status), (Throwable)e);
            }
        }, (req, reqListener) -> this.transportService.sendRequest(this.transportService.getLocalNode(), "internal:cluster/snapshot/update_snapshot_status", (TransportRequest)req, new TransportResponseHandler<UpdateIndexShardSnapshotStatusResponse>(){

            public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) {
                return UpdateIndexShardSnapshotStatusResponse.INSTANCE;
            }

            @Override
            public void handleResponse(UpdateIndexShardSnapshotStatusResponse response) {
                reqListener.onResponse(null);
            }

            @Override
            public void handleException(TransportException exp) {
                reqListener.onFailure((Exception)((Object)exp));
            }

            @Override
            public String executor() {
                return "same";
            }
        }));
    }
}

