diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 9cc45168835..9dd87d0968d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -138,7 +139,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks) throws IOException { + @Nonnull SlowDiskReports slowDisks, + BlocksStorageMovementResult[] blksMovementResults) throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount) @@ -161,6 +163,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements if (slowDisks.haveSlowDisks()) { builder.addAllSlowDisks(PBHelper.convertSlowDiskInfo(slowDisks)); } + + // Adding blocks movement results to the heart beat request. + builder.addAllBlksMovementResults( + PBHelper.convertBlksMovResults(blksMovementResults)); + HeartbeatResponseProto resp; try { resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index 5cba284681f..40458eff2e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -122,7 +122,9 @@ public class DatanodeProtocolServerSideTranslatorPB implements request.getXceiverCount(), request.getFailedVolumes(), volumeFailureSummary, request.getRequestFullBlockReportLease(), PBHelper.convertSlowPeerInfo(request.getSlowPeersList()), - PBHelper.convertSlowDiskInfo(request.getSlowDisksList())); + PBHelper.convertSlowDiskInfo(request.getSlowDisksList()), + PBHelper.convertBlksMovResults( + request.getBlksMovementResultsList())); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 74cd90700e8..10555baf4d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailur import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; @@ -102,6 +103,8 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStr import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; @@ -956,6 +959,55 @@ public class PBHelper { return SlowDiskReports.create(slowDisksMap); } + public static BlocksStorageMovementResult[] convertBlksMovResults( + List protos) { + BlocksStorageMovementResult[] results = + new BlocksStorageMovementResult[protos.size()]; + for (int i = 0; i < protos.size(); i++) { + BlocksStorageMovementResultProto resultProto = protos.get(i); + BlocksStorageMovementResult.Status status; + switch (resultProto.getStatus()) { + case SUCCESS: + status = Status.SUCCESS; + break; + case FAILURE: + status = Status.FAILURE; + break; + default: + throw new AssertionError("Unknown status: " + resultProto.getStatus()); + } + results[i] = new BlocksStorageMovementResult(resultProto.getTrackID(), + status); + } + return results; + } + + public static List convertBlksMovResults( + BlocksStorageMovementResult[] blocksMovementResults) { + List blocksMovementResultsProto = + new ArrayList<>(); + BlocksStorageMovementResultProto.Builder builder = + BlocksStorageMovementResultProto.newBuilder(); + for (int i = 0; i < blocksMovementResults.length; i++) { + BlocksStorageMovementResult report = blocksMovementResults[i]; + builder.setTrackID(report.getTrackId()); + BlocksStorageMovementResultProto.Status status; + switch (report.getStatus()) { + case SUCCESS: + status = BlocksStorageMovementResultProto.Status.SUCCESS; + break; + case FAILURE: + status = BlocksStorageMovementResultProto.Status.FAILURE; + break; + default: + throw new AssertionError("Unknown status: " + report.getStatus()); + } + builder.setStatus(status); + blocksMovementResultsProto.add(builder.build()); + } + return blocksMovementResultsProto; + } + public static JournalInfo convert(JournalInfoProto info) { int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0; int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 8581e7853f3..b7e8be0e00c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -5000,4 +5000,8 @@ public class BlockManager implements BlockStatsMXBean { public void satisfyStoragePolicy(long id) { storageMovementNeeded.add(id); } + + public StoragePolicySatisfier getStoragePolicySatisfier() { + return sps; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index a94d2df4315..0f93fb0f048 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -511,6 +512,10 @@ class BPServiceActor implements Runnable { outliersReportDue && dn.getDiskMetrics() != null ? SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) : SlowDiskReports.EMPTY_REPORT; + + BlocksStorageMovementResult[] blksMovementResults = + getBlocksMovementResults(); + HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration, reports, dn.getFSDataset().getCacheCapacity(), @@ -521,15 +526,33 @@ class BPServiceActor implements Runnable { volumeFailureSummary, requestBlockReportLease, slowPeers, - slowDisks); + slowDisks, + blksMovementResults); if (outliersReportDue) { // If the report was due and successfully sent, schedule the next one. scheduler.scheduleNextOutlierReport(); } + + // Remove the blocks movement results after successfully transferring + // to namenode. + dn.getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler() + .remove(blksMovementResults); + return response; } + private BlocksStorageMovementResult[] getBlocksMovementResults() { + List trackIdVsMovementStatus = dn + .getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler() + .getBlksMovementResults(); + BlocksStorageMovementResult[] blksMovementResult = + new BlocksStorageMovementResult[trackIdVsMovementStatus.size()]; + trackIdVsMovementStatus.toArray(blksMovementResult); + + return blksMovementResult; + } + @VisibleForTesting void sendLifelineForTests() throws IOException { lifelineSender.sendLifeline(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java index 604fb4a9c75..1bd851eccd5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java @@ -29,6 +29,7 @@ import java.io.OutputStream; import java.net.Socket; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.concurrent.Callable; @@ -57,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; @@ -218,7 +220,8 @@ public class StoragePolicySatisfyWorker { OutputStream unbufOut = sock.getOutputStream(); InputStream unbufIn = sock.getInputStream(); Token accessToken = datanode.getBlockAccessToken( - extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)); + extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), + new StorageType[]{targetStorageType}, new String[0]); DataEncryptionKeyFactory keyFactory = datanode .getDataEncryptionKeyFactoryForBlock(extendedBlock); @@ -257,7 +260,7 @@ public class StoragePolicySatisfyWorker { Token accessToken, DatanodeInfo srcDn, StorageType destinStorageType) throws IOException { new Sender(out).replaceBlock(eb, destinStorageType, accessToken, - srcDn.getDatanodeUuid(), srcDn); + srcDn.getDatanodeUuid(), srcDn, null); } /** Receive a reportedBlock copy response from the input stream. */ @@ -276,7 +279,7 @@ public class StoragePolicySatisfyWorker { /** * Block movement status code. */ - enum BlockMovementStatus { + public static enum BlockMovementStatus { /** Success. */ DN_BLK_STORAGE_MOVEMENT_SUCCESS(0), /** @@ -343,26 +346,72 @@ public class StoragePolicySatisfyWorker { /** * Blocks movements completion handler, which is used to collect details of - * the completed list of block movements and notify the namenode about the - * success or failures. + * the completed list of block movements and this status(success or failure) + * will be send to the namenode via heartbeat. */ static class BlocksMovementsCompletionHandler { - private final List completedBlocks = new ArrayList<>(); + private final List trackIdVsMovementStatus = + new ArrayList<>(); /** - * Collect all the block movement results and notify namenode. + * Collect all the block movement results. Later this will be send to + * namenode via heart beat. * * @param results * result of all the block movements per trackId */ - void handle(List results) { - completedBlocks.addAll(results); - // TODO: notify namenode about the success/failures. + void handle(List resultsPerTrackId) { + BlocksStorageMovementResult.Status status = + BlocksStorageMovementResult.Status.SUCCESS; + long trackId = -1; + for (BlockMovementResult blockMovementResult : resultsPerTrackId) { + trackId = blockMovementResult.getTrackId(); + if (blockMovementResult.status == + BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE) { + status = BlocksStorageMovementResult.Status.FAILURE; + // If any of the block movement is failed, then mark as failure so + // that namenode can take a decision to retry the blocks associated to + // the given trackId. + break; + } + } + + // Adding to the tracking results list. Later this will be send to + // namenode via datanode heartbeat. + synchronized (trackIdVsMovementStatus) { + trackIdVsMovementStatus.add( + new BlocksStorageMovementResult(trackId, status)); + } } - @VisibleForTesting - List getCompletedBlocks() { - return completedBlocks; + /** + * @return unmodifiable list of blocks storage movement results. + */ + List getBlksMovementResults() { + synchronized (trackIdVsMovementStatus) { + if (trackIdVsMovementStatus.size() <= 0) { + return new ArrayList<>(); + } + List results = Collections + .unmodifiableList(trackIdVsMovementStatus); + return results; + } + } + + /** + * Remove the blocks storage movement results. + * + * @param results + * set of blocks storage movement results + */ + void remove(BlocksStorageMovementResult[] results) { + if (results != null) { + synchronized (trackIdVsMovementStatus) { + for (BlocksStorageMovementResult blocksMovementResult : results) { + trackIdVsMovementStatus.remove(blocksMovementResult); + } + } + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 5ef07b71db0..f4572d29e42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -264,6 +264,7 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger; import org.apache.hadoop.hdfs.server.namenode.top.TopConf; import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics; import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -3860,7 +3861,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks) throws IOException { + @Nonnull SlowDiskReports slowDisks, + BlocksStorageMovementResult[] blksMovementResults) throws IOException { readLock(); try { //get datanode commands @@ -3874,6 +3876,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, if (requestFullBlockReportLease) { blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg); } + + // TODO: Handle blocks movement results send by the coordinator datanode. + // This has to be revisited as part of HDFS-11029. + blockManager.getStoragePolicySatisfier() + .handleBlocksStorageMovementResults(blksMovementResults); + //create ha status final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat( haContext.getState().getServiceState(), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index c5b9d5a053f..fdca492b759 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -156,6 +156,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -1498,13 +1499,15 @@ public class NameNodeRpcServer implements NamenodeProtocols { int failedVolumes, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks) throws IOException { + @Nonnull SlowDiskReports slowDisks, + BlocksStorageMovementResult[] blkMovementStatus) throws IOException { checkNNStartup(); verifyRequest(nodeReg); return namesystem.handleHeartbeat(nodeReg, report, dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary, requestFullBlockReportLease, - slowPeers, slowDisks); + slowPeers, slowDisks, + blkMovementStatus); } @Override // DatanodeProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java index b5aed376234..fbe686a796c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java @@ -39,11 +39,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * Setting storagePolicy on a file after the file write will only update the new * storage policy type in Namespace, but physical block storage movement will @@ -394,4 +397,24 @@ public class StoragePolicySatisfier implements Runnable { return typeNodeMap.get(type); } } + + // TODO: Temporarily keeping the results for assertion. This has to be + // revisited as part of HDFS-11029. + @VisibleForTesting + List results = new ArrayList<>(); + + /** + * Receives the movement results of collection of blocks associated to a + * trackId. + * + * @param blksMovementResults + * movement status of the set of blocks associated to a trackId. + */ + void handleBlocksStorageMovementResults( + BlocksStorageMovementResult[] blksMovementResults) { + if (blksMovementResults.length <= 0) { + return; + } + results.addAll(Arrays.asList(blksMovementResults)); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java new file mode 100644 index 00000000000..1afba340545 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +/** + * This class represents, movement status of a set of blocks associated to a + * track Id. + */ +public class BlocksStorageMovementResult { + + private final long trackId; + private final Status status; + + /** + * SUCCESS - If all the blocks associated to track id has moved successfully + * or maximum possible movements done. + * + *

