From 3b55f9a3fe54a53a49c5e1c812fbce12deea60dd Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Fri, 18 Nov 2011 00:44:17 +0000 Subject: [PATCH] HDFS-2560. Refactor BPOfferService to be a static inner class. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1203442 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 1 + .../block/BlockPoolTokenSecretManager.java | 4 + .../hadoop/hdfs/server/datanode/DataNode.java | 359 ++++++++++-------- .../server/datanode/DataNodeTestUtils.java | 2 +- 4 files changed, 209 insertions(+), 157 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index acae1aec37b..fdddb3cc9e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.23.1 - UNRELEASED NEW FEATURES IMPROVEMENTS + HDFS-2560. Refactor BPOfferService to be a static inner class (todd) OPTIMIZATIONS diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java index 2cb8b41ffd0..05fba79fe63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java @@ -55,6 +55,10 @@ public class BlockPoolTokenSecretManager extends } return secretMgr; } + + public synchronized boolean isBlockPoolRegistered(String bpid) { + return map.containsKey(bpid); + } /** Return an empty BlockTokenIdentifer */ @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 8f6b0f92aec..09d555b0c61 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -135,6 +135,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; @@ -269,7 +270,7 @@ public class DataNode extends Configured List isas = DFSUtil.getNNServiceRpcAddresses(conf); for(InetSocketAddress isa : isas) { - BPOfferService bpos = new BPOfferService(isa); + BPOfferService bpos = new BPOfferService(isa, DataNode.this); nameNodeThreads.put(bpos.getNNSocketAddress(), bpos); } } @@ -367,19 +368,19 @@ public class DataNode extends Configured } for (InetSocketAddress nnaddr : toStart) { - BPOfferService bpos = new BPOfferService(nnaddr); + BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this); nameNodeThreads.put(bpos.getNNSocketAddress(), bpos); } - - for (BPOfferService bpos : toShutdown) { - remove(bpos); - } } for (BPOfferService bpos : toShutdown) { bpos.stop(); bpos.join(); } + + // stoping the BPOSes causes them to call remove() on their own when they + // clean up. + // Now start the threads that are not already running. startAll(); } @@ -396,7 +397,6 @@ public class DataNode extends Configured Daemon dataXceiverServer = null; ThreadGroup threadGroup = null; long blockReportInterval; - boolean resetBlockReportTime = true; long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L; long heartBeatInterval; private boolean heartbeatsDisabledForTests = false; @@ -631,6 +631,7 @@ public class DataNode extends Configured return; } String reason = null; + assert data != null; if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) { reason = "verification is turned off by configuration"; @@ -741,11 +742,14 @@ public class DataNode extends Configured * */ @InterfaceAudience.Private - class BPOfferService implements Runnable { + static class BPOfferService implements Runnable { final InetSocketAddress nnAddr; DatanodeRegistration bpRegistration; NamespaceInfo bpNSInfo; long lastBlockReport = 0; + + boolean resetBlockReportTime = true; + private Thread bpThread; private DatanodeProtocol bpNamenode; private String blockPoolId; @@ -754,14 +758,13 @@ public class DataNode extends Configured private final LinkedList receivedBlockList = new LinkedList(); private final LinkedList delHints = new LinkedList(); private volatile boolean shouldServiceRun = true; - private boolean isBlockTokenInitialized = false; UpgradeManagerDatanode upgradeManager = null; + private final DataNode dn; - BPOfferService(InetSocketAddress isa) { - this.bpRegistration = new DatanodeRegistration(getMachineName()); - bpRegistration.setInfoPort(infoServer.getPort()); - bpRegistration.setIpcPort(getIpcPort()); - this.nnAddr = isa; + BPOfferService(InetSocketAddress nnAddr, DataNode dn) { + this.dn = dn; + this.bpRegistration = dn.createRegistration(); + this.nnAddr = nnAddr; } /** @@ -788,7 +791,6 @@ public class DataNode extends Configured void setNamespaceInfo(NamespaceInfo nsinfo) { bpNSInfo = nsinfo; this.blockPoolId = nsinfo.getBlockPoolID(); - blockPoolManager.addBlockPool(this); } void setNameNode(DatanodeProtocol dnProtocol) { @@ -797,7 +799,7 @@ public class DataNode extends Configured private NamespaceInfo handshake() throws IOException { NamespaceInfo nsInfo = new NamespaceInfo(); - while (shouldRun && shouldServiceRun) { + while (dn.shouldRun && shouldServiceRun) { try { nsInfo = bpNamenode.versionRequest(); // verify build version @@ -833,7 +835,7 @@ public class DataNode extends Configured return nsInfo; } - void setupBP(Configuration conf, AbstractList dataDirs) + void setupBP(Configuration conf) throws IOException { // get NN proxy DatanodeProtocol dnp = @@ -844,52 +846,19 @@ public class DataNode extends Configured // handshake with NN NamespaceInfo nsInfo = handshake(); setNamespaceInfo(nsInfo); - synchronized(DataNode.this) { - // we do not allow namenode from different cluster to register - if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) { - throw new IOException( - "cannot register with the namenode because clusterid do not match:" - + " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID + - ";dn cid=" + clusterId); - } - - setupBPStorage(); - - setClusterId(nsInfo.clusterID); - } - - initPeriodicScanners(conf); - } - - void setupBPStorage() throws IOException { - StartupOption startOpt = getStartupOption(conf); - assert startOpt != null : "Startup option must be set."; - - boolean simulatedFSDataset = conf.getBoolean( - DFS_DATANODE_SIMULATEDDATASTORAGE_KEY, - DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT); + dn.initBlockPool(this, nsInfo); - if (simulatedFSDataset) { - initFsDataSet(conf, dataDirs); - bpRegistration.setStorageID(getStorageId()); //same as DN + bpRegistration.setStorageID(dn.getStorageId()); + StorageInfo storageInfo = dn.storage.getBPStorage(blockPoolId); + if (storageInfo == null) { + // it's null in the case of SimulatedDataSet bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION; - bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID; - bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID; + bpRegistration.setStorageInfo(nsInfo); } else { - // read storage info, lock data dirs and transition fs state if necessary - storage.recoverTransitionRead(DataNode.this, blockPoolId, bpNSInfo, - dataDirs, startOpt); - LOG.info("setting up storage: nsid=" + storage.namespaceID + ";bpid=" - + blockPoolId + ";lv=" + storage.layoutVersion + ";nsInfo=" - + bpNSInfo); - - bpRegistration.setStorageID(getStorageId()); - bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId)); - initFsDataSet(conf, dataDirs); + bpRegistration.setStorageInfo(storageInfo); } - data.addBlockPool(blockPoolId, conf); } - + /** * This methods arranges for the data node to send the block report at * the next heartbeat. @@ -897,9 +866,9 @@ public class DataNode extends Configured void scheduleBlockReport(long delay) { if (delay > 0) { // send BR after random delay lastBlockReport = System.currentTimeMillis() - - ( blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); + - ( dn.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); } else { // send at next heartbeat - lastBlockReport = lastHeartbeat - blockReportInterval; + lastBlockReport = lastHeartbeat - dn.blockReportInterval; } resetBlockReportTime = true; // reset future BRs for randomness } @@ -996,11 +965,11 @@ public class DataNode extends Configured // send block report if timer has expired. DatanodeCommand cmd = null; long startTime = now(); - if (startTime - lastBlockReport > blockReportInterval) { + if (startTime - lastBlockReport > dn.blockReportInterval) { // Create block report long brCreateStartTime = now(); - BlockListAsLongs bReport = data.getBlockReport(blockPoolId); + BlockListAsLongs bReport = dn.data.getBlockReport(blockPoolId); // Send block report long brSendStartTime = now(); @@ -1010,7 +979,7 @@ public class DataNode extends Configured // Log the block report processing stats from Datanode perspective long brSendCost = now() - brSendStartTime; long brCreateCost = brSendStartTime - brCreateStartTime; - metrics.addBlockReport(brSendCost); + dn.metrics.addBlockReport(brSendCost); LOG.info("BlockReport of " + bReport.getNumberOfBlocks() + " blocks took " + brCreateCost + " msec to generate and " + brSendCost + " msecs for RPC and NN processing"); @@ -1018,7 +987,7 @@ public class DataNode extends Configured // If we have sent the first block report, then wait a random // time before we start the periodic block reports. if (resetBlockReportTime) { - lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(blockReportInterval)); + lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dn.blockReportInterval)); resetBlockReportTime = false; } else { /* say the last block report was at 8:20:14. The current report @@ -1028,7 +997,7 @@ public class DataNode extends Configured * 2) unexpected like 11:35:43, next report should be at 12:20:14 */ lastBlockReport += (now() - lastBlockReport) / - blockReportInterval * blockReportInterval; + dn.blockReportInterval * dn.blockReportInterval; } LOG.info("sent block report, processed command:" + cmd); } @@ -1038,12 +1007,12 @@ public class DataNode extends Configured DatanodeCommand [] sendHeartBeat() throws IOException { return bpNamenode.sendHeartbeat(bpRegistration, - data.getCapacity(), - data.getDfsUsed(), - data.getRemaining(), - data.getBlockPoolUsed(blockPoolId), - xmitsInProgress.get(), - getXceiverCount(), data.getNumFailedVolumes()); + dn.data.getCapacity(), + dn.data.getDfsUsed(), + dn.data.getRemaining(), + dn.data.getBlockPoolUsed(blockPoolId), + dn.xmitsInProgress.get(), + dn.getXceiverCount(), dn.data.getNumFailedVolumes()); } //This must be called only by blockPoolManager @@ -1079,21 +1048,9 @@ public class DataNode extends Configured if(upgradeManager != null) upgradeManager.shutdownUpgrade(); - - blockPoolManager.remove(this); shouldServiceRun = false; RPC.stopProxy(bpNamenode); - if (blockScanner != null) { - blockScanner.removeBlockPool(this.getBlockPoolId()); - } - - if (data != null) { - data.shutdownBlockPool(this.getBlockPoolId()); - } - - if (storage != null) { - storage.removeBlockPoolStorage(this.getBlockPoolId()); - } + dn.shutdownBlockPool(this); } /** @@ -1102,21 +1059,21 @@ public class DataNode extends Configured */ private void offerService() throws Exception { LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of " - + blockReportInterval + "msec" + " Initial delay: " - + initialBlockReportDelay + "msec" + "; heartBeatInterval=" - + heartBeatInterval); + + dn.blockReportInterval + "msec" + " Initial delay: " + + dn.initialBlockReportDelay + "msec" + "; heartBeatInterval=" + + dn.heartBeatInterval); // // Now loop for a long time.... // - while (shouldRun && shouldServiceRun) { + while (dn.shouldRun && shouldServiceRun) { try { long startTime = now(); // // Every so often, send heartbeat or block-report // - if (startTime - lastHeartbeat > heartBeatInterval) { + if (startTime - lastHeartbeat > dn.heartBeatInterval) { // // All heartbeat messages include following info: // -- Datanode name @@ -1125,9 +1082,9 @@ public class DataNode extends Configured // -- Bytes remaining // lastHeartbeat = startTime; - if (!heartbeatsDisabledForTests) { + if (!dn.heartbeatsDisabledForTests) { DatanodeCommand[] cmds = sendHeartBeat(); - metrics.addHeartbeat(now() - startTime); + dn.metrics.addHeartbeat(now() - startTime); long startProcessCommands = now(); if (!processCommand(cmds)) @@ -1146,15 +1103,15 @@ public class DataNode extends Configured processCommand(cmd); // Now safe to start scanning the block pool - if (blockScanner != null) { - blockScanner.addBlockPool(this.blockPoolId); + if (dn.blockScanner != null) { + dn.blockScanner.addBlockPool(this.blockPoolId); } // // There is no work to do; sleep until hearbeat timer elapses, // or work arrives, and then iterate again. // - long waitTime = heartBeatInterval - + long waitTime = dn.heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat); synchronized(receivedBlockList) { if (waitTime > 0 && receivedBlockList.size() == 0) { @@ -1177,7 +1134,7 @@ public class DataNode extends Configured } LOG.warn("RemoteException in offerService", re); try { - long sleepTime = Math.min(1000, heartBeatInterval); + long sleepTime = Math.min(1000, dn.heartBeatInterval); Thread.sleep(sleepTime); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -1223,7 +1180,7 @@ public class DataNode extends Configured (bpNSInfo.getLayoutVersion(), "namenode"); } - while(shouldRun && shouldServiceRun) { + while(dn.shouldRun && shouldServiceRun) { try { // Use returned registration from namenode with updated machine name. bpRegistration = bpNamenode.registerDatanode(bpRegistration); @@ -1231,8 +1188,6 @@ public class DataNode extends Configured LOG.info("bpReg after =" + bpRegistration.storageInfo + ";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName()); - NetUtils.getHostname(); - hostName = bpRegistration.getHost(); break; } catch(SocketTimeoutException e) { // namenode is busy LOG.info("Problem connecting to server: " + nnAddr); @@ -1241,47 +1196,13 @@ public class DataNode extends Configured } catch (InterruptedException ie) {} } } - - if (storage.getStorageID().equals("")) { - storage.setStorageID(bpRegistration.getStorageID()); - storage.writeAll(); - LOG.info("New storage id " + bpRegistration.getStorageID() - + " is assigned to data-node " + bpRegistration.getName()); - } else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) { - throw new IOException("Inconsistent storage IDs. Name-node returned " - + bpRegistration.getStorageID() - + ". Expecting " + storage.getStorageID()); - } - - if (!isBlockTokenInitialized) { - /* first time registering with NN */ - ExportedBlockKeys keys = bpRegistration.exportedKeys; - isBlockTokenEnabled = keys.isBlockTokenEnabled(); - if (isBlockTokenEnabled) { - long blockKeyUpdateInterval = keys.getKeyUpdateInterval(); - long blockTokenLifetime = keys.getTokenLifetime(); - LOG.info("Block token params received from NN: for block pool " + - blockPoolId + " keyUpdateInterval=" - + blockKeyUpdateInterval / (60 * 1000) - + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000) - + " min(s)"); - final BlockTokenSecretManager secretMgr = - new BlockTokenSecretManager(false, 0, blockTokenLifetime); - blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr); - } - isBlockTokenInitialized = true; - } - - if (isBlockTokenEnabled) { - blockPoolTokenSecretManager.setKeys(blockPoolId, - bpRegistration.exportedKeys); - bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS; - } + + dn.bpRegistrationSucceeded(bpRegistration, blockPoolId); LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo); // random short delay - helps scatter the BR from all DNs - scheduleBlockReport(initialBlockReportDelay); + scheduleBlockReport(dn.initialBlockReportDelay); } @@ -1295,14 +1216,14 @@ public class DataNode extends Configured */ @Override public void run() { - LOG.info(bpRegistration + "In BPOfferService.run, data = " + data + LOG.info(bpRegistration + "In BPOfferService.run, data = " + dn.data + ";bp=" + blockPoolId); try { // init stuff try { // setup storage - setupBP(conf, dataDirs); + setupBP(dn.conf); register(); } catch (IOException ioe) { // Initial handshake, storage recovery or registration failed @@ -1314,13 +1235,13 @@ public class DataNode extends Configured initialized = true; // bp is initialized; - while (shouldRun && shouldServiceRun) { + while (dn.shouldRun && shouldServiceRun) { try { startDistributedUpgradeIfNeeded(); offerService(); } catch (Exception ex) { LOG.error("Exception in BPOfferService", ex); - if (shouldRun && shouldServiceRun) { + if (dn.shouldRun && shouldServiceRun) { try { Thread.sleep(5000); } catch (InterruptedException ie) { @@ -1333,7 +1254,7 @@ public class DataNode extends Configured LOG.warn("Unexpected exception", ex); } finally { LOG.warn(bpRegistration + " ending block pool service for: " - + blockPoolId); + + blockPoolId + " thread " + Thread.currentThread().getId()); cleanUp(); } } @@ -1374,8 +1295,8 @@ public class DataNode extends Configured switch(cmd.getAction()) { case DatanodeProtocol.DNA_TRANSFER: // Send a copy of a block to another datanode - transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets()); - metrics.incrBlocksReplicated(bcmd.getBlocks().length); + dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets()); + dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length); break; case DatanodeProtocol.DNA_INVALIDATE: // @@ -1384,16 +1305,16 @@ public class DataNode extends Configured // Block toDelete[] = bcmd.getBlocks(); try { - if (blockScanner != null) { - blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete); + if (dn.blockScanner != null) { + dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete); } // using global fsdataset - data.invalidate(bcmd.getBlockPoolId(), toDelete); + dn.data.invalidate(bcmd.getBlockPoolId(), toDelete); } catch(IOException e) { - checkDiskError(); + dn.checkDiskError(); throw e; } - metrics.incrBlocksRemoved(toDelete.length); + dn.metrics.incrBlocksRemoved(toDelete.length); break; case DatanodeProtocol.DNA_SHUTDOWN: // shut down the data node @@ -1402,12 +1323,12 @@ public class DataNode extends Configured case DatanodeProtocol.DNA_REGISTER: // namenode requested a registration - at start or if NN lost contact LOG.info("DatanodeCommand action: DNA_REGISTER"); - if (shouldRun && shouldServiceRun) { + if (dn.shouldRun && shouldServiceRun) { register(); } break; case DatanodeProtocol.DNA_FINALIZE: - storage.finalizeUpgrade(((DatanodeCommand.Finalize) cmd) + dn.storage.finalizeUpgrade(((DatanodeCommand.Finalize) cmd) .getBlockPoolId()); break; case UpgradeCommand.UC_ACTION_START_UPGRADE: @@ -1415,12 +1336,12 @@ public class DataNode extends Configured processDistributedUpgradeCommand((UpgradeCommand)cmd); break; case DatanodeProtocol.DNA_RECOVERBLOCK: - recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks()); + dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks()); break; case DatanodeProtocol.DNA_ACCESSKEYUPDATE: LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE"); - if (isBlockTokenEnabled) { - blockPoolTokenSecretManager.setKeys(blockPoolId, + if (dn.isBlockTokenEnabled) { + dn.blockPoolTokenSecretManager.setKeys(blockPoolId, ((KeyUpdateCommand) cmd).getExportedKeys()); } break; @@ -1430,7 +1351,7 @@ public class DataNode extends Configured ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue(); if (bandwidth > 0) { DataXceiverServer dxcs = - (DataXceiverServer) dataXceiverServer.getRunnable(); + (DataXceiverServer) dn.dataXceiverServer.getRunnable(); dxcs.balanceThrottler.setBandwidth(bandwidth); } break; @@ -1449,7 +1370,7 @@ public class DataNode extends Configured synchronized UpgradeManagerDatanode getUpgradeManager() { if(upgradeManager == null) upgradeManager = - new UpgradeManagerDatanode(DataNode.this, blockPoolId); + new UpgradeManagerDatanode(dn, blockPoolId); return upgradeManager; } @@ -1509,6 +1430,133 @@ public class DataNode extends Configured blockPoolManager = new BlockPoolManager(conf); } + /** + * Check that the registration returned from a NameNode is consistent + * with the information in the storage. If the storage is fresh/unformatted, + * sets the storage ID based on this registration. + * Also updates the block pool's state in the secret manager. + */ + private synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration, + String blockPoolId) + throws IOException { + hostName = bpRegistration.getHost(); + + if (storage.getStorageID().equals("")) { + // This is a fresh datanode -- take the storage ID provided by the + // NN and persist it. + storage.setStorageID(bpRegistration.getStorageID()); + storage.writeAll(); + LOG.info("New storage id " + bpRegistration.getStorageID() + + " is assigned to data-node " + bpRegistration.getName()); + } else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) { + throw new IOException("Inconsistent storage IDs. Name-node returned " + + bpRegistration.getStorageID() + + ". Expecting " + storage.getStorageID()); + } + + registerBlockPoolWithSecretManager(bpRegistration, blockPoolId); + } + + /** + * After the block pool has contacted the NN, registers that block pool + * with the secret manager, updating it with the secrets provided by the NN. + * @param bpRegistration + * @param blockPoolId + * @throws IOException + */ + private void registerBlockPoolWithSecretManager(DatanodeRegistration bpRegistration, + String blockPoolId) throws IOException { + ExportedBlockKeys keys = bpRegistration.exportedKeys; + isBlockTokenEnabled = keys.isBlockTokenEnabled(); + // TODO should we check that all federated nns are either enabled or + // disabled? + if (!isBlockTokenEnabled) return; + + if (!blockPoolTokenSecretManager.isBlockPoolRegistered(blockPoolId)) { + long blockKeyUpdateInterval = keys.getKeyUpdateInterval(); + long blockTokenLifetime = keys.getTokenLifetime(); + LOG.info("Block token params received from NN: for block pool " + + blockPoolId + " keyUpdateInterval=" + + blockKeyUpdateInterval / (60 * 1000) + + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000) + + " min(s)"); + final BlockTokenSecretManager secretMgr = + new BlockTokenSecretManager(false, 0, blockTokenLifetime); + blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr); + } + + blockPoolTokenSecretManager.setKeys(blockPoolId, + bpRegistration.exportedKeys); + bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS; + } + + /** + * Remove the given block pool from the block scanner, dataset, and storage. + */ + private void shutdownBlockPool(BPOfferService bpos) { + blockPoolManager.remove(bpos); + + String bpId = bpos.getBlockPoolId(); + if (blockScanner != null) { + blockScanner.removeBlockPool(bpId); + } + + if (data != null) { + data.shutdownBlockPool(bpId); + } + + if (storage != null) { + storage.removeBlockPoolStorage(bpId); + } + } + + void initBlockPool(BPOfferService bpOfferService, + NamespaceInfo nsInfo) throws IOException { + String blockPoolId = nsInfo.getBlockPoolID(); + + blockPoolManager.addBlockPool(bpOfferService); + + synchronized (this) { + // we do not allow namenode from different cluster to register + if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) { + throw new IOException( + "cannot register with the namenode because clusterid do not match:" + + " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID + + ";dn cid=" + clusterId); + } + + setClusterId(nsInfo.clusterID); + } + + StartupOption startOpt = getStartupOption(conf); + assert startOpt != null : "Startup option must be set."; + + boolean simulatedFSDataset = conf.getBoolean( + DFS_DATANODE_SIMULATEDDATASTORAGE_KEY, + DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT); + + if (!simulatedFSDataset) { + // read storage info, lock data dirs and transition fs state if necessary + storage.recoverTransitionRead(DataNode.this, blockPoolId, nsInfo, + dataDirs, startOpt); + StorageInfo bpStorage = storage.getBPStorage(blockPoolId); + LOG.info("setting up storage: nsid=" + + bpStorage.getNamespaceID() + ";bpid=" + + blockPoolId + ";lv=" + storage.getLayoutVersion() + + ";nsInfo=" + nsInfo); + } + initFsDataSet(); + initPeriodicScanners(conf); + data.addBlockPool(nsInfo.getBlockPoolID(), conf); + } + + private DatanodeRegistration createRegistration() { + DatanodeRegistration reg = new DatanodeRegistration(getMachineName()); + reg.setInfoPort(infoServer.getPort()); + reg.setIpcPort(getIpcPort()); + return reg; + } + BPOfferService[] getAllBpOs() { return blockPoolManager.getAllNamenodeThreads(); } @@ -1521,8 +1569,7 @@ public class DataNode extends Configured * Initializes the {@link #data}. The initialization is done only once, when * handshake with the the first namenode is completed. */ - private synchronized void initFsDataSet(Configuration conf, - AbstractList dataDirs) throws IOException { + private synchronized void initFsDataSet() throws IOException { if (data != null) { // Already initialized return; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java index 4d7740455ff..a3d47b623ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java @@ -61,7 +61,7 @@ public class DataNodeTestUtils { bpos.setNamespaceInfo(nsifno); dn.setBPNamenode(bpid, nn); - bpos.setupBPStorage(); + dn.initBlockPool(bpos, nsifno); } } }