From e34331c31d68cb22891db48011db5b36ad178af1 Mon Sep 17 00:00:00 2001
From: Rakesh Radhakrishnan
Date: Tue, 31 Jan 2017 23:44:01 +0530
Subject: [PATCH] HDFS-11243. [SPS]: Add a protocol command from NN to DN for
 dropping the SPS work and queues. Contributed by Uma Maheswara Rao G

---
 .../hadoop/hdfs/protocolPB/PBHelper.java      | 12 ++++
 .../server/blockmanagement/BlockManager.java  | 13 ++--
 .../blockmanagement/DatanodeDescriptor.java   | 18 ++++++
 .../blockmanagement/DatanodeManager.java      | 19 ++++++
 .../hdfs/server/datanode/BPOfferService.java  |  4 ++
 .../datanode/BlockStorageMovementTracker.java | 12 ++++
 .../datanode/StoragePolicySatisfyWorker.java  | 22 ++++++-
 .../namenode/StoragePolicySatisfier.java      | 25 ++++++--
 .../server/protocol/DatanodeProtocol.java     |  2 +
 .../server/protocol/DropSPSWorkCommand.java   | 36 +++++++++++
 .../src/main/proto/DatanodeProtocol.proto     |  9 +++
 .../TestStoragePolicySatisfyWorker.java       | 59 +++++++++++++++++++
 12 files changed, 216 insertions(+), 15 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DropSPSWorkCommand.java
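Before the per-file changes, it helps to see the mechanism as one piece: BlockManager stops the satisfier with a reconfiguration flag, StoragePolicySatisfier asks DatanodeManager to mark every DatanodeDescriptor, the next heartbeat response carries a DropSPSWorkCommand, and BPOfferService routes it to StoragePolicySatisfyWorker#dropSPSWork(). Below is a condensed, self-contained sketch of that flow; the classes are invented for illustration and only the method names mirror the patch:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative model: admin stop -> mark every DN -> heartbeat carries
    // the command -> DN drops its queues. Not code from the patch.
    public class DropSpsFlowSketch {
      static class DnDescriptor {
        private boolean dropSPSWork = false;
        synchronized void setDropSPSWork(boolean drop) { dropSPSWork = drop; }
        synchronized boolean shouldDropSPSWork() { return dropSPSWork; }
      }

      static class DnWorker {
        final List<String> queuedMovements = new ArrayList<>();
        void dropSPSWork() { queuedMovements.clear(); }
      }

      public static void main(String[] args) {
        DnDescriptor nnView = new DnDescriptor();
        DnWorker dnSide = new DnWorker();
        dnSide.queuedMovements.add("blk_1 -> ARCHIVE");

        // 1. Admin deactivates SPS; NN marks every datanode descriptor.
        nnView.setDropSPSWork(true);

        // 2. Next heartbeat: NN consumes the flag and sends the command.
        if (nnView.shouldDropSPSWork()) {
          nnView.setDropSPSWork(false); // sent once, then cleared
          // 3. DN receives DNA_DROP_SPS_WORK_COMMAND and drops its queues.
          dnSide.dropSPSWork();
        }
        System.out.println("queued after drop: " + dnSide.queuedMovements);
      }
    }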
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 10555baf4d3..0c036086039 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdComma
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DropSPSWorkCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
@@ -112,6 +113,7 @@ import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DropSPSWorkCommand;
 import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
@@ -143,6 +145,10 @@ public class PBHelper {
   private static final RegisterCommandProto REG_CMD_PROTO =
       RegisterCommandProto.newBuilder().build();
   private static final RegisterCommand REG_CMD = new RegisterCommand();
+  private static final DropSPSWorkCommandProto DROP_SPS_WORK_CMD_PROTO =
+      DropSPSWorkCommandProto.newBuilder().build();
+  private static final DropSPSWorkCommand DROP_SPS_WORK_CMD =
+      new DropSPSWorkCommand();
 
   private PBHelper() {
     /** Hidden constructor */
@@ -478,6 +484,8 @@ public class PBHelper {
       return PBHelper.convert(proto.getBlkECReconstructionCmd());
     case BlockStorageMovementCommand:
       return PBHelper.convert(proto.getBlkStorageMovementCmd());
+    case DropSPSWorkCommand:
+      return DROP_SPS_WORK_CMD;
     default:
       return null;
     }
   }
@@ -617,6 +625,10 @@ public class PBHelper {
           .setBlkStorageMovementCmd(
               convert((BlockStorageMovementCommand) datanodeCommand));
       break;
+    case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
+      builder.setCmdType(DatanodeCommandProto.Type.DropSPSWorkCommand)
+          .setDropSPSWorkCmd(DROP_SPS_WORK_CMD_PROTO);
+      break;
     case DatanodeProtocol.DNA_UNKNOWN: //Not expected
     default:
       builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 5bacc7d381c..9a9632f5399 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -720,13 +720,13 @@ public class BlockManager implements BlockStatsMXBean {
     mxBeanName = MBeans.register("NameNode", "BlockStats", this);
     bmSafeMode.activate(blockTotal);
     if (sps != null && !haEnabled) {
-      sps.start();
+      sps.start(false);
     }
   }
 
   public void close() {
     if (sps != null) {
-      sps.stop();
+      sps.stop(false);
     }
     bmSafeMode.close();
     try {
@@ -5053,7 +5053,7 @@ public class BlockManager implements BlockStatsMXBean {
       return;
     }
 
-    sps.start();
+    sps.start(true);
   }
 
   /**
@@ -5067,12 +5067,7 @@ public class BlockManager implements BlockStatsMXBean {
       LOG.info("Storage policy satisfier is already stopped.");
       return;
     }
-    sps.stop();
-    // TODO: add command to DNs for stop in-progress processing SPS commands?
-    // to avoid confusions in cluster, I think sending commands from centralized
-    // place would be better to drop pending queues at DN. Anyway in progress
-    // work will be finished in a while, but this command can void starting
-    // fresh movements at DN.
+    sps.stop(true);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index dbf0f7e50da..0311b029d97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -214,6 +214,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
    */
   private final Queue<BlockMovingInfo> storageMovementBlocks =
       new LinkedList<>();
+  private volatile boolean dropSPSWork = false;
 
   /* Variables for maintaining number of blocks scheduled to be written to
    * this storage. This count is approximate and might be slightly bigger
@@ -1104,4 +1105,21 @@ public class DatanodeDescriptor extends DatanodeInfo {
       return storageMovementBlocks.poll();
     }
   }
+
+  /**
+   * Set whether to drop SPS related queues at DN side.
+   *
+   * @param dropSPSWork
+   *          true if the DN should drop its SPS queues, otherwise false.
+   */
+  public synchronized void setDropSPSWork(boolean dropSPSWork) {
+    this.dropSPSWork = dropSPSWork;
+  }
+
+  /**
+   * @return true if the DN needs to drop its SPS queues.
+   */
+  public synchronized boolean shouldDropSPSWork() {
+    return this.dropSPSWork;
+  }
 }
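The DatanodeDescriptor change above is the NN-side half of the mechanism: a sticky per-DN flag that is set when SPS is deactivated and consumed the next time that DN heartbeats. A standalone sketch of this set-once/consume-once pattern (plain Java; the class and method bodies are illustrative, not the patch's code):

    import java.util.ArrayList;
    import java.util.List;

    class HeartbeatFlagSketch {
      private volatile boolean dropSPSWork = false;

      synchronized void setDropSPSWork(boolean drop) {
        this.dropSPSWork = drop;
      }

      /** Build the commands for one heartbeat, consuming the flag once. */
      synchronized List<String> buildHeartbeatCommands() {
        List<String> cmds = new ArrayList<>();
        if (dropSPSWork) {
          cmds.add("DNA_DROP_SPS_WORK_COMMAND");
          dropSPSWork = false; // cleared: the command is sent at most once
        }
        return cmds;
      }
    }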
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index ba49c1378f9..da340a8581d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1750,6 +1750,13 @@ public class DatanodeManager {
               blkStorageMovementInfosBatch.getBlockMovingInfo()));
     }
 
+    if (nodeinfo.shouldDropSPSWork()) {
+      cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
+      // Set back to false to indicate that the new value has been sent to the
+      // datanode.
+      nodeinfo.setDropSPSWork(false);
+    }
+
     if (!cmds.isEmpty()) {
       return cmds.toArray(new DatanodeCommand[cmds.size()]);
     }
@@ -1978,5 +1985,17 @@ public class DatanodeManager {
     return slowDiskTracker != null ?
         slowDiskTracker.getSlowDiskReportAsJsonString() : null;
   }
+
+  /**
+   * Mark all DNs to drop SPS queues. A DNA_DROP_SPS_WORK_COMMAND will be added
+   * to each DN's heartbeat response, instructing it to drop its SPS queues.
+   */
+  public void addDropSPSWorkCommandsToAllDNs() {
+    synchronized (this) {
+      for (DatanodeDescriptor dn : datanodeMap.values()) {
+        dn.setDropSPSWork(true);
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 22d88b59476..79109b737ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -802,6 +802,10 @@ class BPOfferService {
           blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(),
           blkSPSCmd.getBlockMovingTasks());
       break;
+    case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
+      LOG.info("DatanodeCommand action: DNA_DROP_SPS_WORK_COMMAND");
+      dn.getStoragePolicySatisfyWorker().dropSPSWork();
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
index bd35b0927a6..e623cef0e86 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -146,4 +146,16 @@ public class BlockStorageMovementTracker implements Runnable {
       moverTaskFutures.notify();
     }
   }
+
+  /**
+   * Clear the pending movement and movement result queues.
+   */
+  void removeAll() {
+    synchronized (moverTaskFutures) {
+      moverTaskFutures.clear();
+    }
+    synchronized (movementResults) {
+      movementResults.clear();
+    }
+  }
 }
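BlockStorageMovementTracker.removeAll() above clears both queues under the same monitors its consumer thread uses for wait/notify, so a drop cannot interleave with a half-processed entry. The idea in isolation (an illustrative sketch, not the patch's code):

    import java.util.HashMap;
    import java.util.Map;

    class TrackerDropSketch {
      private final Map<Long, String> moverTasks = new HashMap<>();

      void addTask(long trackId, String task) {
        synchronized (moverTasks) {
          moverTasks.put(trackId, task);
          moverTasks.notify(); // wake the consumer, as the real tracker does
        }
      }

      /** Drop everything under the same lock the consumer acquires. */
      void removeAll() {
        synchronized (moverTasks) {
          moverTasks.clear();
        }
      }
    }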
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 10adbfde224..a96ac98aa31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -115,7 +115,6 @@ public class StoragePolicySatisfyWorker {
         TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
         new Daemon.DaemonFactory() {
           private final AtomicInteger threadIndex = new AtomicInteger(0);
-
           @Override
           public Thread newThread(Runnable r) {
             Thread t = super.newThread(r);
@@ -421,10 +420,31 @@ public class StoragePolicySatisfyWorker {
         }
       }
     }
+
+    /**
+     * Clear the trackID vs movement status tracking map.
+     */
+    void removeAll() {
+      synchronized (trackIdVsMovementStatus) {
+        trackIdVsMovementStatus.clear();
+      }
+    }
+
   }
 
   @VisibleForTesting
   BlocksMovementsCompletionHandler getBlocksMovementsCompletionHandler() {
     return handler;
   }
+
+  /**
+   * Drop the in-progress SPS work queues.
+   */
+  public void dropSPSWork() {
+    LOG.info("Received request to drop StoragePolicySatisfyWorker queues. "
+        + "None of the queued SPS block movements will"
+        + " be scheduled.");
+    movementTracker.removeAll();
+    handler.removeAll();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 1c48910ed7b..dc58294e979 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -99,9 +99,14 @@ public class StoragePolicySatisfier implements Runnable {
    * Start storage policy satisfier daemon thread. Also start block storage
    * movements monitor to retry the attempts if needed.
    */
-  public synchronized void start() {
+  public synchronized void start(boolean reconfigStart) {
     isRunning = true;
-    LOG.info("Starting StoragePolicySatisfier.");
+    if (reconfigStart) {
+      LOG.info("Starting StoragePolicySatisfier, as admin requested to "
+          + "activate it.");
+    } else {
+      LOG.info("Starting StoragePolicySatisfier.");
+    }
     storagePolicySatisfierThread = new Daemon(this);
     storagePolicySatisfierThread.setName("StoragePolicySatisfier");
     storagePolicySatisfierThread.start();
@@ -110,10 +115,17 @@ public class StoragePolicySatisfier implements Runnable {
 
   /**
    * Stop storage policy satisfier daemon thread.
+   *
+   * @param reconfigStop true if stop was requested by admin reconfiguration
+   */
-  public synchronized void stop() {
+  public synchronized void stop(boolean reconfigStop) {
     isRunning = false;
-    LOG.info("Stopping StoragePolicySatisfier.");
+    if (reconfigStop) {
+      LOG.info("Stopping StoragePolicySatisfier, as admin requested to "
+          + "deactivate it.");
+    } else {
+      LOG.info("Stopping StoragePolicySatisfier.");
+    }
     if (storagePolicySatisfierThread == null) {
       return;
     }
@@ -123,7 +135,10 @@ public class StoragePolicySatisfier implements Runnable {
     } catch (InterruptedException ie) {
     }
     this.storageMovementsMonitor.stop();
-    this.clearQueues();
+    if (reconfigStop) {
+      this.clearQueues();
+      this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
+    }
   }
 
   /**
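The boolean threaded through start()/stop() above separates two intents behind one API: a lifecycle transition (NN startup/shutdown, HA) versus an admin reconfiguration; only the latter should purge queued work cluster-wide. A minimal sketch of that contract (illustrative names, not from the diff):

    import java.util.ArrayDeque;
    import java.util.Deque;

    class SatisfierLifecycleSketch {
      private boolean running = false;
      private final Deque<Long> pendingTrackIds = new ArrayDeque<>();

      synchronized void start(boolean reconfigStart) {
        running = true; // reconfigStart only changes what gets logged
      }

      synchronized void stop(boolean reconfigStop) {
        running = false;
        if (reconfigStop) {
          // Admin turned SPS off: forget queued work; the real patch also
          // asks DatanodeManager to queue a DropSPSWorkCommand for every DN.
          pendingTrackIds.clear();
        }
        // A plain shutdown (e.g. NN stopping) keeps the queues intact.
      }

      synchronized boolean isRunning() {
        return running;
      }
    }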
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 858f59be4d9..892efb3a5d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -80,6 +80,8 @@ public interface DatanodeProtocol {
   final static int DNA_UNCACHE = 10;   // uncache blocks
   final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command
   final static int DNA_BLOCK_STORAGE_MOVEMENT = 12; // block storage movement command
+  final static int DNA_DROP_SPS_WORK_COMMAND = 13; // drop sps work
+                                                   // command
 
   /**
    * Register Datanode.
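The new action code carries no payload, which is why PBHelper (earlier in this patch) converts it to and from a pair of shared singleton instances instead of building fresh objects, and why the command class that follows needs nothing beyond its action code. The pattern in miniature (an illustrative sketch, not the patch's code):

    class DropWorkCommandSketch {
      static final int DNA_DROP_SPS_WORK_COMMAND = 13;

      // Payload-free and immutable, therefore safely shared by all callers.
      static final DropWorkCommandSketch INSTANCE = new DropWorkCommandSketch();

      private final int action = DNA_DROP_SPS_WORK_COMMAND;

      private DropWorkCommandSketch() {
      }

      int getAction() {
        return action;
      }
    }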
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DropSPSWorkCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DropSPSWorkCommand.java
new file mode 100644
index 00000000000..806f713fbb8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DropSPSWorkCommand.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A DropSPSWorkCommand is an instruction to a datanode to drop the SPSWorker's
+ * pending block storage movement queues.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DropSPSWorkCommand extends DatanodeCommand {
+  public static final DropSPSWorkCommand DNA_DROP_SPS_WORK_COMMAND =
+      new DropSPSWorkCommand();
+
+  public DropSPSWorkCommand() {
+    super(DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 77b0f86fb13..899dc7ef1d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -61,6 +61,7 @@ message DatanodeCommandProto {
     BlockIdCommand = 8;
     BlockECReconstructionCommand = 9;
     BlockStorageMovementCommand = 10;
+    DropSPSWorkCommand = 11;
   }
 
   required Type cmdType = 1;    // Type of the command
@@ -76,6 +77,7 @@ message DatanodeCommandProto {
   optional BlockIdCommandProto blkIdCmd = 8;
   optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9;
   optional BlockStorageMovementCommandProto blkStorageMovementCmd = 10;
+  optional DropSPSWorkCommandProto dropSPSWorkCmd = 11;
 }
 
 /**
@@ -165,6 +167,13 @@ message BlockStorageMovementCommandProto {
   repeated BlockStorageMovementProto blockStorageMovement = 3;
 }
 
+/**
+ * Instruct datanode to drop SPS work queues.
+ */
+message DropSPSWorkCommandProto {
+  // void
+}
+
 /**
  * Block storage movement information
  */
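On the wire the command is just an empty message plus a new enum value, and receivers that do not recognize an action fall through to a default branch, as BPOfferService's switch does. A minimal dispatch sketch of that degrade-gracefully behavior (illustrative, not the patch's code):

    class CommandDispatchSketch {
      static final int DNA_DROP_SPS_WORK_COMMAND = 13;

      static void dispatch(int action) {
        switch (action) {
        case DNA_DROP_SPS_WORK_COMMAND:
          System.out.println("Dropping SPS work queues.");
          break;
        default:
          // Unrecognized actions are logged and ignored, which keeps
          // mixed-version clusters safe during a rolling upgrade.
          System.out.println("Unknown DatanodeCommand action: " + action);
        }
      }
    }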
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index 8e02d4106b8..86b8b50942b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -186,6 +188,63 @@ public class TestStoragePolicySatisfyWorker {
     waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
   }
 
+  /**
+   * Tests that the dropSPSWork method clears all the queues.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testDropSPSWork() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(20).build();
+
+    cluster.waitActive();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final String file = "/testDropSPSWork";
+    DFSTestUtil.createFile(dfs, new Path(file), false, 1024, 50 * 100,
+        DEFAULT_BLOCK_SIZE, (short) 2, 0, false, null);
+
+    // move to ARCHIVE
+    dfs.setStoragePolicy(new Path(file), "COLD");
+
+    DataNode src = cluster.getDataNodes().get(2);
+    DatanodeInfo targetDnInfo =
+        DFSTestUtil.getLocalDatanodeInfo(src.getXferPort());
+
+    StoragePolicySatisfyWorker worker =
+        new StoragePolicySatisfyWorker(conf, src);
+    List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+    List<LocatedBlock> locatedBlocks =
+        dfs.getClient().getLocatedBlocks(file, 0).getLocatedBlocks();
+    for (LocatedBlock locatedBlock : locatedBlocks) {
+      BlockMovingInfo blockMovingInfo =
+          prepareBlockMovingInfo(locatedBlock.getBlock().getLocalBlock(),
+              locatedBlock.getLocations()[0], targetDnInfo,
+              locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
+      blockMovingInfos.add(blockMovingInfo);
+    }
+    INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
+    worker.processBlockMovingTasks(inode.getId(),
+        cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+    // Wait till the results queue builds up
+    waitForBlockMovementResult(worker, inode.getId(), 30000);
+    worker.dropSPSWork();
+    assertTrue(worker.getBlocksMovementsCompletionHandler()
+        .getBlksMovementResults().size() == 0);
+  }
+
+  private void waitForBlockMovementResult(
+      final StoragePolicySatisfyWorker worker, final long inodeId, int timeout)
+      throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        List completedBlocks = worker
+            .getBlocksMovementsCompletionHandler().getBlksMovementResults();
+        return completedBlocks.size() > 0;
+      }
+    }, 100, timeout);
+  }
+
   private void waitForBlockMovementCompletion(
       final StoragePolicySatisfyWorker worker, final long inodeId,
       int expectedFailedItemsCount, int timeout) throws Exception {