From 596a196c1c7d7624f9a4e324801359f4a237d5df Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Tue, 25 Feb 2014 02:18:06 +0000 Subject: [PATCH] HDFS-5922. Merging r1571542 from trunk to branch-2. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1571543 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../apache/hadoop/hdfs/DFSOutputStream.java | 1 + .../hdfs/server/datanode/BPServiceActor.java | 26 ++- .../datanode/TestIncrementalBlockReports.java | 211 ++++++++++++++++++ 4 files changed, 230 insertions(+), 10 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 3ab962b717a..09596c7659e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -284,6 +284,8 @@ Release 2.4.0 - UNRELEASED HDFS-5981. PBImageXmlWriter generates malformed XML. (Haohui Mai via cnauroth) + HDFS-5922. DN heartbeat thread can get stuck in tight loop. (Arpit Agarwal) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 726866bfa99..0132f431322 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -1113,6 +1113,7 @@ private LocatedBlock nextBlockOutputStream() throws IOException { excluded.length > 0 ? excluded : null); block = lb.getBlock(); block.setNumBytes(0); + bytesSent = 0; accessToken = lb.getBlockToken(); nodes = lb.getLocations(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index a1c2cb2f50c..a11c97166dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -101,7 +101,10 @@ class BPServiceActor implements Runnable { private final Map pendingIncrementalBRperStorage = Maps.newHashMap(); - private volatile int pendingReceivedRequests = 0; + // IBR = Incremental Block Report. If this flag is set then an IBR will be + // sent immediately by the actor thread without waiting for the IBR timer + // to elapse. + private volatile boolean sendImmediateIBR = false; private volatile boolean shouldServiceRun = true; private final DataNode dn; private final DNConf dnConf; @@ -283,12 +286,10 @@ private void reportReceivedDeletedBlocks() throws IOException { if (perStorageMap.getBlockInfoCount() > 0) { // Send newly-received and deleted blockids to namenode ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos(); - pendingReceivedRequests = - (pendingReceivedRequests > rdbi.length ? - (pendingReceivedRequests - rdbi.length) : 0); reports.add(new StorageReceivedDeletedBlocks(storageUuid, rdbi)); } } + sendImmediateIBR = false; } if (reports.size() == 0) { @@ -312,8 +313,8 @@ private void reportReceivedDeletedBlocks() throws IOException { // didn't put something newer in the meantime. PerStoragePendingIncrementalBR perStorageMap = pendingIncrementalBRperStorage.get(report.getStorageID()); - pendingReceivedRequests += - perStorageMap.putMissingBlockInfos(report.getBlocks()); + perStorageMap.putMissingBlockInfos(report.getBlocks()); + sendImmediateIBR = true; } } } @@ -371,7 +372,7 @@ void notifyNamenodeBlockImmediately( ReceivedDeletedBlockInfo bInfo, String storageUuid) { synchronized (pendingIncrementalBRperStorage) { addPendingReplicationBlockInfo(bInfo, storageUuid); - pendingReceivedRequests++; + sendImmediateIBR = true; pendingIncrementalBRperStorage.notifyAll(); } } @@ -433,6 +434,11 @@ void triggerDeletionReportForTests() { } } + @VisibleForTesting + boolean hasPendingIBR() { + return sendImmediateIBR; + } + /** * Report the list blocks to the Namenode * @return DatanodeCommands returned by the NN. May be null. @@ -676,8 +682,8 @@ private void offerService() throws Exception { } } } - if (pendingReceivedRequests > 0 - || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) { + if (sendImmediateIBR || + (startTime - lastDeletedReport > dnConf.deleteReportInterval)) { reportReceivedDeletedBlocks(); lastDeletedReport = startTime; } @@ -701,7 +707,7 @@ private void offerService() throws Exception { long waitTime = dnConf.heartBeatInterval - (Time.now() - lastHeartbeat); synchronized(pendingIncrementalBRperStorage) { - if (waitTime > 0 && pendingReceivedRequests == 0) { + if (waitTime > 0 && !sendImmediateIBR) { try { pendingIncrementalBRperStorage.wait(waitTime); } catch (InterruptedException ie) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java new file mode 100644 index 00000000000..5faa0e73bb1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static junit.framework.Assert.assertFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.times; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.*; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; + +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Verify that incremental block reports are generated in response to + * block additions/deletions. + */ +public class TestIncrementalBlockReports { + public static final Log LOG = LogFactory.getLog(TestIncrementalBlockReports.class); + + private static final short DN_COUNT = 1; + private static final long DUMMY_BLOCK_ID = 5678; + private static final long DUMMY_BLOCK_LENGTH = 1024 * 1024; + private static final long DUMMY_BLOCK_GENSTAMP = 1000; + + private MiniDFSCluster cluster = null; + private DistributedFileSystem fs; + private Configuration conf; + private NameNode singletonNn; + private DataNode singletonDn; + private BPOfferService bpos; // BPOS to use for block injection. + private BPServiceActor actor; // BPSA to use for block injection. + private String storageUuid; // DatanodeStorage to use for block injection. + + @Before + public void startCluster() throws IOException { + conf = new HdfsConfiguration(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DN_COUNT).build(); + fs = cluster.getFileSystem(); + singletonNn = cluster.getNameNode(); + singletonDn = cluster.getDataNodes().get(0); + bpos = singletonDn.getAllBpOs()[0]; + actor = bpos.getBPServiceActors().get(0); + storageUuid = singletonDn.getFSDataset().getVolumes().get(0).getStorageID(); + } + + private static Block getDummyBlock() { + return new Block(DUMMY_BLOCK_ID, DUMMY_BLOCK_LENGTH, DUMMY_BLOCK_GENSTAMP); + } + + /** + * Inject a fake 'received' block into the BPServiceActor state. + */ + private void injectBlockReceived() { + ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo( + getDummyBlock(), BlockStatus.RECEIVED_BLOCK, null); + actor.notifyNamenodeBlockImmediately(rdbi, storageUuid); + } + + /** + * Inject a fake 'deleted' block into the BPServiceActor state. + */ + private void injectBlockDeleted() { + ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo( + getDummyBlock(), BlockStatus.DELETED_BLOCK, null); + actor.notifyNamenodeDeletedBlock(rdbi, storageUuid); + } + + /** + * Spy on calls from the DN to the NN. + * @return spy object that can be used for Mockito verification. + */ + DatanodeProtocolClientSideTranslatorPB spyOnDnCallsToNn() { + return DataNodeTestUtils.spyOnBposToNN(singletonDn, singletonNn); + } + + /** + * Ensure that an IBR is generated immediately for a block received by + * the DN. + * + * @throws InterruptedException + * @throws IOException + */ + @Test (timeout=60000) + public void testReportBlockReceived() throws InterruptedException, IOException { + try { + DatanodeProtocolClientSideTranslatorPB nnSpy = spyOnDnCallsToNn(); + injectBlockReceived(); + + // Sleep for a very short time, this is necessary since the IBR is + // generated asynchronously. + Thread.sleep(2000); + + // Ensure that the received block was reported immediately. + Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted( + any(DatanodeRegistration.class), + anyString(), + any(StorageReceivedDeletedBlocks[].class)); + } finally { + cluster.shutdown(); + cluster = null; + } + } + + /** + * Ensure that a delayed IBR is generated for a block deleted on the DN. + * + * @throws InterruptedException + * @throws IOException + */ + @Test (timeout=60000) + public void testReportBlockDeleted() throws InterruptedException, IOException { + try { + // Trigger a block report to reset the IBR timer. + DataNodeTestUtils.triggerBlockReport(singletonDn); + + // Spy on calls from the DN to the NN + DatanodeProtocolClientSideTranslatorPB nnSpy = spyOnDnCallsToNn(); + injectBlockDeleted(); + + // Sleep for a very short time since IBR is generated + // asynchronously. + Thread.sleep(2000); + + // Ensure that no block report was generated immediately. + // Deleted blocks are reported when the IBR timer elapses. + Mockito.verify(nnSpy, times(0)).blockReceivedAndDeleted( + any(DatanodeRegistration.class), + anyString(), + any(StorageReceivedDeletedBlocks[].class)); + + // Trigger a block report, this also triggers an IBR. + DataNodeTestUtils.triggerBlockReport(singletonDn); + Thread.sleep(2000); + + // Ensure that the deleted block is reported. + Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted( + any(DatanodeRegistration.class), + anyString(), + any(StorageReceivedDeletedBlocks[].class)); + + } finally { + cluster.shutdown(); + cluster = null; + } + } + + /** + * Add a received block entry and then replace it. Ensure that a single + * IBR is generated and that pending receive request state is cleared. + * This test case verifies the failure in HDFS-5922. + * + * @throws InterruptedException + * @throws IOException + */ + @Test (timeout=60000) + public void testReplaceReceivedBlock() throws InterruptedException, IOException { + try { + // Spy on calls from the DN to the NN + DatanodeProtocolClientSideTranslatorPB nnSpy = spyOnDnCallsToNn(); + injectBlockReceived(); + injectBlockReceived(); // Overwrite the existing entry. + + // Sleep for a very short time since IBR is generated + // asynchronously. + Thread.sleep(2000); + + // Ensure that the received block is reported. + Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted( + any(DatanodeRegistration.class), + anyString(), + any(StorageReceivedDeletedBlocks[].class)); + + // Ensure that no more IBRs are pending. + assertFalse(actor.hasPendingIBR()); + + } finally { + cluster.shutdown(); + cluster = null; + } + } +}