HDFS-7944. Minor cleanup of BlockPoolManager#getAllNamenodeThreads. (Arpit Agarwal)

Arpit Agarwal 2015-03-30 21:24:51 -07:00
parent cce66ba3c9
commit 85dc3c14b2
12 changed files with 52 additions and 55 deletions

View File

@@ -365,6 +365,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-7645. Rolling upgrade is restoring blocks from trash multiple times.
     (Vinayakumar B and Keisuke Ogiwara via Arpit Agarwal)
+    HDFS-7944. Minor cleanup of BlockPoolManager#getAllNamenodeThreads.
+    (Arpit Agarwal)
   OPTIMIZATIONS
   BUG FIXES

View File

@@ -20,11 +20,8 @@ package org.apache.hadoop.hdfs.server.datanode;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -53,7 +50,7 @@ class BlockPoolManager {
   private final Map<String, BPOfferService> bpByBlockPoolId =
       Maps.newHashMap();
   private final List<BPOfferService> offerServices =
-      Lists.newArrayList();
+      new CopyOnWriteArrayList<>();
   private final DataNode dn;
@@ -74,12 +71,14 @@
   }
   /**
-   * Returns the array of BPOfferService objects.
+   * Returns a list of BPOfferService objects. The underlying list
+   * implementation is a CopyOnWriteArrayList so it can be safely
+   * iterated while BPOfferServices are being added or removed.
+   *
    * Caution: The BPOfferService returned could be shutdown any time.
    */
-  synchronized BPOfferService[] getAllNamenodeThreads() {
-    BPOfferService[] bposArray = new BPOfferService[offerServices.size()];
-    return offerServices.toArray(bposArray);
+  synchronized List<BPOfferService> getAllNamenodeThreads() {
+    return Collections.unmodifiableList(offerServices);
   }
   synchronized BPOfferService get(String bpid) {
@@ -110,15 +109,13 @@
     }
   }
-  void shutDownAll(BPOfferService[] bposArray) throws InterruptedException {
-    if (bposArray != null) {
-      for (BPOfferService bpos : bposArray) {
-        bpos.stop(); //interrupts the threads
-      }
-      //now join
-      for (BPOfferService bpos : bposArray) {
-        bpos.join();
-      }
+  void shutDownAll(List<BPOfferService> bposList) throws InterruptedException {
+    for (BPOfferService bpos : bposList) {
+      bpos.stop(); //interrupts the threads
+    }
+    //now join
+    for (BPOfferService bpos : bposList) {
+      bpos.join();
     }
   }
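The new javadoc above leans on CopyOnWriteArrayList's snapshot iterators: an iterator walks the array captured when iteration starts, so a caller can traverse the list returned by getAllNamenodeThreads() while refreshNamenodes() concurrently adds or removes BPOfferService instances, without ConcurrentModificationException and without the defensive array copy the old code made. Below is a minimal, self-contained sketch of that property only; it is not part of the commit, and plain Strings stand in for BPOfferService.

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CowIterationSketch {
  public static void main(String[] args) {
    List<String> offerServices = new CopyOnWriteArrayList<>();
    offerServices.add("bpos-1");
    offerServices.add("bpos-2");

    // Read-only view, mirroring what getAllNamenodeThreads() now returns.
    List<String> view = Collections.unmodifiableList(offerServices);

    for (String bpos : view) {
      // Mutating the backing list mid-iteration is safe: the iterator
      // operates on the snapshot taken when the loop began, so it still
      // yields exactly "bpos-1" and "bpos-2" and never throws
      // ConcurrentModificationException.
      offerServices.add("bpos-added-while-iterating");
      System.out.println(bpos);
    }

    // Later reads through the view see the additions, but the view itself
    // rejects writes (view.add(...) would throw
    // UnsupportedOperationException).
    System.out.println("backing list size is now " + offerServices.size());
  }
}

The copy-on-write trade-off (every add or remove copies the backing array) suits a list that changes only on refreshNamenodes() but is read far more often.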

View File

@@ -1352,12 +1352,12 @@ public class DataNode extends ReconfigurableBase
     blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
   }
-  BPOfferService[] getAllBpOs() {
+  List<BPOfferService> getAllBpOs() {
     return blockPoolManager.getAllNamenodeThreads();
   }
   int getBpOsCount() {
-    return blockPoolManager.getAllNamenodeThreads().length;
+    return blockPoolManager.getAllNamenodeThreads().size();
   }
   /**
@@ -1654,11 +1654,8 @@
       }
     }
-    // We need to make a copy of the original blockPoolManager#offerServices to
-    // make sure blockPoolManager#shutDownAll() can still access all the
-    // BPOfferServices, since after setting DataNode#shouldRun to false the
-    // offerServices may be modified.
-    BPOfferService[] bposArray = this.blockPoolManager == null ? null
+    List<BPOfferService> bposArray = (this.blockPoolManager == null)
+        ? new ArrayList<BPOfferService>()
         : this.blockPoolManager.getAllNamenodeThreads();
     // If shutdown is not for restart, set shouldRun to false early.
     if (!shutdownForUpgrade) {
@@ -2338,8 +2335,7 @@
     while (shouldRun) {
       try {
         blockPoolManager.joinAll();
-        if (blockPoolManager.getAllNamenodeThreads() != null
-            && blockPoolManager.getAllNamenodeThreads().length == 0) {
+        if (blockPoolManager.getAllNamenodeThreads().size() == 0) {
           shouldRun = false;
         }
         // Terminate if shutdown is complete or 2 seconds after all BPs
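Worth noting in the hunks above: where the old shutdown path passed a possibly-null array through to shutDownAll(), the new code substitutes an empty ArrayList, which is what lets shutDownAll() drop its null guard and the restart-wait loop check only size() == 0. The following rough, self-contained sketch shows that empty-collection-instead-of-null pattern; the Service interface and helper names are illustrative placeholders, not HDFS classes.

import java.util.Collections;
import java.util.List;

public class EmptyListShutdownSketch {
  /** Illustrative stand-in for something stoppable and joinable. */
  interface Service {
    void stop();
    void join() throws InterruptedException;
  }

  /** Never returns null: callers can always iterate the result. */
  static List<Service> allServices(List<Service> registeredOrNull) {
    return (registeredOrNull == null)
        ? Collections.<Service>emptyList()
        : registeredOrNull;
  }

  /** Mirrors the simplified shutDownAll(): no null check needed. */
  static void shutDownAll(List<Service> services) throws InterruptedException {
    for (Service s : services) {
      s.stop();   // interrupt the worker threads first
    }
    for (Service s : services) {
      s.join();   // then wait for each to exit
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Behaves identically whether or not anything was ever registered.
    shutDownAll(allServices(null));
    System.out.println("shutdown completed with zero services");
  }
}

Returning an empty list as the null stand-in keeps the call sites branch-free; the commit achieves the same effect with new ArrayList&lt;BPOfferService&gt;().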

View File

@@ -177,7 +177,7 @@ public class TestBlockRecovery {
       }
     };
     // Trigger a heartbeat so that it acknowledges the NN as active.
-    dn.getAllBpOs()[0].triggerHeartbeatForTests();
+    dn.getAllBpOs().get(0).triggerHeartbeatForTests();
   }
   /**

View File

@@ -162,8 +162,8 @@ public class TestBlockScanner {
     boolean testedRewind = false, testedSave = false, testedLoad = false;
     int blocksProcessed = 0, savedBlocksProcessed = 0;
     try {
-      BPOfferService bpos[] = ctx.datanode.getAllBpOs();
-      assertEquals(1, bpos.length);
+      List<BPOfferService> bpos = ctx.datanode.getAllBpOs();
+      assertEquals(1, bpos.size());
       BlockIterator iter = volume.newBlockIterator(ctx.bpids[0], "test");
       assertEquals(ctx.bpids[0], iter.getBlockPoolId());
       iter.setMaxStalenessMs(maxStaleness);

View File

@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import java.io.IOException;
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -64,11 +65,11 @@ public class TestDataNodeExit {
   private void stopBPServiceThreads(int numStopThreads, DataNode dn)
       throws Exception {
-    BPOfferService[] bpoList = dn.getAllBpOs();
+    List<BPOfferService> bpoList = dn.getAllBpOs();
     int expected = dn.getBpOsCount() - numStopThreads;
     int index = numStopThreads - 1;
     while (index >= 0) {
-      bpoList[index--].stop();
+      bpoList.get(index--).stop();
     }
     int iterations = 3000; // Total 30 seconds MAX wait time
     while(dn.getBpOsCount() != expected && iterations > 0) {

View File

@@ -103,8 +103,8 @@ public class TestDataNodeMultipleRegistrations {
        LOG.info("BP: " + bpos);
      }
-     BPOfferService bpos1 = dn.getAllBpOs()[0];
-     BPOfferService bpos2 = dn.getAllBpOs()[1];
+     BPOfferService bpos1 = dn.getAllBpOs().get(0);
+     BPOfferService bpos2 = dn.getAllBpOs().get(1);
      // The order of bpos is not guaranteed, so fix the order
      if (getNNSocketAddress(bpos1).equals(nn2.getNameNodeAddress())) {
@@ -173,7 +173,7 @@
      }
      // try block report
-     BPOfferService bpos1 = dn.getAllBpOs()[0];
+     BPOfferService bpos1 = dn.getAllBpOs().get(0);
      bpos1.triggerBlockReportForTests();
      assertEquals("wrong nn address",
@@ -184,7 +184,7 @@
       cluster.shutdown();
       // Ensure all the BPOfferService threads are shutdown
-      assertEquals(0, dn.getAllBpOs().length);
+      assertEquals(0, dn.getAllBpOs().size());
       cluster = null;
     } finally {
       if (cluster != null) {
@@ -202,16 +202,16 @@
       cluster.waitActive();
       DataNode dn = cluster.getDataNodes().get(0);
-      BPOfferService [] bposs = dn.getAllBpOs();
-      LOG.info("dn bpos len (should be 2):" + bposs.length);
-      Assert.assertEquals("should've registered with two namenodes", bposs.length,2);
+      List<BPOfferService> bposs = dn.getAllBpOs();
+      LOG.info("dn bpos len (should be 2):" + bposs.size());
+      Assert.assertEquals("should've registered with two namenodes", bposs.size(),2);
       // add another namenode
       cluster.addNameNode(conf, 9938);
       Thread.sleep(500);// lets wait for the registration to happen
       bposs = dn.getAllBpOs();
-      LOG.info("dn bpos len (should be 3):" + bposs.length);
-      Assert.assertEquals("should've registered with three namenodes", bposs.length,3);
+      LOG.info("dn bpos len (should be 3):" + bposs.size());
+      Assert.assertEquals("should've registered with three namenodes", bposs.size(),3);
       // change cluster id and another Namenode
       StartupOption.FORMAT.setClusterId("DifferentCID");
@@ -221,8 +221,8 @@
       Thread.sleep(500);// lets wait for the registration to happen
       bposs = dn.getAllBpOs();
-      LOG.info("dn bpos len (still should be 3):" + bposs.length);
-      Assert.assertEquals("should've registered with three namenodes", 3, bposs.length);
+      LOG.info("dn bpos len (still should be 3):" + bposs.size());
+      Assert.assertEquals("should've registered with three namenodes", 3, bposs.size());
     } finally {
       cluster.shutdown();
     }
@@ -250,7 +250,7 @@
       DataNode dn = cluster.getDataNodes().get(0);
       assertTrue("Datanode should be running", dn.isDatanodeUp());
       assertEquals("Only one BPOfferService should be running", 1,
-          dn.getAllBpOs().length);
+          dn.getAllBpOs().size());
     } finally {
       cluster.shutdown();
     }
@@ -274,7 +274,7 @@
       DataNode dn = cluster.getDataNodes().get(0);
       assertTrue("Datanode should be running", dn.isDatanodeUp());
       assertEquals("BPOfferService should be running", 1,
-          dn.getAllBpOs().length);
+          dn.getAllBpOs().size());
       DataNodeProperties dnProp = cluster.stopDataNode(0);
       cluster.getNameNode(0).stop();

View File

@@ -229,7 +229,7 @@ public class TestDatanodeProtocolRetryPolicy {
     };
     // Trigger a heartbeat so that it acknowledges the NN as active.
-    dn.getAllBpOs()[0].triggerHeartbeatForTests();
+    dn.getAllBpOs().get(0).triggerHeartbeatForTests();
     waitForBlockReport(namenode);
   }

View File

@@ -83,7 +83,7 @@ public class TestDeleteBlockPool {
       Configuration nn1Conf = cluster.getConfiguration(1);
       nn1Conf.set(DFSConfigKeys.DFS_NAMESERVICES, "namesServerId2");
       dn1.refreshNamenodes(nn1Conf);
-      assertEquals(1, dn1.getAllBpOs().length);
+      assertEquals(1, dn1.getAllBpOs().size());
       try {
         dn1.deleteBlockPool(bpid1, false);
@@ -123,7 +123,7 @@
       }
       dn2.refreshNamenodes(nn1Conf);
-      assertEquals(1, dn2.getAllBpOs().length);
+      assertEquals(1, dn2.getAllBpOs().size());
       verifyBlockPoolDirectories(true, dn2StorageDir1, bpid1);
       verifyBlockPoolDirectories(true, dn2StorageDir2, bpid1);
@@ -184,7 +184,7 @@
       Configuration nn1Conf = cluster.getConfiguration(0);
       nn1Conf.set(DFSConfigKeys.DFS_NAMESERVICES, "namesServerId1");
       dn1.refreshNamenodes(nn1Conf);
-      assertEquals(1, dn1.getAllBpOs().length);
+      assertEquals(1, dn1.getAllBpOs().size());
       DFSAdmin admin = new DFSAdmin(nn1Conf);
       String dn1Address = dn1.getDatanodeId().getIpAddr() + ":" + dn1.getIpcPort();

View File

@@ -69,7 +69,7 @@ public class TestIncrementalBlockReports {
     fs = cluster.getFileSystem();
     singletonNn = cluster.getNameNode();
     singletonDn = cluster.getDataNodes().get(0);
-    bpos = singletonDn.getAllBpOs()[0];
+    bpos = singletonDn.getAllBpOs().get(0);
     actor = bpos.getBPServiceActors().get(0);
     storageUuid = singletonDn.getFSDataset().getVolumes().get(0).getStorageID();
   }

View File

@@ -59,13 +59,13 @@ public class TestRefreshNamenodes {
           .build();
       DataNode dn = cluster.getDataNodes().get(0);
-      assertEquals(1, dn.getAllBpOs().length);
+      assertEquals(1, dn.getAllBpOs().size());
       cluster.addNameNode(conf, nnPort2);
-      assertEquals(2, dn.getAllBpOs().length);
+      assertEquals(2, dn.getAllBpOs().size());
       cluster.addNameNode(conf, nnPort3);
-      assertEquals(3, dn.getAllBpOs().length);
+      assertEquals(3, dn.getAllBpOs().size());
       cluster.addNameNode(conf, nnPort4);

View File

@@ -91,7 +91,7 @@ public final class TestTriggerBlockReport {
         new Block(5678, 512, 1000), BlockStatus.DELETED_BLOCK, null);
     DataNode datanode = cluster.getDataNodes().get(0);
     BPServiceActor actor =
-        datanode.getAllBpOs()[0].getBPServiceActors().get(0);
+        datanode.getAllBpOs().get(0).getBPServiceActors().get(0);
     String storageUuid =
         datanode.getFSDataset().getVolumes().get(0).getStorageID();
     actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);