HDFS-7381. Decouple the management of block id and gen stamps from FSNamesystem. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2014-11-11 12:41:51 -08:00
parent 0fd97f9c19
commit 571e9c6232
9 changed files with 276 additions and 225 deletions

View File

@ -350,6 +350,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7365. Remove hdfs.server.blockmanagement.MutableBlockCollection. HDFS-7365. Remove hdfs.server.blockmanagement.MutableBlockCollection.
(Li Lu via wheat9) (Li Lu via wheat9)
HDFS-7381. Decouple the management of block id and gen stamps from
FSNamesystem. (wheat9)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import java.io.IOException;
/**
* BlockIdManager allocates the generation stamps and the block ID. The
* {@see FSNamesystem} is responsible for persisting the allocations in the
* {@see EditLog}.
*/
public class BlockIdManager {
/**
* The global generation stamp for legacy blocks with randomly
* generated block IDs.
*/
private final GenerationStamp generationStampV1 = new GenerationStamp();
/**
* The global generation stamp for this file system.
*/
private final GenerationStamp generationStampV2 = new GenerationStamp();
/**
* The value of the generation stamp when the first switch to sequential
* block IDs was made. Blocks with generation stamps below this value
* have randomly allocated block IDs. Blocks with generation stamps above
* this value had sequentially allocated block IDs. Read from the fsImage
* (or initialized as an offset from the V1 (legacy) generation stamp on
* upgrade).
*/
private long generationStampV1Limit;
/**
* The global block ID space for this file system.
*/
private final SequentialBlockIdGenerator blockIdGenerator;
public BlockIdManager(BlockManager blockManager) {
this.generationStampV1Limit = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
this.blockIdGenerator = new SequentialBlockIdGenerator(blockManager);
}
/**
* Upgrades the generation stamp for the filesystem
* by reserving a sufficient range for all existing blocks.
* Should be invoked only during the first upgrade to
* sequential block IDs.
*/
public long upgradeGenerationStampToV2() {
Preconditions.checkState(generationStampV2.getCurrentValue() ==
GenerationStamp.LAST_RESERVED_STAMP);
generationStampV2.skipTo(generationStampV1.getCurrentValue() +
HdfsConstants.RESERVED_GENERATION_STAMPS_V1);
generationStampV1Limit = generationStampV2.getCurrentValue();
return generationStampV2.getCurrentValue();
}
/**
* Sets the generation stamp that delineates random and sequentially
* allocated block IDs.
*
* @param stamp set generation stamp limit to this value
*/
public void setGenerationStampV1Limit(long stamp) {
Preconditions.checkState(generationStampV1Limit == GenerationStamp
.GRANDFATHER_GENERATION_STAMP);
generationStampV1Limit = stamp;
}
/**
* Gets the value of the generation stamp that delineates sequential
* and random block IDs.
*/
public long getGenerationStampAtblockIdSwitch() {
return generationStampV1Limit;
}
@VisibleForTesting
SequentialBlockIdGenerator getBlockIdGenerator() {
return blockIdGenerator;
}
/**
* Sets the maximum allocated block ID for this filesystem. This is
* the basis for allocating new block IDs.
*/
public void setLastAllocatedBlockId(long blockId) {
blockIdGenerator.skipTo(blockId);
}
/**
* Gets the maximum sequentially allocated block ID for this filesystem
*/
public long getLastAllocatedBlockId() {
return blockIdGenerator.getCurrentValue();
}
/**
* Sets the current generation stamp for legacy blocks
*/
public void setGenerationStampV1(long stamp) {
generationStampV1.setCurrentValue(stamp);
}
/**
* Gets the current generation stamp for legacy blocks
*/
public long getGenerationStampV1() {
return generationStampV1.getCurrentValue();
}
/**
* Gets the current generation stamp for this filesystem
*/
public void setGenerationStampV2(long stamp) {
generationStampV2.setCurrentValue(stamp);
}
public long getGenerationStampV2() {
return generationStampV2.getCurrentValue();
}
/**
* Increments, logs and then returns the stamp
*/
public long nextGenerationStamp(boolean legacyBlock) throws IOException {
return legacyBlock ? getNextGenerationStampV1() :
getNextGenerationStampV2();
}
@VisibleForTesting
long getNextGenerationStampV1() throws IOException {
long genStampV1 = generationStampV1.nextValue();
if (genStampV1 >= generationStampV1Limit) {
// We ran out of generation stamps for legacy blocks. In practice, it
// is extremely unlikely as we reserved 1T v1 generation stamps. The
// result is that we can no longer append to the legacy blocks that
// were created before the upgrade to sequential block IDs.
throw new OutOfV1GenerationStampsException();
}
return genStampV1;
}
@VisibleForTesting
long getNextGenerationStampV2() {
return generationStampV2.nextValue();
}
public long getGenerationStampV1Limit() {
return generationStampV1Limit;
}
/**
* Determine whether the block ID was randomly generated (legacy) or
* sequentially generated. The generation stamp value is used to
* make the distinction.
*
* @return true if the block ID was randomly generated, false otherwise.
*/
public boolean isLegacyBlock(Block block) {
return block.getGenerationStamp() < getGenerationStampV1Limit();
}
/**
* Increments, logs and then returns the block ID
*/
public long nextBlockId() {
return blockIdGenerator.nextValue();
}
public boolean isGenStampInFuture(Block block) {
if (isLegacyBlock(block)) {
return block.getGenerationStamp() > getGenerationStampV1();
} else {
return block.getGenerationStamp() > getGenerationStampV2();
}
}
public void clear() {
generationStampV1.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP);
generationStampV2.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP);
getBlockIdGenerator().setCurrentValue(SequentialBlockIdGenerator
.LAST_RESERVED_BLOCK_ID);
setGenerationStampV1Limit(GenerationStamp.GRANDFATHER_GENERATION_STAMP);
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;

