HDFS-6940. Refactoring to allow ConsensusNode implementation.

Contributed by Konstantin Shvachko.
This commit is contained in:
Konstantin V Shvachko 2014-09-06 12:04:49 -07:00
parent 035112f251
commit 1ea3883553
6 changed files with 57 additions and 26 deletions

View File

@ -188,6 +188,8 @@ Release 2.6.0 - UNRELEASED
HDFS-6376. Distcp data between two HA clusters requires another configuration. HDFS-6376. Distcp data between two HA clusters requires another configuration.
(Dave Marion and Haohui Mai via jing9) (Dave Marion and Haohui Mai via jing9)
HDFS-6940. Refactoring to allow ConsensusNode implementation. (shv)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang) HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -164,7 +164,7 @@ public class BlockManager {
final BlocksMap blocksMap; final BlocksMap blocksMap;
/** Replication thread. */ /** Replication thread. */
final Daemon replicationThread = new Daemon(new ReplicationMonitor()); Daemon replicationThread;
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */ /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap(); final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
@ -263,6 +263,7 @@ public class BlockManager {
this.namesystem = namesystem; this.namesystem = namesystem;
datanodeManager = new DatanodeManager(this, namesystem, conf); datanodeManager = new DatanodeManager(this, namesystem, conf);
heartbeatManager = datanodeManager.getHeartbeatManager(); heartbeatManager = datanodeManager.getHeartbeatManager();
setReplicationMonitor(new ReplicationMonitor());
final long pendingPeriod = conf.getLong( final long pendingPeriod = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY, DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
@ -395,6 +396,22 @@ public class BlockManager {
} }
} }
public long getReplicationRecheckInterval() {
return replicationRecheckInterval;
}
public AtomicLong excessBlocksCount() {
return excessBlocksCount;
}
public void clearInvalidateBlocks() {
invalidateBlocks.clear();
}
void setReplicationMonitor(Runnable replicationMonitor) {
replicationThread = new Daemon(replicationMonitor);
}
public void setBlockPoolId(String blockPoolId) { public void setBlockPoolId(String blockPoolId) {
if (isBlockTokenEnabled()) { if (isBlockTokenEnabled()) {
blockTokenSecretManager.setBlockPoolId(blockPoolId); blockTokenSecretManager.setBlockPoolId(blockPoolId);
@ -1619,7 +1636,7 @@ public class BlockManager {
* If there were any replication requests that timed out, reap them * If there were any replication requests that timed out, reap them
* and put them back into the neededReplication queue * and put them back into the neededReplication queue
*/ */
private void processPendingReplications() { void processPendingReplications() {
Block[] timedOutItems = pendingReplications.getTimedOutBlocks(); Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
if (timedOutItems != null) { if (timedOutItems != null) {
namesystem.writeLock(); namesystem.writeLock();

View File

@ -1052,7 +1052,7 @@ public class DatanodeManager {
* 3. Added to exclude --> start decommission. * 3. Added to exclude --> start decommission.
* 4. Removed from exclude --> stop decommission. * 4. Removed from exclude --> stop decommission.
*/ */
private void refreshDatanodes() { void refreshDatanodes() {
for(DatanodeDescriptor node : datanodeMap.values()) { for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include. // Check if not include.
if (!hostFileManager.isIncluded(node)) { if (!hostFileManager.isIncluded(node)) {
@ -1585,5 +1585,9 @@ public class DatanodeManager {
public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) { public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
this.shouldSendCachingCommands = shouldSendCachingCommands; this.shouldSendCachingCommands = shouldSendCachingCommands;
} }
public HostFileManager getHostFileManager() {
return this.hostFileManager;
}
} }

View File

@ -129,6 +129,10 @@ class HostFileManager {
void refresh(String includeFile, String excludeFile) throws IOException { void refresh(String includeFile, String excludeFile) throws IOException {
HostSet newIncludes = readFile("included", includeFile); HostSet newIncludes = readFile("included", includeFile);
HostSet newExcludes = readFile("excluded", excludeFile); HostSet newExcludes = readFile("excluded", excludeFile);
setHosts(newIncludes, newExcludes);
}
void setHosts(HostSet newIncludes, HostSet newExcludes) {
synchronized (this) { synchronized (this) {
includes = newIncludes; includes = newIncludes;
excludes = newExcludes; excludes = newExcludes;

View File

@ -982,7 +982,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return Collections.unmodifiableList(auditLoggers); return Collections.unmodifiableList(auditLoggers);
} }
private void loadFSImage(StartupOption startOpt) throws IOException { protected void loadFSImage(StartupOption startOpt) throws IOException {
final FSImage fsImage = getFSImage(); final FSImage fsImage = getFSImage();
// format before starting up if requested // format before starting up if requested
@ -1030,7 +1030,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
imageLoadComplete(); imageLoadComplete();
} }
private void startSecretManager() { protected void startSecretManager() {
if (dtSecretManager != null) { if (dtSecretManager != null) {
try { try {
dtSecretManager.startThreads(); dtSecretManager.startThreads();
@ -1042,7 +1042,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
} }
private void startSecretManagerIfNecessary() { protected void startSecretManagerIfNecessary() {
boolean shouldRun = shouldUseDelegationTokens() && boolean shouldRun = shouldUseDelegationTokens() &&
!isInSafeMode() && getEditLog().isOpenForWrite(); !isInSafeMode() && getEditLog().isOpenForWrite();
boolean running = dtSecretManager.isRunning(); boolean running = dtSecretManager.isRunning();
@ -1192,7 +1192,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return haEnabled && inActiveState() && startingActiveService; return haEnabled && inActiveState() && startingActiveService;
} }
private boolean shouldUseDelegationTokens() { protected boolean shouldUseDelegationTokens() {
return UserGroupInformation.isSecurityEnabled() || return UserGroupInformation.isSecurityEnabled() ||
alwaysUseDelegationTokensForTests; alwaysUseDelegationTokensForTests;
} }
@ -2736,6 +2736,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @throws UnresolvedLinkException * @throws UnresolvedLinkException
* @throws IOException * @throws IOException
*/ */
protected
LocatedBlock prepareFileForWrite(String src, INodeFile file, LocatedBlock prepareFileForWrite(String src, INodeFile file,
String leaseHolder, String clientMachine, String leaseHolder, String clientMachine,
boolean writeToEditLog, boolean writeToEditLog,
@ -3198,6 +3199,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return new FileState(pendingFile, src); return new FileState(pendingFile, src);
} }
protected
LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs, LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
long offset) throws IOException { long offset) throws IOException {
LocatedBlock lBlk = new LocatedBlock( LocatedBlock lBlk = new LocatedBlock(
@ -3315,7 +3317,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return true; return true;
} }
private INodeFile checkLease(String src, String holder, INode inode, protected INodeFile checkLease(String src, String holder, INode inode,
long fileId) long fileId)
throws LeaseExpiredException, FileNotFoundException { throws LeaseExpiredException, FileNotFoundException {
assert hasReadLock(); assert hasReadLock();
@ -4433,7 +4435,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return leaseManager.reassignLease(lease, src, newHolder); return leaseManager.reassignLease(lease, src, newHolder);
} }
private void commitOrCompleteLastBlock(final INodeFile fileINode, protected void commitOrCompleteLastBlock(final INodeFile fileINode,
final Block commitBlock) throws IOException { final Block commitBlock) throws IOException {
assert hasWriteLock(); assert hasWriteLock();
Preconditions.checkArgument(fileINode.isUnderConstruction()); Preconditions.checkArgument(fileINode.isUnderConstruction());
@ -4829,6 +4831,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @return an array of datanode commands * @return an array of datanode commands
* @throws IOException * @throws IOException
*/ */
protected
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg, HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, long cacheCapacity, long cacheUsed, StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes) int xceiverCount, int xmitsInProgress, int failedVolumes)
@ -4878,7 +4881,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @param file * @param file
* @param logRetryCache * @param logRetryCache
*/ */
private void persistBlocks(String path, INodeFile file, protected void persistBlocks(String path, INodeFile file,
boolean logRetryCache) { boolean logRetryCache) {
assert hasWriteLock(); assert hasWriteLock();
Preconditions.checkArgument(file.isUnderConstruction()); Preconditions.checkArgument(file.isUnderConstruction());
@ -5310,7 +5313,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @param path * @param path
* @param file * @param file
*/ */
private void persistNewBlock(String path, INodeFile file) { protected void persistNewBlock(String path, INodeFile file) {
Preconditions.checkArgument(file.isUnderConstruction()); Preconditions.checkArgument(file.isUnderConstruction());
getEditLog().logAddBlock(path, file); getEditLog().logAddBlock(path, file);
if (NameNode.stateChangeLog.isDebugEnabled()) { if (NameNode.stateChangeLog.isDebugEnabled()) {
@ -7188,7 +7191,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* *
* @return true if delegation token operation is allowed * @return true if delegation token operation is allowed
*/ */
private boolean isAllowedDelegationTokenOp() throws IOException { protected boolean isAllowedDelegationTokenOp() throws IOException {
AuthenticationMethod authMethod = getConnectionAuthenticationMethod(); AuthenticationMethod authMethod = getConnectionAuthenticationMethod();
if (UserGroupInformation.isSecurityEnabled() if (UserGroupInformation.isSecurityEnabled()
&& (authMethod != AuthenticationMethod.KERBEROS) && (authMethod != AuthenticationMethod.KERBEROS)
@ -7355,7 +7358,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>(); final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
blockManager.getDatanodeManager().fetchDatanodes(live, null, true); blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
for (DatanodeDescriptor node : live) { for (DatanodeDescriptor node : live) {
Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder() info.put(node.getHostName(), getLiveNodeInfo(node));
}
return JSON.toString(info);
}
protected Map<String, Object> getLiveNodeInfo(DatanodeDescriptor node) {
return ImmutableMap.<String, Object>builder()
.put("infoAddr", node.getInfoAddr()) .put("infoAddr", node.getInfoAddr())
.put("infoSecureAddr", node.getInfoSecureAddr()) .put("infoSecureAddr", node.getInfoSecureAddr())
.put("xferaddr", node.getXferAddr()) .put("xferaddr", node.getXferAddr())
@ -7373,10 +7382,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
.put("blockPoolUsedPercent", node.getBlockPoolUsedPercent()) .put("blockPoolUsedPercent", node.getBlockPoolUsedPercent())
.put("volfails", node.getVolumeFailures()) .put("volfails", node.getVolumeFailures())
.build(); .build();
info.put(node.getHostName(), innerinfo);
}
return JSON.toString(info);
} }
/** /**
@ -7662,16 +7667,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return fsLock.longReadLock; return fsLock.longReadLock;
} }
@VisibleForTesting
public SafeModeInfo getSafeModeInfoForTests() {
return safeMode;
}
@VisibleForTesting @VisibleForTesting
public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) { public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
this.nnResourceChecker = nnResourceChecker; this.nnResourceChecker = nnResourceChecker;
} }
public SafeModeInfo getSafeModeInfo() {
return safeMode;
}
@Override @Override
public boolean isAvoidingStaleDataNodesForWrite() { public boolean isAvoidingStaleDataNodesForWrite() {
return this.blockManager.getDatanodeManager() return this.blockManager.getDatanodeManager()

View File

@ -223,7 +223,7 @@ public class NameNodeAdapter {
* if safemode is not running. * if safemode is not running.
*/ */
public static int getSafeModeSafeBlocks(NameNode nn) { public static int getSafeModeSafeBlocks(NameNode nn) {
SafeModeInfo smi = nn.getNamesystem().getSafeModeInfoForTests(); SafeModeInfo smi = nn.getNamesystem().getSafeModeInfo();
if (smi == null) { if (smi == null) {
return -1; return -1;
} }