/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode.sps;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class BlockStorageMovementNeeded {
    public static final Logger LOG = LoggerFactory.getLogger(BlockStorageMovementNeeded.class);
    private final Queue<ItemInfo> storageMovementNeeded = new LinkedList<ItemInfo>();
    private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory = new HashMap<Long, DirPendingWorkInfo>();
    private final Context ctxt;
    private Daemon pathIdCollector;
    private SPSPathIdProcessor pathIDProcessor;
    private static long statusClearanceElapsedTimeMs = 300000L;

    public BlockStorageMovementNeeded(Context context) {
        this.ctxt = context;
        this.pathIDProcessor = new SPSPathIdProcessor();
    }

    public synchronized void add(ItemInfo trackInfo) {
        if (trackInfo != null) {
            this.storageMovementNeeded.add(trackInfo);
        }
    }

    @VisibleForTesting
    public synchronized void addAll(long startPath, List<ItemInfo> itemInfoList, boolean scanCompleted) {
        this.storageMovementNeeded.addAll(itemInfoList);
        this.updatePendingDirScanStats(startPath, itemInfoList.size(), scanCompleted);
    }

    @VisibleForTesting
    public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) {
        this.storageMovementNeeded.add(itemInfo);
        if (itemInfo.getStartPath() == itemInfo.getFile()) {
            return;
        }
        this.updatePendingDirScanStats(itemInfo.getStartPath(), 1, scanCompleted);
    }

    private void updatePendingDirScanStats(long startPath, int numScannedFiles, boolean scanCompleted) {
        DirPendingWorkInfo pendingWork = this.pendingWorkForDirectory.get(startPath);
        if (pendingWork == null) {
            pendingWork = new DirPendingWorkInfo();
            this.pendingWorkForDirectory.put(startPath, pendingWork);
        }
        pendingWork.addPendingWorkCount(numScannedFiles);
        if (scanCompleted) {
            pendingWork.markScanCompleted();
        }
    }

    public synchronized ItemInfo get() {
        return this.storageMovementNeeded.poll();
    }

    public synchronized int size() {
        return this.storageMovementNeeded.size();
    }

    public synchronized void clearAll() {
        this.storageMovementNeeded.clear();
        this.pendingWorkForDirectory.clear();
    }

    public synchronized void removeItemTrackInfo(ItemInfo trackInfo, boolean isSuccess) throws IOException {
        if (trackInfo.isDir()) {
            long startId = trackInfo.getStartPath();
            if (!this.ctxt.isFileExist(startId)) {
                this.pendingWorkForDirectory.remove(startId);
            } else {
                DirPendingWorkInfo pendingWork = this.pendingWorkForDirectory.get(startId);
                if (pendingWork != null) {
                    pendingWork.decrementPendingWorkCount();
                    if (pendingWork.isDirWorkDone()) {
                        this.ctxt.removeSPSHint(startId);
                        this.pendingWorkForDirectory.remove(startId);
                    }
                }
            }
        } else {
            this.ctxt.removeSPSHint(trackInfo.getFile());
        }
    }

    public synchronized void clearQueuesWithNotification() {
        ItemInfo itemInfo;
        Long trackId;
        while ((trackId = this.ctxt.getNextSPSPath()) != null) {
            try {
                this.ctxt.removeSPSHint(trackId);
            }
            catch (IOException ie) {
                LOG.warn("Failed to remove SPS xattr for track id " + trackId, (Throwable)ie);
            }
        }
        while ((itemInfo = this.get()) != null) {
            try {
                if (itemInfo.isDir()) continue;
                this.ctxt.removeSPSHint(itemInfo.getFile());
            }
            catch (IOException ie) {
                LOG.warn("Failed to remove SPS xattr for track id " + itemInfo.getFile(), (Throwable)ie);
            }
        }
        this.clearAll();
    }

    public void activate() {
        this.pathIdCollector = new Daemon((Runnable)this.pathIDProcessor);
        this.pathIdCollector.setName("SPSPathIdProcessor");
        this.pathIdCollector.start();
    }

    public void close() {
        if (this.pathIdCollector != null) {
            this.pathIdCollector.interrupt();
        }
    }

    @VisibleForTesting
    public static void setStatusClearanceElapsedTimeMs(long statusClearanceElapsedTimeMs) {
        BlockStorageMovementNeeded.statusClearanceElapsedTimeMs = statusClearanceElapsedTimeMs;
    }

    @VisibleForTesting
    public static long getStatusClearanceElapsedTimeMs() {
        return statusClearanceElapsedTimeMs;
    }

    public void markScanCompletedForDir(long inode) {
        DirPendingWorkInfo pendingWork = this.pendingWorkForDirectory.get(inode);
        if (pendingWork != null) {
            pendingWork.markScanCompleted();
        }
    }

    public static class DirPendingWorkInfo {
        private int pendingWorkCount = 0;
        private boolean fullyScanned = false;

        public synchronized void addPendingWorkCount(int count) {
            this.pendingWorkCount += count;
        }

        public synchronized void decrementPendingWorkCount() {
            --this.pendingWorkCount;
        }

        public synchronized boolean isDirWorkDone() {
            return this.pendingWorkCount <= 0 && this.fullyScanned;
        }

        public synchronized void markScanCompleted() {
            this.fullyScanned = true;
        }
    }

    private class SPSPathIdProcessor
    implements Runnable {
        private static final int MAX_RETRY_COUNT = 3;

        private SPSPathIdProcessor() {
        }

        @Override
        public void run() {
            LOG.info("Starting SPSPathIdProcessor!.");
            Long startINode = null;
            int retryCount = 0;
            while (BlockStorageMovementNeeded.this.ctxt.isRunning()) {
                try {
                    if (BlockStorageMovementNeeded.this.ctxt.isInSafeMode()) continue;
                    if (startINode == null) {
                        retryCount = 0;
                        startINode = BlockStorageMovementNeeded.this.ctxt.getNextSPSPath();
                    }
                    if (startINode == null) {
                        Thread.sleep(3000L);
                    } else {
                        BlockStorageMovementNeeded.this.ctxt.scanAndCollectFiles(startINode);
                        DirPendingWorkInfo dirPendingWorkInfo = (DirPendingWorkInfo)BlockStorageMovementNeeded.this.pendingWorkForDirectory.get(startINode);
                        if (dirPendingWorkInfo != null && dirPendingWorkInfo.isDirWorkDone()) {
                            try {
                                BlockStorageMovementNeeded.this.ctxt.removeSPSHint(startINode);
                            }
                            catch (FileNotFoundException e) {
                                startINode = null;
                            }
                            BlockStorageMovementNeeded.this.pendingWorkForDirectory.remove(startINode);
                        }
                    }
                    startINode = null;
                }
                catch (Throwable t) {
                    String reClass = t.getClass().getName();
                    if (InterruptedException.class.getName().equals(reClass)) {
                        LOG.info("SPSPathIdProcessor thread is interrupted. Stopping..");
                        break;
                    }
                    LOG.warn("Exception while scanning file inodes to satisfy the policy", t);
                    try {
                        Thread.sleep(3000L);
                    }
                    catch (InterruptedException e) {
                        LOG.info("Interrupted while waiting in SPSPathIdProcessor", t);
                        break;
                    }
                    if (++retryCount < 3) continue;
                    LOG.warn("Skipping this inode {} due to too many retries.", (Object)startINode);
                    startINode = null;
                }
            }
        }
    }
}