View File

@ -532,7 +532,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
} }
case OP_SET_GENSTAMP_V1: { case OP_SET_GENSTAMP_V1: {
SetGenstampV1Op setGenstampV1Op = (SetGenstampV1Op)op; SetGenstampV1Op setGenstampV1Op = (SetGenstampV1Op)op;
fsNamesys.setGenerationStampV1(setGenstampV1Op.genStampV1); fsNamesys.getBlockIdManager().setGenerationStampV1(setGenstampV1Op.genStampV1);
break; break;
} }
case OP_SET_PERMISSIONS: { case OP_SET_PERMISSIONS: {
@ -722,12 +722,12 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
} }
case OP_SET_GENSTAMP_V2: { case OP_SET_GENSTAMP_V2: {
SetGenstampV2Op setGenstampV2Op = (SetGenstampV2Op) op; SetGenstampV2Op setGenstampV2Op = (SetGenstampV2Op) op;
fsNamesys.setGenerationStampV2(setGenstampV2Op.genStampV2); fsNamesys.getBlockIdManager().setGenerationStampV2(setGenstampV2Op.genStampV2);
break; break;
} }
case OP_ALLOCATE_BLOCK_ID: { case OP_ALLOCATE_BLOCK_ID: {
AllocateBlockIdOp allocateBlockIdOp = (AllocateBlockIdOp) op; AllocateBlockIdOp allocateBlockIdOp = (AllocateBlockIdOp) op;
fsNamesys.setLastAllocatedBlockId(allocateBlockIdOp.blockId); fsNamesys.getBlockIdManager().setLastAllocatedBlockId(allocateBlockIdOp.blockId);
break; break;
} }
case OP_ROLLING_UPGRADE_START: { case OP_ROLLING_UPGRADE_START: {

View File

@ -341,24 +341,26 @@ public void load(File curFile) throws IOException {
// read in the last generation stamp for legacy blocks. // read in the last generation stamp for legacy blocks.
long genstamp = in.readLong(); long genstamp = in.readLong();
namesystem.setGenerationStampV1(genstamp); namesystem.getBlockIdManager().setGenerationStampV1(genstamp);
if (NameNodeLayoutVersion.supports( if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.SEQUENTIAL_BLOCK_ID, imgVersion)) { LayoutVersion.Feature.SEQUENTIAL_BLOCK_ID, imgVersion)) {
// read the starting generation stamp for sequential block IDs // read the starting generation stamp for sequential block IDs
genstamp = in.readLong(); genstamp = in.readLong();
namesystem.setGenerationStampV2(genstamp); namesystem.getBlockIdManager().setGenerationStampV2(genstamp);
// read the last generation stamp for blocks created after // read the last generation stamp for blocks created after
// the switch to sequential block IDs. // the switch to sequential block IDs.
long stampAtIdSwitch = in.readLong(); long stampAtIdSwitch = in.readLong();
namesystem.setGenerationStampV1Limit(stampAtIdSwitch); namesystem.getBlockIdManager().setGenerationStampV1Limit(stampAtIdSwitch);
// read the max sequential block ID. // read the max sequential block ID.
long maxSequentialBlockId = in.readLong(); long maxSequentialBlockId = in.readLong();
namesystem.setLastAllocatedBlockId(maxSequentialBlockId); namesystem.getBlockIdManager().setLastAllocatedBlockId(maxSequentialBlockId);
} else { } else {
long startingGenStamp = namesystem.upgradeGenerationStampToV2();
long startingGenStamp = namesystem.getBlockIdManager()
.upgradeGenerationStampToV2();
// This is an upgrade. // This is an upgrade.
LOG.info("Upgrading to sequential block IDs. Generation stamp " + LOG.info("Upgrading to sequential block IDs. Generation stamp " +
"for new blocks set to " + startingGenStamp); "for new blocks set to " + startingGenStamp);
@ -1251,10 +1253,10 @@ void save(File newFile, FSImageCompression compression) throws IOException {
out.writeInt(sourceNamesystem.unprotectedGetNamespaceInfo() out.writeInt(sourceNamesystem.unprotectedGetNamespaceInfo()
.getNamespaceID()); .getNamespaceID());
out.writeLong(numINodes); out.writeLong(numINodes);
out.writeLong(sourceNamesystem.getGenerationStampV1()); out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampV1());
out.writeLong(sourceNamesystem.getGenerationStampV2()); out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampV2());
out.writeLong(sourceNamesystem.getGenerationStampAtblockIdSwitch()); out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampAtblockIdSwitch());
out.writeLong(sourceNamesystem.getLastAllocatedBlockId()); out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedBlockId());
out.writeLong(context.getTxId()); out.writeLong(context.getTxId());
out.writeLong(sourceNamesystem.getLastInodeId()); out.writeLong(sourceNamesystem.getLastInodeId());

