From ae29d9ee0419bce28530da5ef1c6fe36a6d50ad0 Mon Sep 17 00:00:00 2001
From: Vinod Kumar Vavilapalli
Date: Tue, 25 Feb 2014 01:24:26 +0000
Subject: [PATCH 1/3] MAPREDUCE-5761. Added a simple log message to denote
 when encrypted shuffle is on in the shuffle-handler. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1571514 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt                           | 3 +++
 .../src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java | 1 +
 2 files changed, 4 insertions(+)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 30bc3137de9..65ade2c57a3 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -173,6 +173,9 @@ Release 2.4.0 - UNRELEASED
 
     MAPREDUCE-5699. Allow setting tags on MR jobs (kasha)
 
+    MAPREDUCE-5761. Added a simple log message to denote when encrypted shuffle
+    is on in the shuffle-handler. (Jian He via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index f9f924bd243..9a192463b8e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -367,6 +367,7 @@ public class ShuffleHandler extends AuxiliaryService {
     SHUFFLE = getShuffle(conf);
     if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
                         MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
+      LOG.info("Encrypted shuffle is enabled.");
      sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
      sslFactory.init();
    }
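The gate that this new log line instruments is the standard shuffle SSL switch read from mapred-site.xml. A minimal sketch of the same check, using the MRConfig constants that appear in the hunk above; the class name and main() harness are illustrative only, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;

public class ShuffleSslGateSketch {
  public static void main(String[] args) {
    // Simulate a mapred-site.xml that turns encrypted shuffle on.
    Configuration conf = new Configuration();
    conf.setBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, true);

    // The same boolean check the ShuffleHandler hunk above guards; with the
    // patch applied, the NodeManager log shows this line at service start.
    if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
        MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
      System.out.println("Encrypted shuffle is enabled.");
    }
  }
}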
From ad70f26b1fe6579166a042fec2e9f21ec56464cb Mon Sep 17 00:00:00 2001
From: Vinod Kumar Vavilapalli
Date: Tue, 25 Feb 2014 02:07:19 +0000
Subject: [PATCH 2/3] YARN-1734. Fixed ResourceManager to update the
 configurations when it transits from standby to active mode so as to
 assimilate any changes that happened while it was in standby mode.
 Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1571539 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-yarn-project/CHANGES.txt                  |  4 +
 .../server/resourcemanager/AdminService.java     | 44 ++++++++-
 .../resourcemanager/TestRMAdminService.java      | 92 +++++++++++++++++++
 3 files changed, 138 insertions(+), 2 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b399dd172ac..969a889125d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -348,6 +348,10 @@ Release 2.4.0 - UNRELEASED
     re-registration after a RESYNC and thus avoid hanging. (Rohith Sharma via
     vinodkv)
 
+    YARN-1734. Fixed ResourceManager to update the configurations when it
+    transits from standby to active mode so as to assimilate any changes that
+    happened while it was in standby mode. (Xuan Gong via vinodkv)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 70845c775e4..c53d40f54a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -250,10 +250,20 @@ public class AdminService extends CompositeService implements
   @Override
   public synchronized void transitionToActive(
       HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
+    // call refreshAdminAcls before HA state transition
+    // for the case that adminAcls have been updated in previous active RM
+    try {
+      refreshAdminAcls(false);
+    } catch (YarnException ex) {
+      throw new ServiceFailedException("Can not execute refreshAdminAcls", ex);
+    }
+
     UserGroupInformation user = checkAccess("transitionToActive");
     checkHaStateChange(reqInfo);
     try {
       rm.transitionToActive();
+      // call all refresh*s for active RM to get the updated configurations.
+      refreshAll();
       RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
           "RMHAProtocolService");
     } catch (Exception e) {
@@ -268,6 +278,13 @@ public class AdminService extends CompositeService implements
   @Override
   public synchronized void transitionToStandby(
       HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
+    // call refreshAdminAcls before HA state transition
+    // for the case that adminAcls have been updated in previous active RM
+    try {
+      refreshAdminAcls(false);
+    } catch (YarnException ex) {
+      throw new ServiceFailedException("Can not execute refreshAdminAcls", ex);
+    }
     UserGroupInformation user = checkAccess("transitionToStandby");
     checkHaStateChange(reqInfo);
     try {
@@ -406,10 +423,15 @@ public class AdminService extends CompositeService implements
   @Override
   public RefreshAdminAclsResponse refreshAdminAcls(
       RefreshAdminAclsRequest request) throws YarnException, IOException {
+    return refreshAdminAcls(true);
+  }
+
+  private RefreshAdminAclsResponse refreshAdminAcls(boolean checkRMHAState)
+      throws YarnException, IOException {
     String argName = "refreshAdminAcls";
     UserGroupInformation user = checkAcls(argName);
-
-    if (!isRMActive()) {
+
+    if (checkRMHAState && !isRMActive()) {
       RMAuditLogger.logFailure(user.getShortUserName(), argName,
           adminAcl.toString(), "AdminService",
           "ResourceManager is not active. Can not refresh user-groups.");
@@ -521,6 +543,24 @@ public class AdminService extends CompositeService implements
     return conf;
   }
 
+  private void refreshAll() throws ServiceFailedException {
+    try {
+      refreshQueues(RefreshQueuesRequest.newInstance());
+      refreshNodes(RefreshNodesRequest.newInstance());
+      refreshSuperUserGroupsConfiguration(
+          RefreshSuperUserGroupsConfigurationRequest.newInstance());
+      refreshUserToGroupsMappings(
+          RefreshUserToGroupsMappingsRequest.newInstance());
+      if (getConfig().getBoolean(
+          CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+          false)) {
+        refreshServiceAcls(RefreshServiceAclsRequest.newInstance());
+      }
+    } catch (Exception ex) {
+      throw new ServiceFailedException(ex.getMessage());
+    }
+  }
+
   @VisibleForTesting
   public AccessControlList getAccessControlList() {
     return this.adminAcl;

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
index e67b81f36f2..60259cddbd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
@@ -34,12 +34,16 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.security.GroupMappingServiceProvider;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
@@ -518,6 +522,94 @@ public class TestRMAdminService {
     Assert.assertTrue(excludeHosts.contains("0.0.0.0:123"));
   }
 
+  @Test
+  public void testRMHAWithFileSystemBasedConfiguration() throws IOException,
+      YarnException {
+    StateChangeRequestInfo requestInfo = new StateChangeRequestInfo(
+        HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+    configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+        "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
+    configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
+    int base = 100;
+    for (String confKey : YarnConfiguration
+        .getServiceAddressConfKeys(configuration)) {
+      configuration.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:"
+          + (base + 20));
+      configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:"
+          + (base + 40));
+      base = base * 2;
+    }
+    Configuration conf1 = new Configuration(configuration);
+    conf1.set(YarnConfiguration.RM_HA_ID, "rm1");
+    Configuration conf2 = new Configuration(configuration);
+    conf2.set(YarnConfiguration.RM_HA_ID, "rm2");
+
+    // upload default configurations
+    uploadDefaultConfiguration();
+
+    MockRM rm1 = null;
+    MockRM rm2 = null;
+    try {
+      rm1 = new MockRM(conf1);
+      rm1.init(conf1);
+      rm1.start();
+      Assert.assertTrue(rm1.getRMContext().getHAServiceState()
+          == HAServiceState.STANDBY);
+
+      rm2 = new MockRM(conf2);
+      rm2.init(conf2);
+      rm2.start();
+      Assert.assertTrue(rm2.getRMContext().getHAServiceState()
+          == HAServiceState.STANDBY);
+
+      rm1.adminService.transitionToActive(requestInfo);
+      Assert.assertTrue(rm1.getRMContext().getHAServiceState()
+          == HAServiceState.ACTIVE);
+
+      CapacitySchedulerConfiguration csConf =
+          new CapacitySchedulerConfiguration();
+      csConf.set("yarn.scheduler.capacity.maximum-applications", "5000");
+      uploadConfiguration(csConf, "capacity-scheduler.xml");
+
+      rm1.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
+
+      int maxApps =
+          ((CapacityScheduler) rm1.getRMContext().getScheduler())
+              .getConfiguration().getMaximumSystemApplications();
+      Assert.assertEquals(maxApps, 5000);
+
+      // Before failover happens, the maxApps is
+      // still the default value on the standby rm : rm2
+      int maxAppsBeforeFailOver =
+          ((CapacityScheduler) rm2.getRMContext().getScheduler())
+              .getConfiguration().getMaximumSystemApplications();
+      Assert.assertEquals(maxAppsBeforeFailOver, 10000);
+
+      // Do the failover
+      rm1.adminService.transitionToStandby(requestInfo);
+      rm2.adminService.transitionToActive(requestInfo);
+      Assert.assertTrue(rm1.getRMContext().getHAServiceState()
+          == HAServiceState.STANDBY);
+      Assert.assertTrue(rm2.getRMContext().getHAServiceState()
+          == HAServiceState.ACTIVE);
+
+      int maxAppsAfter =
+          ((CapacityScheduler) rm2.getRMContext().getScheduler())
+              .getConfiguration().getMaximumSystemApplications();
+
+      Assert.assertEquals(maxAppsAfter, 5000);
+    } finally {
+      if (rm1 != null) {
+        rm1.stop();
+      }
+      if (rm2 != null) {
+        rm2.stop();
+      }
+    }
+  }
+
   private String writeConfigurationXML(Configuration conf, String confXMLName)
       throws IOException {
     DataOutputStream output = null;
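The ordering the AdminService changes above establish is worth spelling out: admin ACLs are refreshed before the HA state change (the previous active RM may have altered them, and authorization must run against fresh ACLs even while this RM is still standby, hence the refreshAdminAcls(false) overload that skips the isRMActive() check), while queues, nodes, user-group mappings, and service ACLs are refreshed only after the transition succeeds. A minimal, self-contained sketch of that ordering; the class name and method bodies are illustrative, not the actual Hadoop API:

public class HaTransitionOrderSketch {
  // Step 1: allowed while still standby, so stale admin ACLs written by the
  // previous active RM cannot lock the administrator out mid-transition.
  private void refreshAdminAcls() {
    System.out.println("1. refresh admin ACLs from the configuration provider");
  }

  // Step 3: re-read queues, node lists, user-group mappings and service
  // ACLs so edits made while this RM was standby are assimilated.
  private void refreshAll() {
    System.out.println("3. refresh remaining configurations");
  }

  public synchronized void transitionToActive() {
    refreshAdminAcls();
    System.out.println("2. HA state change: standby -> active");
    refreshAll();
  }

  public static void main(String[] args) {
    new HaTransitionOrderSketch().transitionToActive();
  }
}

The testRMHAWithFileSystemBasedConfiguration test above exercises exactly this path: a capacity-scheduler.xml edit uploaded while rm2 is standby becomes visible on rm2 only after the failover makes it active.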
From 440c3cd1050f2a871a73d44406c0013b6ff73f2e Mon Sep 17 00:00:00 2001
From: Arpit Agarwal
Date: Tue, 25 Feb 2014 02:16:29 +0000
Subject: [PATCH 3/3] HDFS-5922. DN heartbeat thread can get stuck in tight
 loop. (Arpit Agarwal)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1571542 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt      |   2 +
 .../apache/hadoop/hdfs/DFSOutputStream.java      |   1 +
 .../hdfs/server/datanode/BPServiceActor.java     |  26 ++-
 .../datanode/TestIncrementalBlockReports.java    | 211 ++++++++++++++++++
 4 files changed, 230 insertions(+), 10 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index edf805f29fa..30fbc526c52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -613,6 +613,8 @@ Release 2.4.0 - UNRELEASED
     HDFS-5981. PBImageXmlWriter generates malformed XML. (Haohui Mai via
     cnauroth)
 
+    HDFS-5922. DN heartbeat thread can get stuck in tight loop. (Arpit Agarwal)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 4f4e71a944d..dc5ccf4a5f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -1108,6 +1108,7 @@ public class DFSOutputStream extends FSOutputSummer
             excluded.length > 0 ? excluded : null);
         block = lb.getBlock();
         block.setNumBytes(0);
+        bytesSent = 0;
         accessToken = lb.getBlockToken();
         nodes = lb.getLocations();
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index a1c2cb2f50c..a11c97166dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -101,7 +101,10 @@ class BPServiceActor implements Runnable {
   private final Map<String, PerStoragePendingIncrementalBR>
       pendingIncrementalBRperStorage = Maps.newHashMap();
 
-  private volatile int pendingReceivedRequests = 0;
+  // IBR = Incremental Block Report. If this flag is set then an IBR will be
+  // sent immediately by the actor thread without waiting for the IBR timer
+  // to elapse.
+  private volatile boolean sendImmediateIBR = false;
   private volatile boolean shouldServiceRun = true;
   private final DataNode dn;
   private final DNConf dnConf;
@@ -283,12 +286,10 @@ class BPServiceActor implements Runnable {
         if (perStorageMap.getBlockInfoCount() > 0) {
           // Send newly-received and deleted blockids to namenode
           ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
-          pendingReceivedRequests =
-              (pendingReceivedRequests > rdbi.length ?
-                  (pendingReceivedRequests - rdbi.length) : 0);
           reports.add(new StorageReceivedDeletedBlocks(storageUuid, rdbi));
         }
       }
+      sendImmediateIBR = false;
     }
 
     if (reports.size() == 0) {
@@ -312,8 +313,8 @@ class BPServiceActor implements Runnable {
           // didn't put something newer in the meantime.
           PerStoragePendingIncrementalBR perStorageMap =
               pendingIncrementalBRperStorage.get(report.getStorageID());
-          pendingReceivedRequests +=
-              perStorageMap.putMissingBlockInfos(report.getBlocks());
+          perStorageMap.putMissingBlockInfos(report.getBlocks());
+          sendImmediateIBR = true;
         }
       }
     }
@@ -371,7 +372,7 @@ class BPServiceActor implements Runnable {
       ReceivedDeletedBlockInfo bInfo, String storageUuid) {
     synchronized (pendingIncrementalBRperStorage) {
       addPendingReplicationBlockInfo(bInfo, storageUuid);
-      pendingReceivedRequests++;
+      sendImmediateIBR = true;
       pendingIncrementalBRperStorage.notifyAll();
     }
   }
@@ -433,6 +434,11 @@ class BPServiceActor implements Runnable {
     }
   }
 
+  @VisibleForTesting
+  boolean hasPendingIBR() {
+    return sendImmediateIBR;
+  }
+
   /**
    * Report the list blocks to the Namenode
    * @return DatanodeCommands returned by the NN. May be null.
@@ -676,8 +682,8 @@ class BPServiceActor implements Runnable {
           }
         }
       }
-      if (pendingReceivedRequests > 0
-          || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
+      if (sendImmediateIBR ||
+          (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
         reportReceivedDeletedBlocks();
         lastDeletedReport = startTime;
       }
@@ -701,7 +707,7 @@ class BPServiceActor implements Runnable {
         long waitTime = dnConf.heartBeatInterval -
             (Time.now() - lastHeartbeat);
         synchronized(pendingIncrementalBRperStorage) {
-          if (waitTime > 0 && pendingReceivedRequests == 0) {
+          if (waitTime > 0 && !sendImmediateIBR) {
            try {
              pendingIncrementalBRperStorage.wait(waitTime);
            } catch (InterruptedException ie) {

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
new file mode 100644
index 00000000000..5faa0e73bb1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static junit.framework.Assert.assertFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Verify that incremental block reports are generated in response to
+ * block additions/deletions.
+ */ +public class TestIncrementalBlockReports { + public static final Log LOG = LogFactory.getLog(TestIncrementalBlockReports.class); + + private static final short DN_COUNT = 1; + private static final long DUMMY_BLOCK_ID = 5678; + private static final long DUMMY_BLOCK_LENGTH = 1024 * 1024; + private static final long DUMMY_BLOCK_GENSTAMP = 1000; + + private MiniDFSCluster cluster = null; + private DistributedFileSystem fs; + private Configuration conf; + private NameNode singletonNn; + private DataNode singletonDn; + private BPOfferService bpos; // BPOS to use for block injection. + private BPServiceActor actor; // BPSA to use for block injection. + private String storageUuid; // DatanodeStorage to use for block injection. + + @Before + public void startCluster() throws IOException { + conf = new HdfsConfiguration(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DN_COUNT).build(); + fs = cluster.getFileSystem(); + singletonNn = cluster.getNameNode(); + singletonDn = cluster.getDataNodes().get(0); + bpos = singletonDn.getAllBpOs()[0]; + actor = bpos.getBPServiceActors().get(0); + storageUuid = singletonDn.getFSDataset().getVolumes().get(0).getStorageID(); + } + + private static Block getDummyBlock() { + return new Block(DUMMY_BLOCK_ID, DUMMY_BLOCK_LENGTH, DUMMY_BLOCK_GENSTAMP); + } + + /** + * Inject a fake 'received' block into the BPServiceActor state. + */ + private void injectBlockReceived() { + ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo( + getDummyBlock(), BlockStatus.RECEIVED_BLOCK, null); + actor.notifyNamenodeBlockImmediately(rdbi, storageUuid); + } + + /** + * Inject a fake 'deleted' block into the BPServiceActor state. + */ + private void injectBlockDeleted() { + ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo( + getDummyBlock(), BlockStatus.DELETED_BLOCK, null); + actor.notifyNamenodeDeletedBlock(rdbi, storageUuid); + } + + /** + * Spy on calls from the DN to the NN. + * @return spy object that can be used for Mockito verification. + */ + DatanodeProtocolClientSideTranslatorPB spyOnDnCallsToNn() { + return DataNodeTestUtils.spyOnBposToNN(singletonDn, singletonNn); + } + + /** + * Ensure that an IBR is generated immediately for a block received by + * the DN. + * + * @throws InterruptedException + * @throws IOException + */ + @Test (timeout=60000) + public void testReportBlockReceived() throws InterruptedException, IOException { + try { + DatanodeProtocolClientSideTranslatorPB nnSpy = spyOnDnCallsToNn(); + injectBlockReceived(); + + // Sleep for a very short time, this is necessary since the IBR is + // generated asynchronously. + Thread.sleep(2000); + + // Ensure that the received block was reported immediately. + Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted( + any(DatanodeRegistration.class), + anyString(), + any(StorageReceivedDeletedBlocks[].class)); + } finally { + cluster.shutdown(); + cluster = null; + } + } + + /** + * Ensure that a delayed IBR is generated for a block deleted on the DN. + * + * @throws InterruptedException + * @throws IOException + */ + @Test (timeout=60000) + public void testReportBlockDeleted() throws InterruptedException, IOException { + try { + // Trigger a block report to reset the IBR timer. + DataNodeTestUtils.triggerBlockReport(singletonDn); + + // Spy on calls from the DN to the NN + DatanodeProtocolClientSideTranslatorPB nnSpy = spyOnDnCallsToNn(); + injectBlockDeleted(); + + // Sleep for a very short time since IBR is generated + // asynchronously. 
+      Thread.sleep(2000);
+
+      // Ensure that no block report was generated immediately.
+      // Deleted blocks are reported when the IBR timer elapses.
+      Mockito.verify(nnSpy, times(0)).blockReceivedAndDeleted(
+          any(DatanodeRegistration.class),
+          anyString(),
+          any(StorageReceivedDeletedBlocks[].class));
+
+      // Trigger a block report, this also triggers an IBR.
+      DataNodeTestUtils.triggerBlockReport(singletonDn);
+      Thread.sleep(2000);
+
+      // Ensure that the deleted block is reported.
+      Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
+          any(DatanodeRegistration.class),
+          anyString(),
+          any(StorageReceivedDeletedBlocks[].class));
+
+    } finally {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  /**
+   * Add a received block entry and then replace it. Ensure that a single
+   * IBR is generated and that pending receive request state is cleared.
+   * This test case verifies the failure in HDFS-5922.
+   *
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @Test (timeout=60000)
+  public void testReplaceReceivedBlock() throws InterruptedException, IOException {
+    try {
+      // Spy on calls from the DN to the NN
+      DatanodeProtocolClientSideTranslatorPB nnSpy = spyOnDnCallsToNn();
+      injectBlockReceived();
+      injectBlockReceived();  // Overwrite the existing entry.
+
+      // Sleep for a very short time since IBR is generated
+      // asynchronously.
+      Thread.sleep(2000);
+
+      // Ensure that the received block is reported.
+      Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
+          any(DatanodeRegistration.class),
+          anyString(),
+          any(StorageReceivedDeletedBlocks[].class));
+
+      // Ensure that no more IBRs are pending.
+      assertFalse(actor.hasPendingIBR());
+
+    } finally {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+}
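A closing note on why the BPServiceActor change fixes the tight loop. The old int counter was incremented once per notification, but replacing an already-queued entry for the same block does not grow the queue, so the counter could exceed the number of queued items. After the clamped subtraction on the send path, a phantom count could survive with an empty queue, and since the heartbeat loop only called wait() while the counter was zero, the actor thread would spin. A boolean cannot drift this way: it is set on every event and cleared once per report. A self-contained sketch of the resulting signalling pattern; class and method names are illustrative, not Hadoop's:

public class IbrSignalSketch {
  private final Object pendingLock = new Object();
  private boolean sendImmediateIBR = false;

  // Block-event path (cf. notifyNamenodeBlockImmediately): setting a boolean
  // is idempotent, so replacing a queued entry cannot leave a phantom
  // "pending" count behind the way counter++ could.
  public void onBlockEvent() {
    synchronized (pendingLock) {
      sendImmediateIBR = true;
      pendingLock.notifyAll();
    }
  }

  // One pass of the heartbeat loop (cf. offerService): report if signalled,
  // clear the flag, then sleep unless a new event arrived in the meantime.
  public void heartbeatIteration(long waitTimeMs) throws InterruptedException {
    synchronized (pendingLock) {
      if (sendImmediateIBR) {
        // ... drain per-storage queues and send the IBR here ...
        sendImmediateIBR = false;
      }
      if (waitTimeMs > 0 && !sendImmediateIBR) {
        pendingLock.wait(waitTimeMs);
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    IbrSignalSketch s = new IbrSignalSketch();
    s.onBlockEvent();
    s.onBlockEvent();          // replacement of a queued entry: one report
    s.heartbeatIteration(100); // sends one IBR, then waits; no tight loop
  }
}

This is the scenario testReplaceReceivedBlock above pins down: two notifications for the same block must yield exactly one report and leave no pending-IBR state behind, which hasPendingIBR() makes observable to the test.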