From 5845c36c16c423107183287cce3be9357dad7564 Mon Sep 17 00:00:00 2001
From: Rakesh Radhakrishnan
Date: Mon, 29 Jan 2018 03:10:48 +0530
Subject: [PATCH] HDFS-13050: [SPS]: Create start/stop script to start external
 SPS process. Contributed by Surendra Singh Lilhore.

---
 .../hadoop-hdfs/src/main/bin/hdfs                  |   5 +
 .../server/blockmanagement/BlockManager.java       |   9 ++
 .../hadoop/hdfs/server/mover/Mover.java            |   2 +-
 .../hdfs/server/namenode/sps/Context.java          |   5 -
 .../namenode/sps/IntraSPSNameNodeContext.java      |   4 -
 .../sps/IntraSPSNameNodeFileIdCollector.java       |  12 +-
 .../hdfs/server/namenode/sps/SPSPathIds.java       |   1 +
 .../namenode/sps/StoragePolicySatisfier.java       |  83 +++++++----
 .../sps/ExternalSPSBlockMoveTaskHandler.java       |   2 +-
 .../hdfs/server/sps/ExternalSPSContext.java        |  57 +-------
 .../sps/ExternalSPSFileIDCollector.java            |  12 +-
 .../sps/ExternalStoragePolicySatisfier.java        | 130 ++++++++++++++++++
 .../src/site/markdown/ArchivalStorage.md           |  10 +-
 .../sps/TestStoragePolicySatisfier.java            |  22 +--
 .../TestExternalStoragePolicySatisfier.java        |  33 +++--
 15 files changed, 259 insertions(+), 128 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
index bc6e7a4aa97..94426a561fb 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
@@ -63,6 +63,7 @@ function hadoop_usage
   hadoop_add_subcommand "secondarynamenode" daemon "run the DFS secondary namenode"
   hadoop_add_subcommand "snapshotDiff" client "diff two snapshots of a directory or diff the current directory contents with a snapshot"
   hadoop_add_subcommand "storagepolicies" admin "list/get/set/satisfyStoragePolicy block storage policies"
+  hadoop_add_subcommand "sps" daemon "run external storagepolicysatisfier"
   hadoop_add_subcommand "version" client "print the version"
   hadoop_add_subcommand "zkfc" daemon "run the ZK Failover Controller daemon"
   hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" false
@@ -201,6 +202,10 @@ function hdfscmd_case
     storagepolicies)
       HADOOP_CLASSNAME=org.apache.hadoop.hdfs.tools.StoragePolicyAdmin
     ;;
+    sps)
+      HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
+      HADOOP_CLASSNAME=org.apache.hadoop.hdfs.server.sps.ExternalStoragePolicySatisfier
+    ;;
     version)
       HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo
     ;;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index ac6d44b035f..4ea64a37b91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -94,6 +94,9 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSPathIds;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
@@ -5106,9 +5109,15 @@ public class BlockManager implements BlockStatsMXBean {
       return;
     }
     updateSPSMode(StoragePolicySatisfierMode.INTERNAL);
+    sps.init(new IntraSPSNameNodeContext(this.namesystem, this, sps),
+        new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(),
+            sps),
+        new IntraSPSNameNodeBlockMoveTaskHandler(this, this.namesystem), null);
     sps.start(true, spsMode);
   }
+
+
 
   /**
    * Enable storage policy satisfier by starting its service.
   */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index b4e9716803d..2cc0e271b63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -672,7 +672,7 @@ public class Mover {
       }
       if (spsRunning) {
         System.err.println("Mover failed due to StoragePolicySatisfier"
-            + " is running. Exiting with status "
+            + " service running inside namenode. Exiting with status "
            + ExitStatus.SKIPPED_DUE_TO_SPS + "... ");
         return ExitStatus.SKIPPED_DUE_TO_SPS.getExitCode();
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
index bddbc1b8476..ff4ad6ba41e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -175,9 +175,4 @@ public interface Context {
    */
   String getFilePath(Long inodeId);
 
-  /**
-   * Close the resources.
-   */
-  void close() throws IOException;
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
index 191886c70bc..ff6cc21fa22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -196,8 +196,4 @@ public class IntraSPSNameNodeContext implements Context {
     return namesystem.getFilePath(inodeId);
   }
 
-  @Override
-  public void close() throws IOException {
-    // Nothing to clean.
-  }
 }
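The Mover change above only rewords the error message, but it reflects a real constraint: the Mover and a NameNode-internal StoragePolicySatisfier must not run at the same time. A minimal sketch of the pre-flight probe a Mover-style tool can make, using the same isStoragePolicySatisfierRunning() call that the external SPS launcher later in this patch relies on (the class name here is illustrative, not part of the patch):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class SpsPreflightCheck {

      // True when no StoragePolicySatisfier is running inside the Namenode,
      // i.e. when a Mover-style tool may safely start moving blocks.
      static boolean moverCanRun(DistributedFileSystem dfs) throws IOException {
        return !dfs.getClient().isStoragePolicySatisfierRunning();
      }

      public static void main(String[] args) throws IOException {
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(new HdfsConfiguration());
        if (!moverCanRun(dfs)) {
          System.err.println("StoragePolicySatisfier is running inside the"
              + " Namenode; skipping block movement.");
        }
      }
    }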
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
index f7cd7545669..7a44dd93eaf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
@@ -158,11 +158,15 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
    */
   public synchronized int remainingCapacity() {
     int size = service.processingQueueSize();
-    if (size >= maxQueueLimitToScan) {
-      return 0;
-    } else {
-      return (maxQueueLimitToScan - size);
+    int remainingSize = 0;
+    if (size < maxQueueLimitToScan) {
+      remainingSize = maxQueueLimitToScan - size;
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SPS processing Q -> maximum capacity:{}, current size:{},"
+          + " remaining size:{}", maxQueueLimitToScan, size, remainingSize);
+    }
+    return remainingSize;
   }
 
   class SPSTraverseInfo extends TraverseInfo {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
index cd6ad224ede..e0f4999be05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 public class SPSPathIds {
 
   // List of pending dir to satisfy the policy
+  // TODO: Make this bounded queue.
   private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
 
   /**
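The TODO added to SPSPathIds above notes that the pending-path queue is unbounded. Purely as an illustration of what that TODO could turn into, a capacity-bounded variant might look like the following; the class name and the capacity constant are assumptions, not part of this patch:

    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Hypothetical bounded replacement for the unbounded LinkedList used above.
    public class BoundedSPSPathIds {
      private static final int MAX_PENDING_PATHS = 10000; // assumed limit

      private final Queue<Long> spsDirsToBeTraversed =
          new LinkedBlockingQueue<>(MAX_PENDING_PATHS);

      public synchronized boolean add(Long pathId) {
        // offer() returns false instead of growing without bound when full.
        return spsDirsToBeTraversed.offer(pathId);
      }

      public synchronized Long pollNext() {
        return spsDirsToBeTraversed.poll();
      }
    }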
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 89799fc92f1..4ddfe2ef87f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -174,10 +175,11 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
       return;
     }
     if (reconfigStart) {
-      LOG.info("Starting StoragePolicySatisfier, as admin requested to "
-          + "start it.");
+      LOG.info("Starting {} StoragePolicySatisfier, as admin requested to "
+          + "start it.", StringUtils.toLowerCase(spsMode.toString()));
     } else {
-      LOG.info("Starting StoragePolicySatisfier.");
+      LOG.info("Starting {} StoragePolicySatisfier.",
+          StringUtils.toLowerCase(spsMode.toString()));
     }
 
     // Ensure that all the previously submitted block movements(if any) have to
@@ -243,7 +245,14 @@
 
   @Override
   public void run() {
-    while (ctxt.isRunning()) {
+    while (isRunning) {
+      // Check if dependent service is running
+      if (!ctxt.isRunning()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Upstream service is down, skipping the sps work.");
+        }
+        continue;
+      }
       try {
         if (!ctxt.isInSafeMode()) {
           ItemInfo itemInfo = storageMovementNeeded.get();
@@ -284,33 +293,39 @@
           // Just add to monitor, so it will be tracked for report and
           // be removed on storage movement attempt finished report.
           case BLOCKS_TARGETS_PAIRED:
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Block analysis status:{} for the file path:{}."
+                  + " Adding to attempt monitor queue for the storage "
+                  + "movement attempt finished report",
+                  status.status, fileStatus.getPath());
+            }
            this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo
                .getStartId(), itemInfo.getFileId(), monotonicNow(),
                status.assignedBlocks, itemInfo.getRetryCount()));
            break;
          case NO_BLOCKS_TARGETS_PAIRED:
            if (LOG.isDebugEnabled()) {
-              LOG.debug("Adding trackID " + trackId
-                  + " back to retry queue as none of the blocks"
-                  + " found its eligible targets.");
+              LOG.debug("Adding trackID:{} for the file path:{} back to"
+                  + " retry queue as none of the blocks found its eligible"
+                  + " targets.", trackId, fileStatus.getPath());
            }
            itemInfo.increRetryCount();
            this.storageMovementNeeded.add(itemInfo);
            break;
          case FEW_LOW_REDUNDANCY_BLOCKS:
            if (LOG.isDebugEnabled()) {
-              LOG.debug("Adding trackID " + trackId
-                  + " back to retry queue as some of the blocks"
-                  + " are low redundant.");
+              LOG.debug("Adding trackID:{} for the file path:{} back to "
+                  + "retry queue as some of the blocks are low redundant.",
+                  trackId, fileStatus.getPath());
            }
            itemInfo.increRetryCount();
            this.storageMovementNeeded.add(itemInfo);
            break;
          case BLOCKS_FAILED_TO_MOVE:
            if (LOG.isDebugEnabled()) {
-              LOG.debug("Adding trackID " + trackId
-                  + " back to retry queue as some of the blocks"
-                  + " movement failed.");
+              LOG.debug("Adding trackID:{} for the file path:{} back to "
+                  + "retry queue as some of the blocks movement failed.",
+                  trackId, fileStatus.getPath());
            }
            this.storageMovementNeeded.add(itemInfo);
            break;
@@ -318,8 +333,9 @@
          case BLOCKS_TARGET_PAIRING_SKIPPED:
          case BLOCKS_ALREADY_SATISFIED:
          default:
-            LOG.info("Block analysis skipped or blocks already satisfied"
-                + " with storages. So, Cleaning up the Xattrs.");
+            LOG.info("Block analysis status:{} for the file path:{}."
+                + " So, cleaning up the Xattrs.", status.status,
+                fileStatus.getPath());
            storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
            break;
          }
@@ -346,20 +362,20 @@
     if (isRunning) {
       synchronized (this) {
         if (isRunning) {
-          isRunning = false;
-          // Stopping monitor thread and clearing queues as well
-          this.clearQueues();
-          this.storageMovementsMonitor.stopGracefully();
-          if (!(t instanceof InterruptedException)) {
-            LOG.info("StoragePolicySatisfier received an exception"
-                + " while shutting down.", t);
+          if (t instanceof InterruptedException) {
+            isRunning = false;
+            LOG.info("Stopping StoragePolicySatisfier.");
+            // Stopping monitor thread and clearing queues as well
+            this.clearQueues();
+            this.storageMovementsMonitor.stopGracefully();
+          } else {
+            LOG.error(
+                "StoragePolicySatisfier thread received runtime exception, "
+                    + "ignoring", t);
           }
-          LOG.info("Stopping StoragePolicySatisfier.");
         }
       }
     }
-    LOG.error("StoragePolicySatisfier thread received runtime exception. "
-        + "Stopping Storage policy satisfier work", t);
 
     return;
   }
@@ -374,9 +390,8 @@
     final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
     if (!lastBlkComplete) {
       // Postpone, currently file is under construction
-      // So, should we add back? or leave it to user
-      LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
-          + " this to the next retry iteration", fileInfo.getFileId());
+      LOG.info("File: {} is under construction. So, postpone"
+          + " this to the next retry iteration", fileInfo.getPath());
       return new BlocksMovingAnalysis(
           BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
           new ArrayList<>());
@@ -384,8 +399,8 @@
 
     List<LocatedBlock> blocks = locatedBlocks.getLocatedBlocks();
     if (blocks.size() == 0) {
-      LOG.info("BlockCollectionID: {} file is not having any blocks."
-          + " So, skipping the analysis.", fileInfo.getFileId());
+      LOG.info("File: {} does not have any blocks."
+          + " So, skipping the analysis.", fileInfo.getPath());
       return new BlocksMovingAnalysis(
           BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
           new ArrayList<>());
@@ -970,4 +985,12 @@
   public void markScanCompletedForPath(Long inodeId) {
     getStorageMovementQueue().markScanCompletedForDir(inodeId);
   }
+
+  /**
+   * Join main SPS thread.
+   */
+  public void join() throws InterruptedException {
+    //TODO Add join here on SPS rpc server also
+    storagePolicySatisfierThread.join();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
index a1c8eec27ea..4a762649d66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -110,7 +110,7 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
   /**
    * Initializes block movement tracker daemon and starts the thread.
    */
-  void init() {
+  public void init() {
     movementTrackerThread = new Daemon(this.blkMovementTracker);
     movementTrackerThread.setName("BlockStorageMovementTracker");
     movementTrackerThread.start();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
index e5b04bab044..e3b3bbb5477 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
@@ -19,19 +19,13 @@
 package org.apache.hadoop.hdfs.server.sps;
 
 import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -57,13 +51,12 @@ public class ExternalSPSContext implements Context {
       LoggerFactory.getLogger(ExternalSPSContext.class);
   private SPSService service;
   private NameNodeConnector nnc = null;
-  private Object nnConnectionLock = new Object();
   private BlockStoragePolicySuite createDefaultSuite =
       BlockStoragePolicySuite.createDefaultSuite();
 
-  public ExternalSPSContext(SPSService service) {
+  public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
     this.service = service;
-    initializeNamenodeConnector();
+    this.nnc = nnc;
   }
 
   @Override
@@ -73,7 +66,6 @@ public class ExternalSPSContext implements Context {
 
   @Override
   public boolean isInSafeMode() {
-    initializeNamenodeConnector();
     try {
       return nnc != null ? nnc.getDistributedFileSystem().isInSafeMode()
          : false;
@@ -85,7 +77,6 @@ public class ExternalSPSContext implements Context {
 
   @Override
   public boolean isMoverRunning() {
-    initializeNamenodeConnector();
     try {
       FSDataOutputStream out = nnc.getDistributedFileSystem()
           .append(HdfsServerConstants.MOVER_ID_PATH);
@@ -101,7 +92,6 @@ public class ExternalSPSContext implements Context {
   @Override
   public long getFileID(String path) throws UnresolvedLinkException,
       AccessControlException, ParentNotDirectoryException {
-    initializeNamenodeConnector();
     HdfsFileStatus fs = null;
     try {
       fs = (HdfsFileStatus) nnc.getDistributedFileSystem().getFileStatus(
@@ -121,7 +111,6 @@ public class ExternalSPSContext implements Context {
 
   @Override
   public boolean isFileExist(long inodeId) {
-    initializeNamenodeConnector();
     String filePath = null;
     try {
       filePath = getFilePath(inodeId);
@@ -145,14 +134,12 @@ public class ExternalSPSContext implements Context {
 
   @Override
   public void removeSPSHint(long inodeId) throws IOException {
-    initializeNamenodeConnector();
     nnc.getDistributedFileSystem().removeXAttr(new Path(getFilePath(inodeId)),
         HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
   }
 
   @Override
   public int getNumLiveDataNodes() {
-    initializeNamenodeConnector();
     try {
       return nnc.getDistributedFileSystem()
           .getDataNodeStats(DatanodeReportType.LIVE).length;
@@ -164,7 +151,6 @@ public class ExternalSPSContext implements Context {
 
   @Override
   public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
-    initializeNamenodeConnector();
     return nnc.getDistributedFileSystem().getClient()
         .getLocatedFileInfo(getFilePath(inodeID), false);
   }
@@ -172,13 +158,11 @@ public class ExternalSPSContext implements Context {
   @Override
   public DatanodeStorageReport[] getLiveDatanodeStorageReport()
       throws IOException {
-    initializeNamenodeConnector();
     return nnc.getLiveDatanodeStorageReport();
   }
 
   @Override
   public boolean hasLowRedundancyBlocks(long inodeID) {
-    initializeNamenodeConnector();
     try {
       return nnc.getNNProtocolConnection().hasLowRedundancyBlocks(inodeID);
     } catch (IOException e) {
@@ -191,7 +175,6 @@ public class ExternalSPSContext implements Context {
   @Override
   public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
       long estimatedSize) {
-    initializeNamenodeConnector();
     try {
       return nnc.getNNProtocolConnection().checkDNSpaceForScheduling(dn,
           type, estimatedSize);
@@ -204,7 +187,6 @@ public class ExternalSPSContext implements Context {
 
   @Override
   public Long getNextSPSPathId() {
-    initializeNamenodeConnector();
     try {
       return nnc.getNNProtocolConnection().getNextSPSPathId();
     } catch (IOException e) {
@@ -233,39 +215,4 @@ public class ExternalSPSContext implements Context {
       return null;
     }
   }
-
-  @Override
-  public void close() throws IOException {
-    synchronized (nnConnectionLock) {
-      if (nnc != null) {
-        nnc.close();
-      }
-    }
-  }
-
-  private void initializeNamenodeConnector() {
-    synchronized (nnConnectionLock) {
-      if (nnc == null) {
-        try {
-          nnc = getNameNodeConnector(service.getConf());
-        } catch (IOException e) {
-          LOG.warn("Exception while creating Namenode Connector.."
-              + "Namenode might not have started.", e);
-        }
-      }
-    }
-  }
-
-  public static NameNodeConnector getNameNodeConnector(Configuration conf)
-      throws IOException {
-    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
-    List<NameNodeConnector> nncs = Collections.emptyList();
-    NameNodeConnector.checkOtherInstanceRunning(false);
-    nncs = NameNodeConnector.newNameNodeConnectors(namenodes,
-        ExternalSPSContext.class.getSimpleName(),
-        HdfsServerConstants.MOVER_ID_PATH, conf,
-        NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
-    return nncs.get(0);
-  }
-
 }
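With the lazy initializeNamenodeConnector()/close() pair removed, ExternalSPSContext no longer owns the NameNodeConnector: whoever constructs the context has to create the connector first and close it last. A small sketch of that ownership pattern, mirroring what the new entry point later in this patch does; buildConnector() is a placeholder for any helper that creates the connector:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
    import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
    import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;

    public class ContextOwnershipSketch {
      static void useContext(SPSService sps, Configuration conf) throws IOException {
        NameNodeConnector nnc = null;
        try {
          nnc = buildConnector(conf);       // the caller creates the connector
          ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
          // ... hand the context to the satisfier and run it ...
        } finally {
          if (nnc != null) {
            nnc.close();                    // the context no longer closes it
          }
        }
      }

      // Placeholder for a connector factory, e.g. the retrying helper added
      // later in this patch.
      static NameNodeConnector buildConnector(Configuration conf) throws IOException {
        throw new UnsupportedOperationException("sketch only");
      }
    }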
- + "Namenode might not have started.", e); - } - } - } - } - - public static NameNodeConnector getNameNodeConnector(Configuration conf) - throws IOException { - final Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); - List nncs = Collections.emptyList(); - NameNodeConnector.checkOtherInstanceRunning(false); - nncs = NameNodeConnector.newNameNodeConnectors(namenodes, - ExternalSPSContext.class.getSimpleName(), - HdfsServerConstants.MOVER_ID_PATH, conf, - NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS); - return nncs.get(0); - } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java index 964ee8c10b9..ff277ba6646 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java @@ -139,11 +139,15 @@ public class ExternalSPSFileIDCollector implements FileIdCollector { */ public int remainingCapacity() { int size = service.processingQueueSize(); - if (size >= maxQueueLimitToScan) { - return 0; - } else { - return (maxQueueLimitToScan - size); + int remainingSize = 0; + if (size < maxQueueLimitToScan) { + remainingSize = maxQueueLimitToScan - size; } + if (LOG.isDebugEnabled()) { + LOG.debug("SPS processing Q -> maximum capacity:{}, current size:{}," + + " remaining size:{}", maxQueueLimitToScan, size, remainingSize); + } + return remainingSize; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java new file mode 100644 index 00000000000..c64abc3f9df --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
new file mode 100644
index 00000000000..c64abc3f9df
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.sps;
+
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class starts and runs external SPS service.
+ */
+@InterfaceAudience.Private
+public class ExternalStoragePolicySatisfier {
+  public static final Logger LOG = LoggerFactory
+      .getLogger(ExternalStoragePolicySatisfier.class);
+
+  /**
+   * Main method to start SPS service.
+   */
+  public static void main(String args[]) throws Exception {
+    NameNodeConnector nnc = null;
+    try {
+      StringUtils.startupShutdownMessage(StoragePolicySatisfier.class, args,
+          LOG);
+      HdfsConfiguration spsConf = new HdfsConfiguration();
+      //TODO : login with SPS keytab
+      StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
+      nnc = getNameNodeConnector(spsConf);
+
+      boolean spsRunning;
+      spsRunning = nnc.getDistributedFileSystem().getClient()
+          .isStoragePolicySatisfierRunning();
+      if (spsRunning) {
+        throw new RuntimeException(
+            "Startup failed due to StoragePolicySatisfier"
+                + " running inside Namenode.");
+      }
+
+      ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
+      ExternalBlockMovementListener blkMoveListener =
+          new ExternalBlockMovementListener();
+      ExternalSPSBlockMoveTaskHandler externalHandler =
+          new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps);
+      externalHandler.init();
+      sps.init(context, new ExternalSPSFileIDCollector(context, sps),
+          externalHandler, blkMoveListener);
+      sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
+      if (sps != null) {
+        sps.join();
+      }
+    } catch (Throwable e) {
+      LOG.error("Failed to start storage policy satisfier.", e);
+      terminate(1, e);
+    } finally {
+      if (nnc != null) {
+        nnc.close();
+      }
+    }
+  }
+
+  private static NameNodeConnector getNameNodeConnector(Configuration conf)
+      throws IOException, InterruptedException {
+    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+    final Path externalSPSPathId = HdfsServerConstants.MOVER_ID_PATH;
+    while (true) {
+      try {
+        final List<NameNodeConnector> nncs = NameNodeConnector
+            .newNameNodeConnectors(namenodes,
+                StoragePolicySatisfier.class.getSimpleName(),
+                externalSPSPathId, conf,
+                NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
+        return nncs.get(0);
+      } catch (IOException e) {
+        LOG.warn("Failed to connect with namenode", e);
+        Thread.sleep(3000); // retry the connection after a few seconds
+      }
+    }
+  }
+
+  /**
+   * An implementation of BlockMovementListener.
+   */
+  private static class ExternalBlockMovementListener
+      implements BlockMovementListener {
+
+    private List<Block> actualBlockMovements = new ArrayList<>();
+
+    @Override
+    public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+      for (Block block : moveAttemptFinishedBlks) {
+        actualBlockMovements.add(block);
+      }
+      LOG.info("Movement attempted blocks: {}", actualBlockMovements);
+    }
+  }
+}
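The "TODO : login with SPS keytab" above is the one piece of the startup sequence left open. A sketch of what such a login could look like using the standard SecurityUtil helper; the two configuration key names are assumptions for illustration only and are not defined anywhere in this patch:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.SecurityUtil;
    import org.apache.hadoop.security.UserGroupInformation;

    public class SpsSecureLoginSketch {
      static void secureLogin(Configuration conf) throws IOException {
        if (!UserGroupInformation.isSecurityEnabled()) {
          return;   // nothing to do on an insecure cluster
        }
        UserGroupInformation.setConfiguration(conf);
        // Both keys are hypothetical placeholders for whatever the follow-up
        // work defines for the external SPS principal and keytab.
        SecurityUtil.login(conf, "dfs.sps.keytab.file",
            "dfs.sps.kerberos.principal");
      }
    }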
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
index c10bfc382ae..25a6cd92ef5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
@@ -238,5 +238,13 @@ Check the running status of Storage Policy Satisfier service in namenode. If it
 
 ### Enable(internal service inside NN or external service outside NN) or Disable SPS without restarting Namenode
 If administrator wants to switch modes of SPS feature while Namenode is running, first he/she needs to update the desired value(internal or external or none) for the configuration item `dfs.storage.policy.satisfier.mode` in configuration file (`hdfs-site.xml`) and then run the following Namenode reconfig command
-+       hdfs dfsadmin -reconfig namenode <host:ipc_port> start
+* Command:
+        hdfs dfsadmin -reconfig namenode <host:ipc_port> start
+
+### Start External SPS Service
+If the administrator wants to start the external SPS service, first configure the property `dfs.storage.policy.satisfier.mode` with the value `external` in the configuration file (`hdfs-site.xml`) and then run the Namenode reconfig command. After that, start the external SPS service using the following command
+
+* Command:
+
+        hdfs --daemon start sps
 
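Putting the documented pieces together, switching to an external satisfier is a three-step operation. The commands below are illustrative: the host and port are placeholders, and the stop command is simply the standard counterpart offered by the daemon wrapper that the new `sps` subcommand plugs into.

    # 1) In hdfs-site.xml, set dfs.storage.policy.satisfier.mode to external
    # 2) Ask the Namenode to pick up the new mode
    hdfs dfsadmin -reconfig namenode <host:ipc_port> start
    # 3) Start the external satisfier daemon
    hdfs --daemon start sps
    # To stop the external service again
    hdfs --daemon stop sps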
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 935d4f2b8b3..135d996d277 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -603,7 +603,7 @@ public class TestStoragePolicySatisfier {
       if (out != null) {
         out.close();
       }
-      hdfsCluster.shutdown();
+      shutdownCluster();
     }
   }
 
@@ -626,9 +626,7 @@ public class TestStoragePolicySatisfier {
       Assert.assertTrue("SPS should be running as "
          + "no Mover really running", running);
     } finally {
-      if (hdfsCluster != null) {
-        hdfsCluster.shutdown();
-      }
+      shutdownCluster();
     }
   }
 
@@ -672,9 +670,7 @@ public class TestStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
          file1, StorageType.DISK, 2, 30000, dfs);
     } finally {
-      if (hdfsCluster != null) {
-        hdfsCluster.shutdown();
-      }
+      shutdownCluster();
     }
   }
 
@@ -1381,7 +1377,11 @@ public class TestStoragePolicySatisfier {
     // Remove 10 element and make queue free, So other traversing will start.
     for (int i = 0; i < 10; i++) {
       String path = expectedTraverseOrder.remove(0);
-      long trackId = sps.getStorageMovementQueue().get().getFileId();
+      ItemInfo itemInfo = sps.getStorageMovementQueue().get();
+      if (itemInfo == null) {
+        continue;
+      }
+      long trackId = itemInfo.getFileId();
       INode inode = fsDir.getInode(trackId);
       assertTrue("Failed to traverse tree, expected " + path + " but got "
          + inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1392,7 +1392,11 @@ public class TestStoragePolicySatisfier {
     // Check other element traversed in order and E, M, U, R, S should not be
     // added in queue which we already removed from expected list
     for (String path : expectedTraverseOrder) {
-      long trackId = sps.getStorageMovementQueue().get().getFileId();
+      ItemInfo itemInfo = sps.getStorageMovementQueue().get();
+      if (itemInfo == null) {
+        continue;
+      }
+      long trackId = itemInfo.getFileId();
       INode inode = fsDir.getInode(trackId);
       assertTrue("Failed to traverse tree, expected " + path + " but got "
          + inode.getFullPathName(), path.equals(inode.getFullPathName()));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
index fe08b8ff25f..febc2ea6a28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -22,7 +22,6 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -43,8 +42,6 @@ import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
 import org.junit.Assert;
 import org.junit.Ignore;
 
-import com.google.common.collect.Maps;
-
 /**
  * Tests the external sps service plugins.
  */
@@ -95,7 +92,8 @@ public class TestExternalStoragePolicySatisfier
     SPSService spsService = blkMgr.getSPSService();
     spsService.stopGracefully();
 
-    ExternalSPSContext context = new ExternalSPSContext(spsService);
+    ExternalSPSContext context = new ExternalSPSContext(spsService,
+        getNameNodeConnector(conf));
 
     ExternalBlockMovementListener blkMoveListener =
         new ExternalBlockMovementListener();
@@ -124,7 +122,8 @@ public class TestExternalStoragePolicySatisfier
     spsService = blkMgr.getSPSService();
     spsService.stopGracefully();
 
-    ExternalSPSContext context = new ExternalSPSContext(spsService);
+    ExternalSPSContext context = new ExternalSPSContext(spsService,
+        getNameNodeConnector(getConf()));
     ExternalBlockMovementListener blkMoveListener =
         new ExternalBlockMovementListener();
     ExternalSPSBlockMoveTaskHandler externalHandler =
@@ -161,16 +160,22 @@ public class TestExternalStoragePolicySatisfier
       throws IOException {
     final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
     Assert.assertEquals(1, namenodes.size());
-    Map<URI, List<Path>> nnMap = Maps.newHashMap();
-    for (URI nn : namenodes) {
-      nnMap.put(nn, null);
-    }
     final Path externalSPSPathId = new Path("/system/tmp.id");
-    final List<NameNodeConnector> nncs = NameNodeConnector
-        .newNameNodeConnectors(nnMap,
-            StoragePolicySatisfier.class.getSimpleName(), externalSPSPathId,
-            conf, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
-    return nncs.get(0);
+    NameNodeConnector.checkOtherInstanceRunning(false);
+    while (true) {
+      try {
+        final List<NameNodeConnector> nncs = NameNodeConnector
+            .newNameNodeConnectors(namenodes,
+                StoragePolicySatisfier.class.getSimpleName(),
+                externalSPSPathId, conf,
+                NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
+        return nncs.get(0);
+      } catch (IOException e) {
+        LOG.warn("Failed to connect with namenode", e);
+        // Ignore
+      }
+
+    }
   }
 
   /**