View File

@ -46,8 +46,7 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException; import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection; import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary; import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection; import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
@ -293,10 +292,11 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
private void loadNameSystemSection(InputStream in) throws IOException { private void loadNameSystemSection(InputStream in) throws IOException {
NameSystemSection s = NameSystemSection.parseDelimitedFrom(in); NameSystemSection s = NameSystemSection.parseDelimitedFrom(in);
fsn.setGenerationStampV1(s.getGenstampV1()); BlockIdManager blockIdManager = fsn.getBlockIdManager();
fsn.setGenerationStampV2(s.getGenstampV2()); blockIdManager.setGenerationStampV1(s.getGenstampV1());
fsn.setGenerationStampV1Limit(s.getGenstampV1Limit()); blockIdManager.setGenerationStampV2(s.getGenstampV2());
fsn.setLastAllocatedBlockId(s.getLastAllocatedBlockId()); blockIdManager.setGenerationStampV1Limit(s.getGenstampV1Limit());
blockIdManager.setLastAllocatedBlockId(s.getLastAllocatedBlockId());
imgTxId = s.getTransactionId(); imgTxId = s.getTransactionId();
if (s.hasRollingUpgradeStartTime() if (s.hasRollingUpgradeStartTime()
&& fsn.getFSImage().hasRollbackFSImage()) { && fsn.getFSImage().hasRollbackFSImage()) {
@ -407,7 +407,7 @@ void save(File file, FSImageCompression compression) throws IOException {
FileOutputStream fout = new FileOutputStream(file); FileOutputStream fout = new FileOutputStream(file);
fileChannel = fout.getChannel(); fileChannel = fout.getChannel();
try { try {
saveInternal(fout, compression, file.getAbsolutePath().toString()); saveInternal(fout, compression, file.getAbsolutePath());
} finally { } finally {
fout.close(); fout.close();
} }
@ -531,11 +531,12 @@ private void saveNameSystemSection(FileSummary.Builder summary)
throws IOException { throws IOException {
final FSNamesystem fsn = context.getSourceNamesystem(); final FSNamesystem fsn = context.getSourceNamesystem();
OutputStream out = sectionOutputStream; OutputStream out = sectionOutputStream;
BlockIdManager blockIdManager = fsn.getBlockIdManager();
NameSystemSection.Builder b = NameSystemSection.newBuilder() NameSystemSection.Builder b = NameSystemSection.newBuilder()
.setGenstampV1(fsn.getGenerationStampV1()) .setGenstampV1(blockIdManager.getGenerationStampV1())
.setGenstampV1Limit(fsn.getGenerationStampV1Limit()) .setGenstampV1Limit(blockIdManager.getGenerationStampV1Limit())
.setGenstampV2(fsn.getGenerationStampV2()) .setGenstampV2(blockIdManager.getGenerationStampV2())
.setLastAllocatedBlockId(fsn.getLastAllocatedBlockId()) .setLastAllocatedBlockId(blockIdManager.getLastAllocatedBlockId())
.setTransactionId(context.getTxId()); .setTransactionId(context.getTxId());
// We use the non-locked version of getNamespaceInfo here since // We use the non-locked version of getNamespaceInfo here since

View File

@ -204,6 +204,7 @@
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager.SecretManagerState; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager.SecretManagerState;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@ -211,8 +212,6 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.OutOfV1GenerationStampsException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
@ -331,6 +330,8 @@ protected StringBuilder initialValue() {
} }
}; };
private final BlockIdManager blockIdManager;
@VisibleForTesting @VisibleForTesting
public boolean isAuditEnabled() { public boolean isAuditEnabled() {
return !isDefaultAuditLogger || auditLog.isInfoEnabled(); return !isDefaultAuditLogger || auditLog.isInfoEnabled();
@ -490,34 +491,6 @@ private void logAuditEvent(boolean succeeded,
private final long minBlockSize; // minimum block size private final long minBlockSize; // minimum block size
private final long maxBlocksPerFile; // maximum # of blocks per file private final long maxBlocksPerFile; // maximum # of blocks per file
/**
* The global generation stamp for legacy blocks with randomly
* generated block IDs.
*/
private final GenerationStamp generationStampV1 = new GenerationStamp();
/**
* The global generation stamp for this file system.
*/
private final GenerationStamp generationStampV2 = new GenerationStamp();
/**
* The value of the generation stamp when the first switch to sequential
* block IDs was made. Blocks with generation stamps below this value
* have randomly allocated block IDs. Blocks with generation stamps above
* this value had sequentially allocated block IDs. Read from the fsImage
* (or initialized as an offset from the V1 (legacy) generation stamp on
* upgrade).
*/
private long generationStampV1Limit =
GenerationStamp.GRANDFATHER_GENERATION_STAMP;
/**
* The global block ID space for this file system.
*/
@VisibleForTesting
private final SequentialBlockIdGenerator blockIdGenerator;
// precision of access times. // precision of access times.
private final long accessTimePrecision; private final long accessTimePrecision;
@ -646,11 +619,7 @@ public long allocateNewInodeId() {
void clear() { void clear() {
dir.reset(); dir.reset();
dtSecretManager.reset(); dtSecretManager.reset();
generationStampV1.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP); blockIdManager.clear();
generationStampV2.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP);
blockIdGenerator.setCurrentValue(
SequentialBlockIdGenerator.LAST_RESERVED_BLOCK_ID);
generationStampV1Limit = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
leaseManager.removeAllLeases(); leaseManager.removeAllLeases();
inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID); inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID);
snapshotManager.clearSnapshottableDirs(); snapshotManager.clearSnapshottableDirs();
@ -798,7 +767,7 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
this.blockManager = new BlockManager(this, this, conf); this.blockManager = new BlockManager(this, this, conf);
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics(); this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.blockIdGenerator = new SequentialBlockIdGenerator(this.blockManager); this.blockIdManager = new BlockIdManager(blockManager);
this.isStoragePolicyEnabled = this.isStoragePolicyEnabled =
conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY, conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY,
@ -1360,7 +1329,7 @@ public void checkOperation(OperationCategory op) throws StandbyException {
* @throws SafeModeException * @throws SafeModeException
* Otherwise if NameNode is in SafeMode. * Otherwise if NameNode is in SafeMode.
*/ */
private void checkNameNodeSafeMode(String errorMsg) void checkNameNodeSafeMode(String errorMsg)
throws RetriableException, SafeModeException { throws RetriableException, SafeModeException {
if (isInSafeMode()) { if (isInSafeMode()) {
SafeModeException se = new SafeModeException(errorMsg, safeMode); SafeModeException se = new SafeModeException(errorMsg, safeMode);
@ -4591,7 +4560,7 @@ boolean internalReleaseLease(Lease lease, String src,
return true; return true;
} }
// start recovery of the last block for this file // start recovery of the last block for this file
long blockRecoveryId = nextGenerationStamp(isLegacyBlock(uc)); long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc));
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile); lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
uc.initializeBlockRecovery(blockRecoveryId); uc.initializeBlockRecovery(blockRecoveryId);
leaseManager.renewLease(lease); leaseManager.renewLease(lease);
@ -6727,91 +6696,6 @@ public int getNumStaleStorages() {
return getBlockManager().getDatanodeManager().getNumStaleStorages(); return getBlockManager().getDatanodeManager().getNumStaleStorages();
} }
/**
* Sets the current generation stamp for legacy blocks
*/
void setGenerationStampV1(long stamp) {
generationStampV1.setCurrentValue(stamp);
}
/**
* Gets the current generation stamp for legacy blocks
*/
long getGenerationStampV1() {
return generationStampV1.getCurrentValue();
}
/**
* Gets the current generation stamp for this filesystem
*/
void setGenerationStampV2(long stamp) {
generationStampV2.setCurrentValue(stamp);
}
/**
* Gets the current generation stamp for this filesystem
*/
long getGenerationStampV2() {
return generationStampV2.getCurrentValue();
}
/**
* Upgrades the generation stamp for the filesystem
* by reserving a sufficient range for all existing blocks.
* Should be invoked only during the first upgrade to
* sequential block IDs.
*/
long upgradeGenerationStampToV2() {
Preconditions.checkState(generationStampV2.getCurrentValue() ==
GenerationStamp.LAST_RESERVED_STAMP);
generationStampV2.skipTo(
generationStampV1.getCurrentValue() +
HdfsConstants.RESERVED_GENERATION_STAMPS_V1);
generationStampV1Limit = generationStampV2.getCurrentValue();
return generationStampV2.getCurrentValue();
}
/**
* Sets the generation stamp that delineates random and sequentially
* allocated block IDs.
* @param stamp set generation stamp limit to this value
*/
void setGenerationStampV1Limit(long stamp) {
Preconditions.checkState(generationStampV1Limit ==
GenerationStamp.GRANDFATHER_GENERATION_STAMP);
generationStampV1Limit = stamp;
}
/**
* Gets the value of the generation stamp that delineates sequential
* and random block IDs.
*/
long getGenerationStampAtblockIdSwitch() {
return generationStampV1Limit;
}
@VisibleForTesting
SequentialBlockIdGenerator getBlockIdGenerator() {
return blockIdGenerator;
}
/**
* Sets the maximum allocated block ID for this filesystem. This is
* the basis for allocating new block IDs.
*/
void setLastAllocatedBlockId(long blockId) {
blockIdGenerator.skipTo(blockId);
}
/**
* Gets the maximum sequentially allocated block ID for this filesystem
*/
long getLastAllocatedBlockId() {
return blockIdGenerator.getCurrentValue();
}
/** /**
* Increments, logs and then returns the stamp * Increments, logs and then returns the stamp
*/ */
@ -6820,12 +6704,10 @@ long nextGenerationStamp(boolean legacyBlock)
assert hasWriteLock(); assert hasWriteLock();
checkNameNodeSafeMode("Cannot get next generation stamp"); checkNameNodeSafeMode("Cannot get next generation stamp");
long gs; long gs = blockIdManager.nextGenerationStamp(legacyBlock);
if (legacyBlock) { if (legacyBlock) {
gs = getNextGenerationStampV1();
getEditLog().logGenerationStampV1(gs); getEditLog().logGenerationStampV1(gs);
} else { } else {
gs = getNextGenerationStampV2();
getEditLog().logGenerationStampV2(gs); getEditLog().logGenerationStampV2(gs);
} }
@ -6833,47 +6715,13 @@ long nextGenerationStamp(boolean legacyBlock)
return gs; return gs;
} }
@VisibleForTesting
long getNextGenerationStampV1() throws IOException {
long genStampV1 = generationStampV1.nextValue();
if (genStampV1 >= generationStampV1Limit) {
// We ran out of generation stamps for legacy blocks. In practice, it
// is extremely unlikely as we reserved 1T v1 generation stamps. The
// result is that we can no longer append to the legacy blocks that
// were created before the upgrade to sequential block IDs.
throw new OutOfV1GenerationStampsException();
}
return genStampV1;
}
@VisibleForTesting
long getNextGenerationStampV2() {
return generationStampV2.nextValue();
}
long getGenerationStampV1Limit() {
return generationStampV1Limit;
}
/**
* Determine whether the block ID was randomly generated (legacy) or
* sequentially generated. The generation stamp value is used to
* make the distinction.
* @return true if the block ID was randomly generated, false otherwise.
*/
boolean isLegacyBlock(Block block) {
return block.getGenerationStamp() < getGenerationStampV1Limit();
}
/** /**
* Increments, logs and then returns the block ID * Increments, logs and then returns the block ID
*/ */
private long nextBlockId() throws IOException { private long nextBlockId() throws IOException {
assert hasWriteLock(); assert hasWriteLock();
checkNameNodeSafeMode("Cannot get next block ID"); checkNameNodeSafeMode("Cannot get next block ID");
final long blockId = blockIdGenerator.nextValue(); final long blockId = blockIdManager.nextBlockId();
getEditLog().logAllocateBlockId(blockId); getEditLog().logAllocateBlockId(blockId);
// NB: callers sync the log // NB: callers sync the log
return blockId; return blockId;
@ -6988,8 +6836,7 @@ LocatedBlock updateBlockForPipeline(ExtendedBlock block,
checkUCBlock(block, clientName); checkUCBlock(block, clientName);
// get a new generation stamp and an access token // get a new generation stamp and an access token
block.setGenerationStamp( block.setGenerationStamp(nextGenerationStamp(blockIdManager.isLegacyBlock(block.getLocalBlock())));
nextGenerationStamp(isLegacyBlock(block.getLocalBlock())));
locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]); locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
blockManager.setBlockToken(locatedBlock, AccessMode.WRITE); blockManager.setBlockToken(locatedBlock, AccessMode.WRITE);
} finally { } finally {
@ -7861,6 +7708,11 @@ public String getCompileInfo() {
public BlockManager getBlockManager() { public BlockManager getBlockManager() {
return blockManager; return blockManager;
} }
public BlockIdManager getBlockIdManager() {
return blockIdManager;
}
/** @return the FSDirectory. */ /** @return the FSDirectory. */
public FSDirectory getFSDirectory() { public FSDirectory getFSDirectory() {
return dir; return dir;
@ -7928,11 +7780,7 @@ public synchronized void verifyToken(DelegationTokenIdentifier identifier,
@Override @Override
public boolean isGenStampInFuture(Block block) { public boolean isGenStampInFuture(Block block) {
if (isLegacyBlock(block)) { return blockIdManager.isGenStampInFuture(block);
return block.getGenerationStamp() > getGenerationStampV1();
} else {
return block.getGenerationStamp() > getGenerationStampV2();
}
} }
@VisibleForTesting @VisibleForTesting

View File

@ -15,10 +15,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -31,9 +30,8 @@
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
@ -45,20 +43,13 @@
* collision handling. * collision handling.
*/ */
public class TestSequentialBlockId { public class TestSequentialBlockId {
private static final Log LOG = LogFactory.getLog("TestSequentialBlockId"); private static final Log LOG = LogFactory.getLog("TestSequentialBlockId");
private static final DataChecksum DEFAULT_CHECKSUM =
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
final int BLOCK_SIZE = 1024; final int BLOCK_SIZE = 1024;
final int IO_SIZE = BLOCK_SIZE; final int IO_SIZE = BLOCK_SIZE;
final short REPLICATION = 1; final short REPLICATION = 1;
final long SEED = 0; final long SEED = 0;
DatanodeID datanode;
InetSocketAddress dnAddr;
/** /**
* Test that block IDs are generated sequentially. * Test that block IDs are generated sequentially.
* *
@ -125,7 +116,8 @@ public void testTriggerBlockIdCollision() throws IOException {
// Rewind the block ID counter in the name system object. This will result // Rewind the block ID counter in the name system object. This will result
// in block ID collisions when we try to allocate new blocks. // in block ID collisions when we try to allocate new blocks.
SequentialBlockIdGenerator blockIdGenerator = fsn.getBlockIdGenerator(); SequentialBlockIdGenerator blockIdGenerator = fsn.getBlockIdManager()
.getBlockIdGenerator();
blockIdGenerator.setCurrentValue(blockIdGenerator.getCurrentValue() - 5); blockIdGenerator.setCurrentValue(blockIdGenerator.getCurrentValue() - 5);
// Trigger collisions by creating a new file. // Trigger collisions by creating a new file.
@ -156,10 +148,10 @@ public void testBlockTypeDetection() throws IOException {
// Setup a mock object and stub out a few routines to // Setup a mock object and stub out a few routines to
// retrieve the generation stamp counters. // retrieve the generation stamp counters.
FSNamesystem fsn = mock(FSNamesystem.class); BlockIdManager bid = mock(BlockIdManager.class);
final long maxGenStampForLegacyBlocks = 10000; final long maxGenStampForLegacyBlocks = 10000;
when(fsn.getGenerationStampV1Limit()) when(bid.getGenerationStampV1Limit())
.thenReturn(maxGenStampForLegacyBlocks); .thenReturn(maxGenStampForLegacyBlocks);
Block legacyBlock = spy(new Block()); Block legacyBlock = spy(new Block());
@ -172,9 +164,9 @@ public void testBlockTypeDetection() throws IOException {
// Make sure that isLegacyBlock() can correctly detect // Make sure that isLegacyBlock() can correctly detect
// legacy and new blocks. // legacy and new blocks.
when(fsn.isLegacyBlock(any(Block.class))).thenCallRealMethod(); when(bid.isLegacyBlock(any(Block.class))).thenCallRealMethod();
assertThat(fsn.isLegacyBlock(legacyBlock), is(true)); assertThat(bid.isLegacyBlock(legacyBlock), is(true));
assertThat(fsn.isLegacyBlock(newBlock), is(false)); assertThat(bid.isLegacyBlock(newBlock), is(false));
} }
/** /**
@ -185,25 +177,21 @@ public void testBlockTypeDetection() throws IOException {
*/ */
@Test @Test
public void testGenerationStampUpdate() throws IOException { public void testGenerationStampUpdate() throws IOException {
// Setup a mock object and stub out a few routines to // Setup a mock object and stub out a few routines to
// retrieve the generation stamp counters. // retrieve the generation stamp counters.
FSNamesystem fsn = mock(FSNamesystem.class); BlockIdManager bid = mock(BlockIdManager.class);
FSEditLog editLog = mock(FSEditLog.class);
final long nextGenerationStampV1 = 5000; final long nextGenerationStampV1 = 5000;
final long nextGenerationStampV2 = 20000; final long nextGenerationStampV2 = 20000;
when(fsn.getNextGenerationStampV1()) when(bid.getNextGenerationStampV1())
.thenReturn(nextGenerationStampV1); .thenReturn(nextGenerationStampV1);
when(fsn.getNextGenerationStampV2()) when(bid.getNextGenerationStampV2())
.thenReturn(nextGenerationStampV2); .thenReturn(nextGenerationStampV2);
// Make sure that the generation stamp is set correctly for both // Make sure that the generation stamp is set correctly for both
// kinds of blocks. // kinds of blocks.
when(fsn.nextGenerationStamp(anyBoolean())).thenCallRealMethod(); when(bid.nextGenerationStamp(anyBoolean())).thenCallRealMethod();
when(fsn.hasWriteLock()).thenReturn(true); assertThat(bid.nextGenerationStamp(true), is(nextGenerationStampV1));
when(fsn.getEditLog()).thenReturn(editLog); assertThat(bid.nextGenerationStamp(false), is(nextGenerationStampV2));
assertThat(fsn.nextGenerationStamp(true), is(nextGenerationStampV1));
assertThat(fsn.nextGenerationStamp(false), is(nextGenerationStampV2));
} }
} }

View File

@ -50,6 +50,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
@ -516,7 +517,9 @@ public void testCancelSaveNamespace() throws Exception {
FSNamesystem spyFsn = spy(fsn); FSNamesystem spyFsn = spy(fsn);
final FSNamesystem finalFsn = spyFsn; final FSNamesystem finalFsn = spyFsn;
DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG); DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
doAnswer(delayer).when(spyFsn).getGenerationStampV2(); BlockIdManager bid = spy(spyFsn.getBlockIdManager());
Whitebox.setInternalState(finalFsn, "blockIdManager", bid);
doAnswer(delayer).when(bid).getGenerationStampV2();
ExecutorService pool = Executors.newFixedThreadPool(2); ExecutorService pool = Executors.newFixedThreadPool(2);
@ -572,9 +575,7 @@ public Void call() throws Exception {
NNStorage.getImageFileName(0) + MD5FileUtils.MD5_SUFFIX); NNStorage.getImageFileName(0) + MD5FileUtils.MD5_SUFFIX);
} }
} finally { } finally {
if (fsn != null) { fsn.close();
fsn.close();
}
} }
} }