+ * FAILURE - If any of its(trackId) blocks movement failed and requires to + * retry these failed blocks movements. Example selected target node is no + * more running or no space. So, retrying by selecting new target node might + * work. + */ + public static enum Status { + SUCCESS, FAILURE; + } + + /** + * BlocksStorageMovementResult constructor. + * + * @param trackId + * tracking identifier + * @param status + * block movement status + */ + public BlocksStorageMovementResult(long trackId, Status status) { + this.trackId = trackId; + this.status = status; + } + + public long getTrackId() { + return trackId; + } + + public Status getStatus() { + return status; + } + +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 283f367032b..858f59be4d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -111,6 +111,8 @@ public interface DatanodeProtocol { * @param slowPeers Details of peer DataNodes that were detected as being * slow to respond to packet writes. Empty report if no * slow peers were detected by the DataNode. + * @param blksMovementResults array of movement status of a set of blocks + * associated to a trackId. * @throws IOException on error */ @Idempotent @@ -124,7 +126,8 @@ public interface DatanodeProtocol { VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks) + @Nonnull SlowDiskReports slowDisks, + BlocksStorageMovementResult[] blksMovementResults) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 8e198094844..77b0f86fb13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -176,6 +176,18 @@ message BlockStorageMovementProto { required StorageTypesProto targetStorageTypes = 5; } +/** + * Movement status of the set of blocks associated to a trackId. + */ +message BlocksStorageMovementResultProto { + enum Status { + SUCCESS = 1; // block movement succeeded + FAILURE = 2; // block movement failed and needs to retry + } + required uint64 trackID = 1; + required Status status = 2; +} + /** * registration - Information of the datanode registering with the namenode */ @@ -219,6 +231,7 @@ message VolumeFailureSummaryProto { * volumeFailureSummary - info about volume failures * slowPeers - info about peer DataNodes that are suspected to be slow. * slowDisks - info about DataNode disks that are suspected to be slow. + * blksMovementResults - status of the scheduled blocks movements */ message HeartbeatRequestProto { required DatanodeRegistrationProto registration = 1; // Datanode info @@ -232,6 +245,7 @@ message HeartbeatRequestProto { optional bool requestFullBlockReportLease = 9 [ default = false ]; repeated SlowPeerReportProto slowPeers = 10; repeated SlowDiskReportProto slowDisks = 11; + repeated BlocksStorageMovementResultProto blksMovementResults = 12; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java index 05b6d3023b3..9530e20baa9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; @@ -116,7 +117,7 @@ public class TestNameNodePrunesMissingStorages { cluster.stopDataNode(0); cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0, 0, null, true, SlowPeerReports.EMPTY_REPORT, - SlowDiskReports.EMPTY_REPORT); + SlowDiskReports.EMPTY_REPORT, new BlocksStorageMovementResult[0]); // Check that the missing storage was pruned. assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java index b453991e91a..bd831d6a67b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; @@ -167,7 +168,8 @@ public class InternalDataNodeTestUtils { Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class))).thenReturn( + Mockito.any(SlowDiskReports.class), + Mockito.any(BlocksStorageMovementResult[].class))).thenReturn( new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat( HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current() .nextLong() | 1L)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 4863ca18f92..3d006e08c2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -123,6 +124,8 @@ public class TestBPOfferService { Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf(); Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")) .when(mockDn).getMetrics(); + Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn) + .getStoragePolicySatisfyWorker(); // Set up a simulated dataset with our fake BP mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf)); @@ -157,7 +160,8 @@ public class TestBPOfferService { Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class)); + Mockito.any(SlowDiskReports.class), + Mockito.any(BlocksStorageMovementResult[].class)); mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0); datanodeCommands[nnIdx] = new DatanodeCommand[0]; return mock; @@ -376,6 +380,8 @@ public class TestBPOfferService { Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf(); Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")). when(mockDn).getMetrics(); + Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn) + .getStoragePolicySatisfyWorker(); final AtomicInteger count = new AtomicInteger(); Mockito.doAnswer(new Answer() { @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 07fd4ae58d7..a05fdfd86ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -93,6 +93,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -232,7 +233,8 @@ public class TestBlockRecovery { Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class))) + Mockito.any(SlowDiskReports.class), + Mockito.any(BlocksStorageMovementResult[].class))) .thenReturn(new HeartbeatResponse( new DatanodeCommand[0], new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java index 28427bce75a..b15b5308d40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; @@ -172,7 +173,8 @@ public class TestDataNodeLifeline { any(VolumeFailureSummary.class), anyBoolean(), any(SlowPeerReports.class), - any(SlowDiskReports.class)); + any(SlowDiskReports.class), + any(BlocksStorageMovementResult[].class)); // Intercept lifeline to trigger latch count-down on each call. doAnswer(new LatchCountingAnswer(lifelinesSent)) @@ -237,7 +239,8 @@ public class TestDataNodeLifeline { any(VolumeFailureSummary.class), anyBoolean(), any(SlowPeerReports.class), - any(SlowDiskReports.class)); + any(SlowDiskReports.class), + any(BlocksStorageMovementResult[].class)); // While waiting on the latch for the expected number of heartbeat messages, // poll DataNode tracking information. We expect that the DataNode always diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java index bb1d9eff0ee..d7ac3f995ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; @@ -222,7 +223,8 @@ public class TestDatanodeProtocolRetryPolicy { Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class)); + Mockito.any(SlowDiskReports.class), + Mockito.any(BlocksStorageMovementResult[].class)); dn = new DataNode(conf, locations, null, null) { @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index 2dbd5b9bd01..b9f21a044db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.Page import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -208,7 +209,8 @@ public class TestFsDatasetCache { (StorageReport[]) any(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(), anyBoolean(), any(SlowPeerReports.class), - any(SlowDiskReports.class)); + any(SlowDiskReports.class), + (BlocksStorageMovementResult[]) any()); } finally { lock.writeLock().unlock(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java index ea3eec31ba0..1eb44e057e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java @@ -34,10 +34,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult; -import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.INode; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; @@ -191,12 +190,12 @@ public class TestStoragePolicySatisfyWorker { GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { - List completedBlocks = worker - .getBlocksMovementsCompletionHandler().getCompletedBlocks(); + List completedBlocks = worker + .getBlocksMovementsCompletionHandler().getBlksMovementResults(); int failedCount = 0; - for (BlockMovementResult blockMovementResult : completedBlocks) { - if (BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE == - blockMovementResult.getStatus()) { + for (BlocksStorageMovementResult blkMovementResult : completedBlocks) { + if (blkMovementResult.getStatus() == + BlocksStorageMovementResult.Status.FAILURE) { failedCount++; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java index 5f62ddb084b..df120ca1c5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; @@ -110,7 +111,8 @@ public class TestStorageReport { anyLong(), anyLong(), anyInt(), anyInt(), anyInt(), Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class)); + Mockito.any(SlowDiskReports.class), + Mockito.any(BlocksStorageMovementResult[].class)); StorageReport[] reports = captor.getValue(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index 3a3c47177aa..1e016f71737 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -956,8 +957,8 @@ public class NNThroughputBenchmark implements Tool { DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) }; DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep, 0L, 0L, 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT) - .getCommands(); + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, + new BlocksStorageMovementResult[0]).getCommands(); if(cmds != null) { for (DatanodeCommand cmd : cmds ) { if(LOG.isDebugEnabled()) { @@ -1007,8 +1008,8 @@ public class NNThroughputBenchmark implements Tool { false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) }; DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep, 0L, 0L, 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT) - .getCommands(); + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, + new BlocksStorageMovementResult[0]).getCommands(); if (cmds != null) { for (DatanodeCommand cmd : cmds) { if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index b85527a9481..4584adde207 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; @@ -130,7 +131,8 @@ public class NameNodeAdapter { return namesystem.handleHeartbeat(nodeReg, BlockManagerTestUtil.getStorageReportsForDatanode(dd), dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT); + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, + new BlocksStorageMovementResult[0]); } public static boolean setReplication(final FSNamesystem ns, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index 366f584fa03..36beaa82657 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -139,8 +140,8 @@ public class TestDeadDatanode { false, 0, 0, 0, 0, 0) }; DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT) - .getCommands(); + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, + new BlocksStorageMovementResult[0]).getCommands(); assertEquals(1, cmd.length); assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER .getAction()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java index 37664b5f6d3..cbfdfc6df85 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -27,6 +28,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Before; import org.junit.Test; @@ -146,6 +148,54 @@ public class TestStoragePolicySatisfier { } } + /** + * Tests to verify that the block storage movement results will be propagated + * to Namenode via datanode heartbeat. + */ + @Test(timeout = 300000) + public void testPerTrackIdBlocksStorageMovementResults() throws Exception { + try { + // Change policy to ONE_SSD + distributedFS.setStoragePolicy(new Path(file), "ONE_SSD"); + FSNamesystem namesystem = hdfsCluster.getNamesystem(); + INode inode = namesystem.getFSDirectory().getINode(file); + + StorageType[][] newtypes = + new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; + + // Making sure SDD based nodes added to cluster. Adding SSD based + // datanodes. + startAdditionalDNs(config, 1, numOfDatanodes, newtypes, + storagesPerDatanode, capacity, hdfsCluster); + namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + hdfsCluster.triggerHeartbeats(); + + // Wait till the block is moved to SSD areas + waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000); + waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000); + + // TODO: Temporarily using the results from StoragePolicySatisfier class. + // This has to be revisited as part of HDFS-11029. + waitForBlocksMovementResult(1, 30000); + } finally { + hdfsCluster.shutdown(); + } + } + + private void waitForBlocksMovementResult(int expectedResultsCount, + int timeout) throws TimeoutException, InterruptedException { + BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); + final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + LOG.info("expectedResultsCount={} actualResultsCount={}", + expectedResultsCount, sps.results.size()); + return expectedResultsCount == sps.results.size(); + } + }, 100, timeout); + } + private void writeContent(final DistributedFileSystem dfs, final String fileName) throws IOException { // write to DISK