HDFS-2560. Refactor BPOfferService to be a static inner class. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1203442 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-11-18 00:44:17 +00:00
parent c36aff505a
commit 3b55f9a3fe
4 changed files with 209 additions and 157 deletions

View File

@ -7,6 +7,7 @@ Release 0.23.1 - UNRELEASED
NEW FEATURES NEW FEATURES
IMPROVEMENTS IMPROVEMENTS
HDFS-2560. Refactor BPOfferService to be a static inner class (todd)
OPTIMIZATIONS OPTIMIZATIONS

View File

@ -56,6 +56,10 @@ public class BlockPoolTokenSecretManager extends
return secretMgr; return secretMgr;
} }
public synchronized boolean isBlockPoolRegistered(String bpid) {
return map.containsKey(bpid);
}
/** Return an empty BlockTokenIdentifer */ /** Return an empty BlockTokenIdentifer */
@Override @Override
public BlockTokenIdentifier createIdentifier() { public BlockTokenIdentifier createIdentifier() {

View File

@ -135,6 +135,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo; import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
@ -269,7 +270,7 @@ public class DataNode extends Configured
List<InetSocketAddress> isas = DFSUtil.getNNServiceRpcAddresses(conf); List<InetSocketAddress> isas = DFSUtil.getNNServiceRpcAddresses(conf);
for(InetSocketAddress isa : isas) { for(InetSocketAddress isa : isas) {
BPOfferService bpos = new BPOfferService(isa); BPOfferService bpos = new BPOfferService(isa, DataNode.this);
nameNodeThreads.put(bpos.getNNSocketAddress(), bpos); nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
} }
} }
@ -367,19 +368,19 @@ public class DataNode extends Configured
} }
for (InetSocketAddress nnaddr : toStart) { for (InetSocketAddress nnaddr : toStart) {
BPOfferService bpos = new BPOfferService(nnaddr); BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this);
nameNodeThreads.put(bpos.getNNSocketAddress(), bpos); nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
} }
for (BPOfferService bpos : toShutdown) {
remove(bpos);
}
} }
for (BPOfferService bpos : toShutdown) { for (BPOfferService bpos : toShutdown) {
bpos.stop(); bpos.stop();
bpos.join(); bpos.join();
} }
// stoping the BPOSes causes them to call remove() on their own when they
// clean up.
// Now start the threads that are not already running. // Now start the threads that are not already running.
startAll(); startAll();
} }
@ -396,7 +397,6 @@ public class DataNode extends Configured
Daemon dataXceiverServer = null; Daemon dataXceiverServer = null;
ThreadGroup threadGroup = null; ThreadGroup threadGroup = null;
long blockReportInterval; long blockReportInterval;
boolean resetBlockReportTime = true;
long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L; long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L;
long heartBeatInterval; long heartBeatInterval;
private boolean heartbeatsDisabledForTests = false; private boolean heartbeatsDisabledForTests = false;
@ -631,6 +631,7 @@ public class DataNode extends Configured
return; return;
} }
String reason = null; String reason = null;
assert data != null;
if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) { DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
reason = "verification is turned off by configuration"; reason = "verification is turned off by configuration";
@ -741,11 +742,14 @@ public class DataNode extends Configured
* </ul> * </ul>
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class BPOfferService implements Runnable { static class BPOfferService implements Runnable {
final InetSocketAddress nnAddr; final InetSocketAddress nnAddr;
DatanodeRegistration bpRegistration; DatanodeRegistration bpRegistration;
NamespaceInfo bpNSInfo; NamespaceInfo bpNSInfo;
long lastBlockReport = 0; long lastBlockReport = 0;
boolean resetBlockReportTime = true;
private Thread bpThread; private Thread bpThread;
private DatanodeProtocol bpNamenode; private DatanodeProtocol bpNamenode;
private String blockPoolId; private String blockPoolId;
@ -754,14 +758,13 @@ public class DataNode extends Configured
private final LinkedList<Block> receivedBlockList = new LinkedList<Block>(); private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
private final LinkedList<String> delHints = new LinkedList<String>(); private final LinkedList<String> delHints = new LinkedList<String>();
private volatile boolean shouldServiceRun = true; private volatile boolean shouldServiceRun = true;
private boolean isBlockTokenInitialized = false;
UpgradeManagerDatanode upgradeManager = null; UpgradeManagerDatanode upgradeManager = null;
private final DataNode dn;
BPOfferService(InetSocketAddress isa) { BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
this.bpRegistration = new DatanodeRegistration(getMachineName()); this.dn = dn;
bpRegistration.setInfoPort(infoServer.getPort()); this.bpRegistration = dn.createRegistration();
bpRegistration.setIpcPort(getIpcPort()); this.nnAddr = nnAddr;
this.nnAddr = isa;
} }
/** /**
@ -788,7 +791,6 @@ public class DataNode extends Configured
void setNamespaceInfo(NamespaceInfo nsinfo) { void setNamespaceInfo(NamespaceInfo nsinfo) {
bpNSInfo = nsinfo; bpNSInfo = nsinfo;
this.blockPoolId = nsinfo.getBlockPoolID(); this.blockPoolId = nsinfo.getBlockPoolID();
blockPoolManager.addBlockPool(this);
} }
void setNameNode(DatanodeProtocol dnProtocol) { void setNameNode(DatanodeProtocol dnProtocol) {
@ -797,7 +799,7 @@ public class DataNode extends Configured
private NamespaceInfo handshake() throws IOException { private NamespaceInfo handshake() throws IOException {
NamespaceInfo nsInfo = new NamespaceInfo(); NamespaceInfo nsInfo = new NamespaceInfo();
while (shouldRun && shouldServiceRun) { while (dn.shouldRun && shouldServiceRun) {
try { try {
nsInfo = bpNamenode.versionRequest(); nsInfo = bpNamenode.versionRequest();
// verify build version // verify build version
@ -833,7 +835,7 @@ public class DataNode extends Configured
return nsInfo; return nsInfo;
} }
void setupBP(Configuration conf, AbstractList<File> dataDirs) void setupBP(Configuration conf)
throws IOException { throws IOException {
// get NN proxy // get NN proxy
DatanodeProtocol dnp = DatanodeProtocol dnp =
@ -844,50 +846,17 @@ public class DataNode extends Configured
// handshake with NN // handshake with NN
NamespaceInfo nsInfo = handshake(); NamespaceInfo nsInfo = handshake();
setNamespaceInfo(nsInfo); setNamespaceInfo(nsInfo);
synchronized(DataNode.this) { dn.initBlockPool(this, nsInfo);
// we do not allow namenode from different cluster to register
if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) {
throw new IOException(
"cannot register with the namenode because clusterid do not match:"
+ " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID +
";dn cid=" + clusterId);
}
setupBPStorage(); bpRegistration.setStorageID(dn.getStorageId());
StorageInfo storageInfo = dn.storage.getBPStorage(blockPoolId);
setClusterId(nsInfo.clusterID); if (storageInfo == null) {
} // it's null in the case of SimulatedDataSet
initPeriodicScanners(conf);
}
void setupBPStorage() throws IOException {
StartupOption startOpt = getStartupOption(conf);
assert startOpt != null : "Startup option must be set.";
boolean simulatedFSDataset = conf.getBoolean(
DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
if (simulatedFSDataset) {
initFsDataSet(conf, dataDirs);
bpRegistration.setStorageID(getStorageId()); //same as DN
bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION; bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION;
bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID; bpRegistration.setStorageInfo(nsInfo);
bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID;
} else { } else {
// read storage info, lock data dirs and transition fs state if necessary bpRegistration.setStorageInfo(storageInfo);
storage.recoverTransitionRead(DataNode.this, blockPoolId, bpNSInfo,
dataDirs, startOpt);
LOG.info("setting up storage: nsid=" + storage.namespaceID + ";bpid="
+ blockPoolId + ";lv=" + storage.layoutVersion + ";nsInfo="
+ bpNSInfo);
bpRegistration.setStorageID(getStorageId());
bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId));
initFsDataSet(conf, dataDirs);
} }
data.addBlockPool(blockPoolId, conf);
} }
/** /**
@ -897,9 +866,9 @@ public class DataNode extends Configured
void scheduleBlockReport(long delay) { void scheduleBlockReport(long delay) {
if (delay > 0) { // send BR after random delay if (delay > 0) { // send BR after random delay
lastBlockReport = System.currentTimeMillis() lastBlockReport = System.currentTimeMillis()
- ( blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); - ( dn.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
} else { // send at next heartbeat } else { // send at next heartbeat
lastBlockReport = lastHeartbeat - blockReportInterval; lastBlockReport = lastHeartbeat - dn.blockReportInterval;
} }
resetBlockReportTime = true; // reset future BRs for randomness resetBlockReportTime = true; // reset future BRs for randomness
} }
@ -996,11 +965,11 @@ public class DataNode extends Configured
// send block report if timer has expired. // send block report if timer has expired.
DatanodeCommand cmd = null; DatanodeCommand cmd = null;
long startTime = now(); long startTime = now();
if (startTime - lastBlockReport > blockReportInterval) { if (startTime - lastBlockReport > dn.blockReportInterval) {
// Create block report // Create block report
long brCreateStartTime = now(); long brCreateStartTime = now();
BlockListAsLongs bReport = data.getBlockReport(blockPoolId); BlockListAsLongs bReport = dn.data.getBlockReport(blockPoolId);
// Send block report // Send block report
long brSendStartTime = now(); long brSendStartTime = now();
@ -1010,7 +979,7 @@ public class DataNode extends Configured
// Log the block report processing stats from Datanode perspective // Log the block report processing stats from Datanode perspective
long brSendCost = now() - brSendStartTime; long brSendCost = now() - brSendStartTime;
long brCreateCost = brSendStartTime - brCreateStartTime; long brCreateCost = brSendStartTime - brCreateStartTime;
metrics.addBlockReport(brSendCost); dn.metrics.addBlockReport(brSendCost);
LOG.info("BlockReport of " + bReport.getNumberOfBlocks() LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+ " blocks took " + brCreateCost + " msec to generate and " + " blocks took " + brCreateCost + " msec to generate and "
+ brSendCost + " msecs for RPC and NN processing"); + brSendCost + " msecs for RPC and NN processing");
@ -1018,7 +987,7 @@ public class DataNode extends Configured
// If we have sent the first block report, then wait a random // If we have sent the first block report, then wait a random
// time before we start the periodic block reports. // time before we start the periodic block reports.
if (resetBlockReportTime) { if (resetBlockReportTime) {
lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(blockReportInterval)); lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dn.blockReportInterval));
resetBlockReportTime = false; resetBlockReportTime = false;
} else { } else {
/* say the last block report was at 8:20:14. The current report /* say the last block report was at 8:20:14. The current report
@ -1028,7 +997,7 @@ public class DataNode extends Configured
* 2) unexpected like 11:35:43, next report should be at 12:20:14 * 2) unexpected like 11:35:43, next report should be at 12:20:14
*/ */
lastBlockReport += (now() - lastBlockReport) / lastBlockReport += (now() - lastBlockReport) /
blockReportInterval * blockReportInterval; dn.blockReportInterval * dn.blockReportInterval;
} }
LOG.info("sent block report, processed command:" + cmd); LOG.info("sent block report, processed command:" + cmd);
} }
@ -1038,12 +1007,12 @@ public class DataNode extends Configured
DatanodeCommand [] sendHeartBeat() throws IOException { DatanodeCommand [] sendHeartBeat() throws IOException {
return bpNamenode.sendHeartbeat(bpRegistration, return bpNamenode.sendHeartbeat(bpRegistration,
data.getCapacity(), dn.data.getCapacity(),
data.getDfsUsed(), dn.data.getDfsUsed(),
data.getRemaining(), dn.data.getRemaining(),
data.getBlockPoolUsed(blockPoolId), dn.data.getBlockPoolUsed(blockPoolId),
xmitsInProgress.get(), dn.xmitsInProgress.get(),
getXceiverCount(), data.getNumFailedVolumes()); dn.getXceiverCount(), dn.data.getNumFailedVolumes());
} }
//This must be called only by blockPoolManager //This must be called only by blockPoolManager
@ -1079,21 +1048,9 @@ public class DataNode extends Configured
if(upgradeManager != null) if(upgradeManager != null)
upgradeManager.shutdownUpgrade(); upgradeManager.shutdownUpgrade();
blockPoolManager.remove(this);
shouldServiceRun = false; shouldServiceRun = false;
RPC.stopProxy(bpNamenode); RPC.stopProxy(bpNamenode);
if (blockScanner != null) { dn.shutdownBlockPool(this);
blockScanner.removeBlockPool(this.getBlockPoolId());
}
if (data != null) {
data.shutdownBlockPool(this.getBlockPoolId());
}
if (storage != null) {
storage.removeBlockPoolStorage(this.getBlockPoolId());
}
} }
/** /**
@ -1102,21 +1059,21 @@ public class DataNode extends Configured
*/ */
private void offerService() throws Exception { private void offerService() throws Exception {
LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of " LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
+ blockReportInterval + "msec" + " Initial delay: " + dn.blockReportInterval + "msec" + " Initial delay: "
+ initialBlockReportDelay + "msec" + "; heartBeatInterval=" + dn.initialBlockReportDelay + "msec" + "; heartBeatInterval="
+ heartBeatInterval); + dn.heartBeatInterval);
// //
// Now loop for a long time.... // Now loop for a long time....
// //
while (shouldRun && shouldServiceRun) { while (dn.shouldRun && shouldServiceRun) {
try { try {
long startTime = now(); long startTime = now();
// //
// Every so often, send heartbeat or block-report // Every so often, send heartbeat or block-report
// //
if (startTime - lastHeartbeat > heartBeatInterval) { if (startTime - lastHeartbeat > dn.heartBeatInterval) {
// //
// All heartbeat messages include following info: // All heartbeat messages include following info:
// -- Datanode name // -- Datanode name
@ -1125,9 +1082,9 @@ public class DataNode extends Configured
// -- Bytes remaining // -- Bytes remaining
// //
lastHeartbeat = startTime; lastHeartbeat = startTime;
if (!heartbeatsDisabledForTests) { if (!dn.heartbeatsDisabledForTests) {
DatanodeCommand[] cmds = sendHeartBeat(); DatanodeCommand[] cmds = sendHeartBeat();
metrics.addHeartbeat(now() - startTime); dn.metrics.addHeartbeat(now() - startTime);
long startProcessCommands = now(); long startProcessCommands = now();
if (!processCommand(cmds)) if (!processCommand(cmds))
@ -1146,15 +1103,15 @@ public class DataNode extends Configured
processCommand(cmd); processCommand(cmd);
// Now safe to start scanning the block pool // Now safe to start scanning the block pool
if (blockScanner != null) { if (dn.blockScanner != null) {
blockScanner.addBlockPool(this.blockPoolId); dn.blockScanner.addBlockPool(this.blockPoolId);
} }
// //
// There is no work to do; sleep until hearbeat timer elapses, // There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again. // or work arrives, and then iterate again.
// //
long waitTime = heartBeatInterval - long waitTime = dn.heartBeatInterval -
(System.currentTimeMillis() - lastHeartbeat); (System.currentTimeMillis() - lastHeartbeat);
synchronized(receivedBlockList) { synchronized(receivedBlockList) {
if (waitTime > 0 && receivedBlockList.size() == 0) { if (waitTime > 0 && receivedBlockList.size() == 0) {
@ -1177,7 +1134,7 @@ public class DataNode extends Configured
} }
LOG.warn("RemoteException in offerService", re); LOG.warn("RemoteException in offerService", re);
try { try {
long sleepTime = Math.min(1000, heartBeatInterval); long sleepTime = Math.min(1000, dn.heartBeatInterval);
Thread.sleep(sleepTime); Thread.sleep(sleepTime);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -1223,7 +1180,7 @@ public class DataNode extends Configured
(bpNSInfo.getLayoutVersion(), "namenode"); (bpNSInfo.getLayoutVersion(), "namenode");
} }
while(shouldRun && shouldServiceRun) { while(dn.shouldRun && shouldServiceRun) {
try { try {
// Use returned registration from namenode with updated machine name. // Use returned registration from namenode with updated machine name.
bpRegistration = bpNamenode.registerDatanode(bpRegistration); bpRegistration = bpNamenode.registerDatanode(bpRegistration);
@ -1231,8 +1188,6 @@ public class DataNode extends Configured
LOG.info("bpReg after =" + bpRegistration.storageInfo + LOG.info("bpReg after =" + bpRegistration.storageInfo +
";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName()); ";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName());
NetUtils.getHostname();
hostName = bpRegistration.getHost();
break; break;
} catch(SocketTimeoutException e) { // namenode is busy } catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + nnAddr); LOG.info("Problem connecting to server: " + nnAddr);
@ -1242,46 +1197,12 @@ public class DataNode extends Configured
} }
} }
if (storage.getStorageID().equals("")) { dn.bpRegistrationSucceeded(bpRegistration, blockPoolId);
storage.setStorageID(bpRegistration.getStorageID());
storage.writeAll();
LOG.info("New storage id " + bpRegistration.getStorageID()
+ " is assigned to data-node " + bpRegistration.getName());
} else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
throw new IOException("Inconsistent storage IDs. Name-node returned "
+ bpRegistration.getStorageID()
+ ". Expecting " + storage.getStorageID());
}
if (!isBlockTokenInitialized) {
/* first time registering with NN */
ExportedBlockKeys keys = bpRegistration.exportedKeys;
isBlockTokenEnabled = keys.isBlockTokenEnabled();
if (isBlockTokenEnabled) {
long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
long blockTokenLifetime = keys.getTokenLifetime();
LOG.info("Block token params received from NN: for block pool " +
blockPoolId + " keyUpdateInterval="
+ blockKeyUpdateInterval / (60 * 1000)
+ " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ " min(s)");
final BlockTokenSecretManager secretMgr =
new BlockTokenSecretManager(false, 0, blockTokenLifetime);
blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
}
isBlockTokenInitialized = true;
}
if (isBlockTokenEnabled) {
blockPoolTokenSecretManager.setKeys(blockPoolId,
bpRegistration.exportedKeys);
bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
}
LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo); LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo);
// random short delay - helps scatter the BR from all DNs // random short delay - helps scatter the BR from all DNs
scheduleBlockReport(initialBlockReportDelay); scheduleBlockReport(dn.initialBlockReportDelay);
} }
@ -1295,14 +1216,14 @@ public class DataNode extends Configured
*/ */
@Override @Override
public void run() { public void run() {
LOG.info(bpRegistration + "In BPOfferService.run, data = " + data LOG.info(bpRegistration + "In BPOfferService.run, data = " + dn.data
+ ";bp=" + blockPoolId); + ";bp=" + blockPoolId);
try { try {
// init stuff // init stuff
try { try {
// setup storage // setup storage
setupBP(conf, dataDirs); setupBP(dn.conf);
register(); register();
} catch (IOException ioe) { } catch (IOException ioe) {
// Initial handshake, storage recovery or registration failed // Initial handshake, storage recovery or registration failed
@ -1314,13 +1235,13 @@ public class DataNode extends Configured
initialized = true; // bp is initialized; initialized = true; // bp is initialized;
while (shouldRun && shouldServiceRun) { while (dn.shouldRun && shouldServiceRun) {
try { try {
startDistributedUpgradeIfNeeded(); startDistributedUpgradeIfNeeded();
offerService(); offerService();
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("Exception in BPOfferService", ex); LOG.error("Exception in BPOfferService", ex);
if (shouldRun && shouldServiceRun) { if (dn.shouldRun && shouldServiceRun) {
try { try {
Thread.sleep(5000); Thread.sleep(5000);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
@ -1333,7 +1254,7 @@ public class DataNode extends Configured
LOG.warn("Unexpected exception", ex); LOG.warn("Unexpected exception", ex);
} finally { } finally {
LOG.warn(bpRegistration + " ending block pool service for: " LOG.warn(bpRegistration + " ending block pool service for: "
+ blockPoolId); + blockPoolId + " thread " + Thread.currentThread().getId());
cleanUp(); cleanUp();
} }
} }
@ -1374,8 +1295,8 @@ public class DataNode extends Configured
switch(cmd.getAction()) { switch(cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER: case DatanodeProtocol.DNA_TRANSFER:
// Send a copy of a block to another datanode // Send a copy of a block to another datanode
transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets()); dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
metrics.incrBlocksReplicated(bcmd.getBlocks().length); dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
break; break;
case DatanodeProtocol.DNA_INVALIDATE: case DatanodeProtocol.DNA_INVALIDATE:
// //
@ -1384,16 +1305,16 @@ public class DataNode extends Configured
// //
Block toDelete[] = bcmd.getBlocks(); Block toDelete[] = bcmd.getBlocks();
try { try {
if (blockScanner != null) { if (dn.blockScanner != null) {
blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete); dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
} }
// using global fsdataset // using global fsdataset
data.invalidate(bcmd.getBlockPoolId(), toDelete); dn.data.invalidate(bcmd.getBlockPoolId(), toDelete);
} catch(IOException e) { } catch(IOException e) {
checkDiskError(); dn.checkDiskError();
throw e; throw e;
} }
metrics.incrBlocksRemoved(toDelete.length); dn.metrics.incrBlocksRemoved(toDelete.length);
break; break;
case DatanodeProtocol.DNA_SHUTDOWN: case DatanodeProtocol.DNA_SHUTDOWN:
// shut down the data node // shut down the data node
@ -1402,12 +1323,12 @@ public class DataNode extends Configured
case DatanodeProtocol.DNA_REGISTER: case DatanodeProtocol.DNA_REGISTER:
// namenode requested a registration - at start or if NN lost contact // namenode requested a registration - at start or if NN lost contact
LOG.info("DatanodeCommand action: DNA_REGISTER"); LOG.info("DatanodeCommand action: DNA_REGISTER");
if (shouldRun && shouldServiceRun) { if (dn.shouldRun && shouldServiceRun) {
register(); register();
} }
break; break;
case DatanodeProtocol.DNA_FINALIZE: case DatanodeProtocol.DNA_FINALIZE:
storage.finalizeUpgrade(((DatanodeCommand.Finalize) cmd) dn.storage.finalizeUpgrade(((DatanodeCommand.Finalize) cmd)
.getBlockPoolId()); .getBlockPoolId());
break; break;
case UpgradeCommand.UC_ACTION_START_UPGRADE: case UpgradeCommand.UC_ACTION_START_UPGRADE:
@ -1415,12 +1336,12 @@ public class DataNode extends Configured
processDistributedUpgradeCommand((UpgradeCommand)cmd); processDistributedUpgradeCommand((UpgradeCommand)cmd);
break; break;
case DatanodeProtocol.DNA_RECOVERBLOCK: case DatanodeProtocol.DNA_RECOVERBLOCK:
recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks()); dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break; break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE: case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE"); LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
if (isBlockTokenEnabled) { if (dn.isBlockTokenEnabled) {
blockPoolTokenSecretManager.setKeys(blockPoolId, dn.blockPoolTokenSecretManager.setKeys(blockPoolId,
((KeyUpdateCommand) cmd).getExportedKeys()); ((KeyUpdateCommand) cmd).getExportedKeys());
} }
break; break;
@ -1430,7 +1351,7 @@ public class DataNode extends Configured
((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue(); ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
if (bandwidth > 0) { if (bandwidth > 0) {
DataXceiverServer dxcs = DataXceiverServer dxcs =
(DataXceiverServer) dataXceiverServer.getRunnable(); (DataXceiverServer) dn.dataXceiverServer.getRunnable();
dxcs.balanceThrottler.setBandwidth(bandwidth); dxcs.balanceThrottler.setBandwidth(bandwidth);
} }
break; break;
@ -1449,7 +1370,7 @@ public class DataNode extends Configured
synchronized UpgradeManagerDatanode getUpgradeManager() { synchronized UpgradeManagerDatanode getUpgradeManager() {
if(upgradeManager == null) if(upgradeManager == null)
upgradeManager = upgradeManager =
new UpgradeManagerDatanode(DataNode.this, blockPoolId); new UpgradeManagerDatanode(dn, blockPoolId);
return upgradeManager; return upgradeManager;
} }
@ -1509,6 +1430,133 @@ public class DataNode extends Configured
blockPoolManager = new BlockPoolManager(conf); blockPoolManager = new BlockPoolManager(conf);
} }
/**
* Check that the registration returned from a NameNode is consistent
* with the information in the storage. If the storage is fresh/unformatted,
* sets the storage ID based on this registration.
* Also updates the block pool's state in the secret manager.
*/
private synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
String blockPoolId)
throws IOException {
hostName = bpRegistration.getHost();
if (storage.getStorageID().equals("")) {
// This is a fresh datanode -- take the storage ID provided by the
// NN and persist it.
storage.setStorageID(bpRegistration.getStorageID());
storage.writeAll();
LOG.info("New storage id " + bpRegistration.getStorageID()
+ " is assigned to data-node " + bpRegistration.getName());
} else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
throw new IOException("Inconsistent storage IDs. Name-node returned "
+ bpRegistration.getStorageID()
+ ". Expecting " + storage.getStorageID());
}
registerBlockPoolWithSecretManager(bpRegistration, blockPoolId);
}
/**
* After the block pool has contacted the NN, registers that block pool
* with the secret manager, updating it with the secrets provided by the NN.
* @param bpRegistration
* @param blockPoolId
* @throws IOException
*/
private void registerBlockPoolWithSecretManager(DatanodeRegistration bpRegistration,
String blockPoolId) throws IOException {
ExportedBlockKeys keys = bpRegistration.exportedKeys;
isBlockTokenEnabled = keys.isBlockTokenEnabled();
// TODO should we check that all federated nns are either enabled or
// disabled?
if (!isBlockTokenEnabled) return;
if (!blockPoolTokenSecretManager.isBlockPoolRegistered(blockPoolId)) {
long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
long blockTokenLifetime = keys.getTokenLifetime();
LOG.info("Block token params received from NN: for block pool " +
blockPoolId + " keyUpdateInterval="
+ blockKeyUpdateInterval / (60 * 1000)
+ " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ " min(s)");
final BlockTokenSecretManager secretMgr =
new BlockTokenSecretManager(false, 0, blockTokenLifetime);
blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
}
blockPoolTokenSecretManager.setKeys(blockPoolId,
bpRegistration.exportedKeys);
bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
}
/**
* Remove the given block pool from the block scanner, dataset, and storage.
*/
private void shutdownBlockPool(BPOfferService bpos) {
blockPoolManager.remove(bpos);
String bpId = bpos.getBlockPoolId();
if (blockScanner != null) {
blockScanner.removeBlockPool(bpId);
}
if (data != null) {
data.shutdownBlockPool(bpId);
}
if (storage != null) {
storage.removeBlockPoolStorage(bpId);
}
}
void initBlockPool(BPOfferService bpOfferService,
NamespaceInfo nsInfo) throws IOException {
String blockPoolId = nsInfo.getBlockPoolID();
blockPoolManager.addBlockPool(bpOfferService);
synchronized (this) {
// we do not allow namenode from different cluster to register
if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) {
throw new IOException(
"cannot register with the namenode because clusterid do not match:"
+ " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID +
";dn cid=" + clusterId);
}
setClusterId(nsInfo.clusterID);
}
StartupOption startOpt = getStartupOption(conf);
assert startOpt != null : "Startup option must be set.";
boolean simulatedFSDataset = conf.getBoolean(
DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
if (!simulatedFSDataset) {
// read storage info, lock data dirs and transition fs state if necessary
storage.recoverTransitionRead(DataNode.this, blockPoolId, nsInfo,
dataDirs, startOpt);
StorageInfo bpStorage = storage.getBPStorage(blockPoolId);
LOG.info("setting up storage: nsid=" +
bpStorage.getNamespaceID() + ";bpid="
+ blockPoolId + ";lv=" + storage.getLayoutVersion() +
";nsInfo=" + nsInfo);
}
initFsDataSet();
initPeriodicScanners(conf);
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
}
private DatanodeRegistration createRegistration() {
DatanodeRegistration reg = new DatanodeRegistration(getMachineName());
reg.setInfoPort(infoServer.getPort());
reg.setIpcPort(getIpcPort());
return reg;
}
BPOfferService[] getAllBpOs() { BPOfferService[] getAllBpOs() {
return blockPoolManager.getAllNamenodeThreads(); return blockPoolManager.getAllNamenodeThreads();
} }
@ -1521,8 +1569,7 @@ public class DataNode extends Configured
* Initializes the {@link #data}. The initialization is done only once, when * Initializes the {@link #data}. The initialization is done only once, when
* handshake with the the first namenode is completed. * handshake with the the first namenode is completed.
*/ */
private synchronized void initFsDataSet(Configuration conf, private synchronized void initFsDataSet() throws IOException {
AbstractList<File> dataDirs) throws IOException {
if (data != null) { // Already initialized if (data != null) { // Already initialized
return; return;
} }

View File

@ -61,7 +61,7 @@ public class DataNodeTestUtils {
bpos.setNamespaceInfo(nsifno); bpos.setNamespaceInfo(nsifno);
dn.setBPNamenode(bpid, nn); dn.setBPNamenode(bpid, nn);
bpos.setupBPStorage(); dn.initBlockPool(bpos, nsifno);
} }
} }
} }