HDFS-6940. Refactoring to allow ConsensusNode implementation.
Contributed by Konstantin Shvachko.
This commit is contained in:
parent
3b35f81603
commit
88209ce181
|
@ -444,6 +444,8 @@ Release 2.6.0 - UNRELEASED
|
||||||
HDFS-6376. Distcp data between two HA clusters requires another configuration.
|
HDFS-6376. Distcp data between two HA clusters requires another configuration.
|
||||||
(Dave Marion and Haohui Mai via jing9)
|
(Dave Marion and Haohui Mai via jing9)
|
||||||
|
|
||||||
|
HDFS-6940. Refactoring to allow ConsensusNode implementation. (shv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
||||||
|
|
|
@ -164,7 +164,7 @@ public class BlockManager {
|
||||||
final BlocksMap blocksMap;
|
final BlocksMap blocksMap;
|
||||||
|
|
||||||
/** Replication thread. */
|
/** Replication thread. */
|
||||||
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
|
Daemon replicationThread;
|
||||||
|
|
||||||
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
|
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
|
||||||
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
|
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
|
||||||
|
@ -263,6 +263,7 @@ public class BlockManager {
|
||||||
this.namesystem = namesystem;
|
this.namesystem = namesystem;
|
||||||
datanodeManager = new DatanodeManager(this, namesystem, conf);
|
datanodeManager = new DatanodeManager(this, namesystem, conf);
|
||||||
heartbeatManager = datanodeManager.getHeartbeatManager();
|
heartbeatManager = datanodeManager.getHeartbeatManager();
|
||||||
|
setReplicationMonitor(new ReplicationMonitor());
|
||||||
|
|
||||||
final long pendingPeriod = conf.getLong(
|
final long pendingPeriod = conf.getLong(
|
||||||
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
|
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
|
||||||
|
@ -395,6 +396,22 @@ public class BlockManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getReplicationRecheckInterval() {
|
||||||
|
return replicationRecheckInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AtomicLong excessBlocksCount() {
|
||||||
|
return excessBlocksCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clearInvalidateBlocks() {
|
||||||
|
invalidateBlocks.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
void setReplicationMonitor(Runnable replicationMonitor) {
|
||||||
|
replicationThread = new Daemon(replicationMonitor);
|
||||||
|
}
|
||||||
|
|
||||||
public void setBlockPoolId(String blockPoolId) {
|
public void setBlockPoolId(String blockPoolId) {
|
||||||
if (isBlockTokenEnabled()) {
|
if (isBlockTokenEnabled()) {
|
||||||
blockTokenSecretManager.setBlockPoolId(blockPoolId);
|
blockTokenSecretManager.setBlockPoolId(blockPoolId);
|
||||||
|
@ -1616,7 +1633,7 @@ public class BlockManager {
|
||||||
* If there were any replication requests that timed out, reap them
|
* If there were any replication requests that timed out, reap them
|
||||||
* and put them back into the neededReplication queue
|
* and put them back into the neededReplication queue
|
||||||
*/
|
*/
|
||||||
private void processPendingReplications() {
|
void processPendingReplications() {
|
||||||
Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
|
Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
|
||||||
if (timedOutItems != null) {
|
if (timedOutItems != null) {
|
||||||
namesystem.writeLock();
|
namesystem.writeLock();
|
||||||
|
|
|
@ -1053,7 +1053,7 @@ public class DatanodeManager {
|
||||||
* 3. Added to exclude --> start decommission.
|
* 3. Added to exclude --> start decommission.
|
||||||
* 4. Removed from exclude --> stop decommission.
|
* 4. Removed from exclude --> stop decommission.
|
||||||
*/
|
*/
|
||||||
private void refreshDatanodes() {
|
void refreshDatanodes() {
|
||||||
for(DatanodeDescriptor node : datanodeMap.values()) {
|
for(DatanodeDescriptor node : datanodeMap.values()) {
|
||||||
// Check if not include.
|
// Check if not include.
|
||||||
if (!hostFileManager.isIncluded(node)) {
|
if (!hostFileManager.isIncluded(node)) {
|
||||||
|
@ -1586,5 +1586,9 @@ public class DatanodeManager {
|
||||||
public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
|
public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
|
||||||
this.shouldSendCachingCommands = shouldSendCachingCommands;
|
this.shouldSendCachingCommands = shouldSendCachingCommands;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public HostFileManager getHostFileManager() {
|
||||||
|
return this.hostFileManager;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -129,6 +129,10 @@ class HostFileManager {
|
||||||
void refresh(String includeFile, String excludeFile) throws IOException {
|
void refresh(String includeFile, String excludeFile) throws IOException {
|
||||||
HostSet newIncludes = readFile("included", includeFile);
|
HostSet newIncludes = readFile("included", includeFile);
|
||||||
HostSet newExcludes = readFile("excluded", excludeFile);
|
HostSet newExcludes = readFile("excluded", excludeFile);
|
||||||
|
setHosts(newIncludes, newExcludes);
|
||||||
|
}
|
||||||
|
|
||||||
|
void setHosts(HostSet newIncludes, HostSet newExcludes) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
includes = newIncludes;
|
includes = newIncludes;
|
||||||
excludes = newExcludes;
|
excludes = newExcludes;
|
||||||
|
|
|
@ -978,7 +978,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
return Collections.unmodifiableList(auditLoggers);
|
return Collections.unmodifiableList(auditLoggers);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void loadFSImage(StartupOption startOpt) throws IOException {
|
protected void loadFSImage(StartupOption startOpt) throws IOException {
|
||||||
final FSImage fsImage = getFSImage();
|
final FSImage fsImage = getFSImage();
|
||||||
|
|
||||||
// format before starting up if requested
|
// format before starting up if requested
|
||||||
|
@ -1026,7 +1026,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
imageLoadComplete();
|
imageLoadComplete();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void startSecretManager() {
|
protected void startSecretManager() {
|
||||||
if (dtSecretManager != null) {
|
if (dtSecretManager != null) {
|
||||||
try {
|
try {
|
||||||
dtSecretManager.startThreads();
|
dtSecretManager.startThreads();
|
||||||
|
@ -1038,7 +1038,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void startSecretManagerIfNecessary() {
|
protected void startSecretManagerIfNecessary() {
|
||||||
boolean shouldRun = shouldUseDelegationTokens() &&
|
boolean shouldRun = shouldUseDelegationTokens() &&
|
||||||
!isInSafeMode() && getEditLog().isOpenForWrite();
|
!isInSafeMode() && getEditLog().isOpenForWrite();
|
||||||
boolean running = dtSecretManager.isRunning();
|
boolean running = dtSecretManager.isRunning();
|
||||||
|
@ -1188,7 +1188,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
return haEnabled && inActiveState() && startingActiveService;
|
return haEnabled && inActiveState() && startingActiveService;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean shouldUseDelegationTokens() {
|
protected boolean shouldUseDelegationTokens() {
|
||||||
return UserGroupInformation.isSecurityEnabled() ||
|
return UserGroupInformation.isSecurityEnabled() ||
|
||||||
alwaysUseDelegationTokensForTests;
|
alwaysUseDelegationTokensForTests;
|
||||||
}
|
}
|
||||||
|
@ -2729,6 +2729,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
* @throws UnresolvedLinkException
|
* @throws UnresolvedLinkException
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
|
protected
|
||||||
LocatedBlock prepareFileForWrite(String src, INodeFile file,
|
LocatedBlock prepareFileForWrite(String src, INodeFile file,
|
||||||
String leaseHolder, String clientMachine,
|
String leaseHolder, String clientMachine,
|
||||||
boolean writeToEditLog,
|
boolean writeToEditLog,
|
||||||
|
@ -3185,6 +3186,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
return new FileState(pendingFile, src);
|
return new FileState(pendingFile, src);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected
|
||||||
LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
|
LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
|
||||||
long offset) throws IOException {
|
long offset) throws IOException {
|
||||||
LocatedBlock lBlk = new LocatedBlock(
|
LocatedBlock lBlk = new LocatedBlock(
|
||||||
|
@ -3302,7 +3304,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private INodeFile checkLease(String src, String holder, INode inode,
|
protected INodeFile checkLease(String src, String holder, INode inode,
|
||||||
long fileId)
|
long fileId)
|
||||||
throws LeaseExpiredException, FileNotFoundException {
|
throws LeaseExpiredException, FileNotFoundException {
|
||||||
assert hasReadLock();
|
assert hasReadLock();
|
||||||
|
@ -4420,7 +4422,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
return leaseManager.reassignLease(lease, src, newHolder);
|
return leaseManager.reassignLease(lease, src, newHolder);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void commitOrCompleteLastBlock(final INodeFile fileINode,
|
protected void commitOrCompleteLastBlock(final INodeFile fileINode,
|
||||||
final Block commitBlock) throws IOException {
|
final Block commitBlock) throws IOException {
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
Preconditions.checkArgument(fileINode.isUnderConstruction());
|
Preconditions.checkArgument(fileINode.isUnderConstruction());
|
||||||
|
@ -4816,6 +4818,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
* @return an array of datanode commands
|
* @return an array of datanode commands
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
|
protected
|
||||||
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
|
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
|
||||||
StorageReport[] reports, long cacheCapacity, long cacheUsed,
|
StorageReport[] reports, long cacheCapacity, long cacheUsed,
|
||||||
int xceiverCount, int xmitsInProgress, int failedVolumes)
|
int xceiverCount, int xmitsInProgress, int failedVolumes)
|
||||||
|
@ -4865,7 +4868,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
* @param file
|
* @param file
|
||||||
* @param logRetryCache
|
* @param logRetryCache
|
||||||
*/
|
*/
|
||||||
private void persistBlocks(String path, INodeFile file,
|
protected void persistBlocks(String path, INodeFile file,
|
||||||
boolean logRetryCache) {
|
boolean logRetryCache) {
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
Preconditions.checkArgument(file.isUnderConstruction());
|
Preconditions.checkArgument(file.isUnderConstruction());
|
||||||
|
@ -5297,7 +5300,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
* @param path
|
* @param path
|
||||||
* @param file
|
* @param file
|
||||||
*/
|
*/
|
||||||
private void persistNewBlock(String path, INodeFile file) {
|
protected void persistNewBlock(String path, INodeFile file) {
|
||||||
Preconditions.checkArgument(file.isUnderConstruction());
|
Preconditions.checkArgument(file.isUnderConstruction());
|
||||||
getEditLog().logAddBlock(path, file);
|
getEditLog().logAddBlock(path, file);
|
||||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
|
@ -7175,7 +7178,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
*
|
*
|
||||||
* @return true if delegation token operation is allowed
|
* @return true if delegation token operation is allowed
|
||||||
*/
|
*/
|
||||||
private boolean isAllowedDelegationTokenOp() throws IOException {
|
protected boolean isAllowedDelegationTokenOp() throws IOException {
|
||||||
AuthenticationMethod authMethod = getConnectionAuthenticationMethod();
|
AuthenticationMethod authMethod = getConnectionAuthenticationMethod();
|
||||||
if (UserGroupInformation.isSecurityEnabled()
|
if (UserGroupInformation.isSecurityEnabled()
|
||||||
&& (authMethod != AuthenticationMethod.KERBEROS)
|
&& (authMethod != AuthenticationMethod.KERBEROS)
|
||||||
|
@ -7342,7 +7345,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
|
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
|
||||||
blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
|
blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
|
||||||
for (DatanodeDescriptor node : live) {
|
for (DatanodeDescriptor node : live) {
|
||||||
Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
|
info.put(node.getHostName(), getLiveNodeInfo(node));
|
||||||
|
}
|
||||||
|
return JSON.toString(info);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Map<String, Object> getLiveNodeInfo(DatanodeDescriptor node) {
|
||||||
|
return ImmutableMap.<String, Object>builder()
|
||||||
.put("infoAddr", node.getInfoAddr())
|
.put("infoAddr", node.getInfoAddr())
|
||||||
.put("infoSecureAddr", node.getInfoSecureAddr())
|
.put("infoSecureAddr", node.getInfoSecureAddr())
|
||||||
.put("xferaddr", node.getXferAddr())
|
.put("xferaddr", node.getXferAddr())
|
||||||
|
@ -7360,10 +7369,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
.put("blockPoolUsedPercent", node.getBlockPoolUsedPercent())
|
.put("blockPoolUsedPercent", node.getBlockPoolUsedPercent())
|
||||||
.put("volfails", node.getVolumeFailures())
|
.put("volfails", node.getVolumeFailures())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
info.put(node.getHostName(), innerinfo);
|
|
||||||
}
|
|
||||||
return JSON.toString(info);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -7649,16 +7654,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
return fsLock.longReadLock;
|
return fsLock.longReadLock;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public SafeModeInfo getSafeModeInfoForTests() {
|
|
||||||
return safeMode;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
|
public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
|
||||||
this.nnResourceChecker = nnResourceChecker;
|
this.nnResourceChecker = nnResourceChecker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SafeModeInfo getSafeModeInfo() {
|
||||||
|
return safeMode;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isAvoidingStaleDataNodesForWrite() {
|
public boolean isAvoidingStaleDataNodesForWrite() {
|
||||||
return this.blockManager.getDatanodeManager()
|
return this.blockManager.getDatanodeManager()
|
||||||
|
|
|
@ -223,7 +223,7 @@ public class NameNodeAdapter {
|
||||||
* if safemode is not running.
|
* if safemode is not running.
|
||||||
*/
|
*/
|
||||||
public static int getSafeModeSafeBlocks(NameNode nn) {
|
public static int getSafeModeSafeBlocks(NameNode nn) {
|
||||||
SafeModeInfo smi = nn.getNamesystem().getSafeModeInfoForTests();
|
SafeModeInfo smi = nn.getNamesystem().getSafeModeInfo();
|
||||||
if (smi == null) {
|
if (smi == null) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue