HDFS-4645. Move from randomly generated block ID to sequentially generated block ID. Contributed by Arpit Agarwal

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1500580 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-07-08 05:29:10 +00:00
parent e01cbdac25
commit 6770de7ec4
28 changed files with 1149 additions and 321 deletions

View File

@ -124,6 +124,9 @@ Trunk (Unreleased)
HDFS-4904. Remove JournalService. (Arpit Agarwal via cnauroth) HDFS-4904. Remove JournalService. (Arpit Agarwal via cnauroth)
HDFS-4645. Move from randomly generated block ID to sequentially generated
block ID. (Arpit Agarwal via szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -86,6 +86,10 @@ public class HdfsConstants {
// An invalid transaction ID that will never be seen in a real namesystem. // An invalid transaction ID that will never be seen in a real namesystem.
public static final long INVALID_TXID = -12345; public static final long INVALID_TXID = -12345;
// Number of generation stamps reserved for legacy blocks.
public static final long RESERVED_GENERATION_STAMPS_V1 =
1024L * 1024 * 1024 * 1024;
/** /**
* URI Scheme for hdfs://namenode/ URIs. * URI Scheme for hdfs://namenode/ URIs.
*/ */

View File

@ -102,9 +102,9 @@ public class LayoutVersion {
RESERVED_REL1_3_0(-44, -41, RESERVED_REL1_3_0(-44, -41,
"Reserved for release 1.3.0", true, ADD_INODE_ID, SNAPSHOT), "Reserved for release 1.3.0", true, ADD_INODE_ID, SNAPSHOT),
OPTIMIZE_SNAPSHOT_INODES(-45, -43, OPTIMIZE_SNAPSHOT_INODES(-45, -43,
"Reduce snapshot inode memory footprint", false); "Reduce snapshot inode memory footprint", false),
SEQUENTIAL_BLOCK_ID(-46, "Allocate block IDs sequentially and store " +
"block IDs in the edits log and image files");
final int lv; final int lv;
final int ancestorLV; final int ancestorLV;

View File

@ -1726,7 +1726,7 @@ public class BlockManager {
ReplicaState reportedState = itBR.getCurrentReplicaState(); ReplicaState reportedState = itBR.getCurrentReplicaState();
if (shouldPostponeBlocksFromFuture && if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(iblk.getGenerationStamp())) { namesystem.isGenStampInFuture(iblk)) {
queueReportedBlock(node, iblk, reportedState, queueReportedBlock(node, iblk, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP); QUEUE_REASON_FUTURE_GENSTAMP);
continue; continue;
@ -1848,7 +1848,7 @@ public class BlockManager {
} }
if (shouldPostponeBlocksFromFuture && if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(block.getGenerationStamp())) { namesystem.isGenStampInFuture(block)) {
queueReportedBlock(dn, block, reportedState, queueReportedBlock(dn, block, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP); QUEUE_REASON_FUTURE_GENSTAMP);
return null; return null;

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This exception is thrown when the name node runs out of V1 generation
* stamps.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class OutOfV1GenerationStampsException extends IOException {
private static final long serialVersionUID = 1L;
public OutOfV1GenerationStampsException() {
super("Out of V1 (legacy) generation stamps\n");
}
}

View File

@ -61,7 +61,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
@ -70,6 +69,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream; import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@ -799,12 +801,30 @@ public class FSEditLog implements LogsPurgeable {
logEdit(op); logEdit(op);
} }
/** /**
* Add legacy block generation stamp record to edit log
*/
void logGenerationStampV1(long genstamp) {
SetGenstampV1Op op = SetGenstampV1Op.getInstance(cache.get())
.setGenerationStamp(genstamp);
logEdit(op);
}
/**
* Add generation stamp record to edit log * Add generation stamp record to edit log
*/ */
void logGenerationStamp(long genstamp) { void logGenerationStampV2(long genstamp) {
SetGenstampOp op = SetGenstampOp.getInstance(cache.get()) SetGenstampV2Op op = SetGenstampV2Op.getInstance(cache.get())
.setGenerationStamp(genstamp); .setGenerationStamp(genstamp);
logEdit(op);
}
/**
* Record a newly allocated block ID in the edit log
*/
void logAllocateBlockId(long blockId) {
AllocateBlockIdOp op = AllocateBlockIdOp.getInstance(cache.get())
.setBlockId(blockId);
logEdit(op); logEdit(op);
} }

View File

@ -55,7 +55,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetNSQuotaOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetNSQuotaOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
@ -65,6 +64,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.util.Holder; import org.apache.hadoop.hdfs.util.Holder;
@ -404,9 +406,9 @@ public class FSEditLogLoader {
mkdirOp.timestamp); mkdirOp.timestamp);
break; break;
} }
case OP_SET_GENSTAMP: { case OP_SET_GENSTAMP_V1: {
SetGenstampOp setGenstampOp = (SetGenstampOp)op; SetGenstampV1Op setGenstampV1Op = (SetGenstampV1Op)op;
fsNamesys.setGenerationStamp(setGenstampOp.genStamp); fsNamesys.setGenerationStampV1(setGenstampV1Op.genStampV1);
break; break;
} }
case OP_SET_PERMISSIONS: { case OP_SET_PERMISSIONS: {
@ -552,6 +554,16 @@ public class FSEditLogLoader {
disallowSnapshotOp.snapshotRoot); disallowSnapshotOp.snapshotRoot);
break; break;
} }
case OP_SET_GENSTAMP_V2: {
SetGenstampV2Op setGenstampV2Op = (SetGenstampV2Op) op;
fsNamesys.setGenerationStampV2(setGenstampV2Op.genStampV2);
break;
}
case OP_ALLOCATE_BLOCK_ID: {
AllocateBlockIdOp allocateBlockIdOp = (AllocateBlockIdOp) op;
fsNamesys.setLastAllocatedBlockId(allocateBlockIdOp.blockId);
break;
}
default: default:
throw new IOException("Invalid operation read " + op.opCode); throw new IOException("Invalid operation read " + op.opCode);
} }

View File

@ -90,7 +90,7 @@ public abstract class FSEditLogOp {
inst.put(OP_RENAME_OLD, new RenameOldOp()); inst.put(OP_RENAME_OLD, new RenameOldOp());
inst.put(OP_DELETE, new DeleteOp()); inst.put(OP_DELETE, new DeleteOp());
inst.put(OP_MKDIR, new MkdirOp()); inst.put(OP_MKDIR, new MkdirOp());
inst.put(OP_SET_GENSTAMP, new SetGenstampOp()); inst.put(OP_SET_GENSTAMP_V1, new SetGenstampV1Op());
inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp()); inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
inst.put(OP_SET_OWNER, new SetOwnerOp()); inst.put(OP_SET_OWNER, new SetOwnerOp());
inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp()); inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
@ -116,6 +116,8 @@ public abstract class FSEditLogOp {
inst.put(OP_CREATE_SNAPSHOT, new CreateSnapshotOp()); inst.put(OP_CREATE_SNAPSHOT, new CreateSnapshotOp());
inst.put(OP_DELETE_SNAPSHOT, new DeleteSnapshotOp()); inst.put(OP_DELETE_SNAPSHOT, new DeleteSnapshotOp());
inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp()); inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp());
inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
} }
public FSEditLogOp get(FSEditLogOpCodes opcode) { public FSEditLogOp get(FSEditLogOpCodes opcode) {
@ -1054,39 +1056,39 @@ public abstract class FSEditLogOp {
} }
} }
static class SetGenstampOp extends FSEditLogOp { static class SetGenstampV1Op extends FSEditLogOp {
long genStamp; long genStampV1;
private SetGenstampOp() { private SetGenstampV1Op() {
super(OP_SET_GENSTAMP); super(OP_SET_GENSTAMP_V1);
} }
static SetGenstampOp getInstance(OpInstanceCache cache) { static SetGenstampV1Op getInstance(OpInstanceCache cache) {
return (SetGenstampOp)cache.get(OP_SET_GENSTAMP); return (SetGenstampV1Op)cache.get(OP_SET_GENSTAMP_V1);
} }
SetGenstampOp setGenerationStamp(long genStamp) { SetGenstampV1Op setGenerationStamp(long genStamp) {
this.genStamp = genStamp; this.genStampV1 = genStamp;
return this; return this;
} }
@Override @Override
public public
void writeFields(DataOutputStream out) throws IOException { void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(genStamp, out); FSImageSerialization.writeLong(genStampV1, out);
} }
@Override @Override
void readFields(DataInputStream in, int logVersion) void readFields(DataInputStream in, int logVersion)
throws IOException { throws IOException {
this.genStamp = FSImageSerialization.readLong(in); this.genStampV1 = FSImageSerialization.readLong(in);
} }
@Override @Override
public String toString() { public String toString() {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append("SetGenstampOp [genStamp="); builder.append("SetGenstampOp [GenStamp=");
builder.append(genStamp); builder.append(genStampV1);
builder.append(", opCode="); builder.append(", opCode=");
builder.append(opCode); builder.append(opCode);
builder.append(", txid="); builder.append(", txid=");
@ -1094,15 +1096,119 @@ public abstract class FSEditLogOp {
builder.append("]"); builder.append("]");
return builder.toString(); return builder.toString();
} }
@Override @Override
protected void toXml(ContentHandler contentHandler) throws SAXException { protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "GENSTAMP", XMLUtils.addSaxString(contentHandler, "GENSTAMP",
Long.valueOf(genStamp).toString()); Long.valueOf(genStampV1).toString());
} }
@Override void fromXml(Stanza st) throws InvalidXmlException { @Override void fromXml(Stanza st) throws InvalidXmlException {
this.genStamp = Long.valueOf(st.getValue("GENSTAMP")); this.genStampV1 = Long.valueOf(st.getValue("GENSTAMP"));
}
}
static class SetGenstampV2Op extends FSEditLogOp {
long genStampV2;
private SetGenstampV2Op() {
super(OP_SET_GENSTAMP_V2);
}
static SetGenstampV2Op getInstance(OpInstanceCache cache) {
return (SetGenstampV2Op)cache.get(OP_SET_GENSTAMP_V2);
}
SetGenstampV2Op setGenerationStamp(long genStamp) {
this.genStampV2 = genStamp;
return this;
}
@Override
public
void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(genStampV2, out);
}
@Override
void readFields(DataInputStream in, int logVersion)
throws IOException {
this.genStampV2 = FSImageSerialization.readLong(in);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("SetGenstampV2Op [GenStampV2=");
builder.append(genStampV2);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
builder.append(txid);
builder.append("]");
return builder.toString();
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "GENSTAMPV2",
Long.valueOf(genStampV2).toString());
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
this.genStampV2 = Long.valueOf(st.getValue("GENSTAMPV2"));
}
}
static class AllocateBlockIdOp extends FSEditLogOp {
long blockId;
private AllocateBlockIdOp() {
super(OP_ALLOCATE_BLOCK_ID);
}
static AllocateBlockIdOp getInstance(OpInstanceCache cache) {
return (AllocateBlockIdOp)cache.get(OP_ALLOCATE_BLOCK_ID);
}
AllocateBlockIdOp setBlockId(long blockId) {
this.blockId = blockId;
return this;
}
@Override
public
void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(blockId, out);
}
@Override
void readFields(DataInputStream in, int logVersion)
throws IOException {
this.blockId = FSImageSerialization.readLong(in);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AllocateBlockIdOp [blockId=");
builder.append(blockId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
builder.append(txid);
builder.append("]");
return builder.toString();
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "BLOCK_ID",
Long.valueOf(blockId).toString());
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
this.blockId = Long.valueOf(st.getValue("BLOCK_ID"));
} }
} }

View File

@ -41,7 +41,7 @@ public enum FSEditLogOpCodes {
OP_SET_PERMISSIONS ((byte) 7), OP_SET_PERMISSIONS ((byte) 7),
OP_SET_OWNER ((byte) 8), OP_SET_OWNER ((byte) 8),
OP_CLOSE ((byte) 9), OP_CLOSE ((byte) 9),
OP_SET_GENSTAMP ((byte) 10), OP_SET_GENSTAMP_V1 ((byte) 10),
OP_SET_NS_QUOTA ((byte) 11), // obsolete OP_SET_NS_QUOTA ((byte) 11), // obsolete
OP_CLEAR_NS_QUOTA ((byte) 12), // obsolete OP_CLEAR_NS_QUOTA ((byte) 12), // obsolete
OP_TIMES ((byte) 13), // set atime, mtime OP_TIMES ((byte) 13), // set atime, mtime
@ -61,8 +61,9 @@ public enum FSEditLogOpCodes {
OP_DELETE_SNAPSHOT ((byte) 27), OP_DELETE_SNAPSHOT ((byte) 27),
OP_RENAME_SNAPSHOT ((byte) 28), OP_RENAME_SNAPSHOT ((byte) 28),
OP_ALLOW_SNAPSHOT ((byte) 29), OP_ALLOW_SNAPSHOT ((byte) 29),
OP_DISALLOW_SNAPSHOT ((byte) 30); OP_DISALLOW_SNAPSHOT ((byte) 30),
OP_SET_GENSTAMP_V2 ((byte) 31),
OP_ALLOCATE_BLOCK_ID ((byte) 32);
private byte opCode; private byte opCode;
/** /**

View File

@ -92,7 +92,6 @@ public class FSImage implements Closeable {
final private Configuration conf; final private Configuration conf;
protected NNStorageRetentionManager archivalManager; protected NNStorageRetentionManager archivalManager;
protected IdGenerator blockIdGenerator;
/** /**
* Construct an FSImage * Construct an FSImage
@ -138,9 +137,6 @@ public class FSImage implements Closeable {
Preconditions.checkState(fileCount == 1, Preconditions.checkState(fileCount == 1,
"FSImage.format should be called with an uninitialized namesystem, has " + "FSImage.format should be called with an uninitialized namesystem, has " +
fileCount + " files"); fileCount + " files");
// BlockIdGenerator is defined during formatting
// currently there is only one BlockIdGenerator
blockIdGenerator = createBlockIdGenerator(fsn);
NamespaceInfo ns = NNStorage.newNamespaceInfo(); NamespaceInfo ns = NNStorage.newNamespaceInfo();
ns.clusterID = clusterId; ns.clusterID = clusterId;
@ -811,9 +807,6 @@ public class FSImage implements Closeable {
FSImageFormat.Loader loader = new FSImageFormat.Loader( FSImageFormat.Loader loader = new FSImageFormat.Loader(
conf, target); conf, target);
loader.load(curFile); loader.load(curFile);
// BlockIdGenerator is determined after loading image
// currently there is only one BlockIdGenerator
blockIdGenerator = createBlockIdGenerator(target);
target.setBlockPoolId(this.getBlockPoolID()); target.setBlockPoolId(this.getBlockPoolID());
// Check that the image digest we loaded matches up with what // Check that the image digest we loaded matches up with what
@ -1246,12 +1239,4 @@ public class FSImage implements Closeable {
public synchronized long getMostRecentCheckpointTxId() { public synchronized long getMostRecentCheckpointTxId() {
return storage.getMostRecentCheckpointTxId(); return storage.getMostRecentCheckpointTxId();
} }
public long getUniqueBlockId() {
return blockIdGenerator.nextValue();
}
public IdGenerator createBlockIdGenerator(FSNamesystem fsn) {
return new RandomBlockIdGenerator(fsn);
}
} }

View File

@ -70,8 +70,10 @@ import org.apache.hadoop.io.Text;
* <pre> * <pre>
* FSImage { * FSImage {
* layoutVersion: int, namespaceID: int, numberItemsInFSDirectoryTree: long, * layoutVersion: int, namespaceID: int, numberItemsInFSDirectoryTree: long,
* namesystemGenerationStamp: long, transactionID: long, * namesystemGenerationStampV1: long, namesystemGenerationStampV2: long,
* snapshotCounter: int, numberOfSnapshots: int, numOfSnapshottableDirs: int, * generationStampAtBlockIdSwitch:long, lastAllocatedBlockId:
* long transactionID: long, snapshotCounter: int, numberOfSnapshots: int,
* numOfSnapshottableDirs: int,
* {FSDirectoryTree, FilesUnderConstruction, SecretManagerState} (can be compressed) * {FSDirectoryTree, FilesUnderConstruction, SecretManagerState} (can be compressed)
* } * }
* *
@ -257,10 +259,30 @@ public class FSImageFormat {
long numFiles = in.readLong(); long numFiles = in.readLong();
// read in the last generation stamp. // read in the last generation stamp for legacy blocks.
long genstamp = in.readLong(); long genstamp = in.readLong();
namesystem.setGenerationStamp(genstamp); namesystem.setGenerationStampV1(genstamp);
if (LayoutVersion.supports(Feature.SEQUENTIAL_BLOCK_ID, imgVersion)) {
// read the starting generation stamp for sequential block IDs
genstamp = in.readLong();
namesystem.setGenerationStampV2(genstamp);
// read the last generation stamp for blocks created after
// the switch to sequential block IDs.
long stampAtIdSwitch = in.readLong();
namesystem.setGenerationStampV1Limit(stampAtIdSwitch);
// read the max sequential block ID.
long maxSequentialBlockId = in.readLong();
namesystem.setLastAllocatedBlockId(maxSequentialBlockId);
} else {
long startingGenStamp = namesystem.upgradeGenerationStampToV2();
// This is an upgrade.
LOG.info("Upgrading to sequential block IDs. Generation stamp " +
"for new blocks set to " + startingGenStamp);
}
// read the transaction ID of the last edit represented by // read the transaction ID of the last edit represented by
// this image // this image
if (LayoutVersion.supports(Feature.STORED_TXIDS, imgVersion)) { if (LayoutVersion.supports(Feature.STORED_TXIDS, imgVersion)) {
@ -884,9 +906,13 @@ public class FSImageFormat {
out.writeInt(sourceNamesystem.unprotectedGetNamespaceInfo() out.writeInt(sourceNamesystem.unprotectedGetNamespaceInfo()
.getNamespaceID()); .getNamespaceID());
out.writeLong(fsDir.rootDir.numItemsInTree()); out.writeLong(fsDir.rootDir.numItemsInTree());
out.writeLong(sourceNamesystem.getGenerationStamp()); out.writeLong(sourceNamesystem.getGenerationStampV1());
out.writeLong(sourceNamesystem.getGenerationStampV2());
out.writeLong(sourceNamesystem.getGenerationStampAtblockIdSwitch());
out.writeLong(sourceNamesystem.getLastAllocatedBlockId());
out.writeLong(context.getTxId()); out.writeLong(context.getTxId());
out.writeLong(sourceNamesystem.getLastInodeId()); out.writeLong(sourceNamesystem.getLastInodeId());
sourceNamesystem.getSnapshotManager().write(out); sourceNamesystem.getSnapshotManager().write(out);

View File

@ -156,12 +156,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.*;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
@ -379,9 +374,32 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private final long maxBlocksPerFile; // maximum # of blocks per file private final long maxBlocksPerFile; // maximum # of blocks per file
/** /**
* The global generation stamp for this file system. * The global generation stamp for legacy blocks with randomly
* generated block IDs.
*/ */
private final GenerationStamp generationStamp = new GenerationStamp(); private final GenerationStamp generationStampV1 = new GenerationStamp();
/**
* The global generation stamp for this file system.
*/
private final GenerationStamp generationStampV2 = new GenerationStamp();
/**
* The value of the generation stamp when the first switch to sequential
* block IDs was made. Blocks with generation stamps below this value
* have randomly allocated block IDs. Blocks with generation stamps above
* this value had sequentially allocated block IDs. Read from the fsImage
* (or initialized as an offset from the V1 (legacy) generation stamp on
* upgrade).
*/
private long generationStampV1Limit =
GenerationStamp.GRANDFATHER_GENERATION_STAMP;
/**
* The global block ID space for this file system.
*/
@VisibleForTesting
private final SequentialBlockIdGenerator blockIdGenerator;
// precision of access times. // precision of access times.
private final long accessTimePrecision; private final long accessTimePrecision;
@ -441,7 +459,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
void clear() { void clear() {
dir.reset(); dir.reset();
dtSecretManager.reset(); dtSecretManager.reset();
generationStamp.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP); generationStampV1.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP);
generationStampV2.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP);
blockIdGenerator.setCurrentValue(
SequentialBlockIdGenerator.LAST_RESERVED_BLOCK_ID);
generationStampV1Limit = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
leaseManager.removeAllLeases(); leaseManager.removeAllLeases();
inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID); inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID);
} }
@ -543,9 +565,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* *
* Note that this does not load any data off of disk -- if you would * Note that this does not load any data off of disk -- if you would
* like that behavior, use {@link #loadFromDisk(Configuration)} * like that behavior, use {@link #loadFromDisk(Configuration)}
*
* @param fnImage The FSImage to associate with
* @param conf configuration * @param conf configuration
* @param fsImage The FSImage to associate with
* @throws IOException on bad configuration * @throws IOException on bad configuration
*/ */
FSNamesystem(Configuration conf, FSImage fsImage) throws IOException { FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
@ -556,6 +578,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
this.blockManager = new BlockManager(this, this, conf); this.blockManager = new BlockManager(this, this, conf);
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics(); this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.blockIdGenerator = new SequentialBlockIdGenerator(this.blockManager);
this.fsOwner = UserGroupInformation.getCurrentUser(); this.fsOwner = UserGroupInformation.getCurrentUser();
this.fsOwnerShortUserName = fsOwner.getShortUserName(); this.fsOwnerShortUserName = fsOwner.getShortUserName();
@ -2675,9 +2698,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/ */
Block createNewBlock() throws IOException { Block createNewBlock() throws IOException {
assert hasWriteLock(); assert hasWriteLock();
Block b = new Block(getFSImage().getUniqueBlockId(), 0, 0); Block b = new Block(nextBlockId(), 0, 0);
// Increment the generation stamp for every new block. // Increment the generation stamp for every new block.
b.setGenerationStamp(nextGenerationStamp()); b.setGenerationStamp(nextGenerationStamp(false));
return b; return b;
} }
@ -3373,7 +3396,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
uc.setExpectedLocations(blockManager.getNodes(lastBlock)); uc.setExpectedLocations(blockManager.getNodes(lastBlock));
} }
// start recovery of the last block for this file // start recovery of the last block for this file
long blockRecoveryId = nextGenerationStamp(); long blockRecoveryId = nextGenerationStamp(isLegacyBlock(uc));
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile); lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
uc.initializeBlockRecovery(blockRecoveryId); uc.initializeBlockRecovery(blockRecoveryId);
leaseManager.renewLease(lease); leaseManager.renewLease(lease);
@ -5024,34 +5047,164 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
/** /**
* Sets the generation stamp for this filesystem * Sets the current generation stamp for legacy blocks
*/ */
void setGenerationStamp(long stamp) { void setGenerationStampV1(long stamp) {
generationStamp.setCurrentValue(stamp); generationStampV1.setCurrentValue(stamp);
} }
/** /**
* Gets the generation stamp for this filesystem * Gets the current generation stamp for legacy blocks
*/ */
long getGenerationStamp() { long getGenerationStampV1() {
return generationStamp.getCurrentValue(); return generationStampV1.getCurrentValue();
}
/**
* Gets the current generation stamp for this filesystem
*/
void setGenerationStampV2(long stamp) {
generationStampV2.setCurrentValue(stamp);
}
/**
* Gets the current generation stamp for this filesystem
*/
long getGenerationStampV2() {
return generationStampV2.getCurrentValue();
}
/**
* Upgrades the generation stamp for the filesystem
* by reserving a sufficient range for all existing blocks.
* Should be invoked only during the first upgrade to
* sequential block IDs.
*/
long upgradeGenerationStampToV2() {
Preconditions.checkState(generationStampV2.getCurrentValue() ==
GenerationStamp.LAST_RESERVED_STAMP);
generationStampV2.skipTo(
generationStampV1.getCurrentValue() +
HdfsConstants.RESERVED_GENERATION_STAMPS_V1);
generationStampV1Limit = generationStampV2.getCurrentValue();
return generationStampV2.getCurrentValue();
}
/**
* Sets the generation stamp that delineates random and sequentially
* allocated block IDs.
* @param stamp
*/
void setGenerationStampV1Limit(long stamp) {
Preconditions.checkState(generationStampV1Limit ==
GenerationStamp.GRANDFATHER_GENERATION_STAMP);
generationStampV1Limit = stamp;
}
/**
* Gets the value of the generation stamp that delineates sequential
* and random block IDs.
*/
long getGenerationStampAtblockIdSwitch() {
return generationStampV1Limit;
}
@VisibleForTesting
SequentialBlockIdGenerator getBlockIdGenerator() {
return blockIdGenerator;
}
/**
* Sets the maximum allocated block ID for this filesystem. This is
* the basis for allocating new block IDs.
*/
void setLastAllocatedBlockId(long blockId) {
blockIdGenerator.skipTo(blockId);
}
/**
* Gets the maximum sequentially allocated block ID for this filesystem
*/
long getLastAllocatedBlockId() {
return blockIdGenerator.getCurrentValue();
} }
/** /**
* Increments, logs and then returns the stamp * Increments, logs and then returns the stamp
*/ */
private long nextGenerationStamp() throws SafeModeException { long nextGenerationStamp(boolean legacyBlock)
throws IOException, SafeModeException {
assert hasWriteLock(); assert hasWriteLock();
if (isInSafeMode()) { if (isInSafeMode()) {
throw new SafeModeException( throw new SafeModeException(
"Cannot get next generation stamp", safeMode); "Cannot get next generation stamp", safeMode);
} }
final long gs = generationStamp.nextValue();
getEditLog().logGenerationStamp(gs); long gs;
if (legacyBlock) {
gs = getNextGenerationStampV1();
getEditLog().logGenerationStampV1(gs);
} else {
gs = getNextGenerationStampV2();
getEditLog().logGenerationStampV2(gs);
}
// NB: callers sync the log // NB: callers sync the log
return gs; return gs;
} }
@VisibleForTesting
long getNextGenerationStampV1() throws IOException {
long genStampV1 = generationStampV1.nextValue();
if (genStampV1 >= generationStampV1Limit) {
// We ran out of generation stamps for legacy blocks. In practice, it
// is extremely unlikely as we reserved 1T v1 generation stamps. The
// result is that we can no longer append to the legacy blocks that
// were created before the upgrade to sequential block IDs.
throw new OutOfV1GenerationStampsException();
}
return genStampV1;
}
@VisibleForTesting
long getNextGenerationStampV2() {
return generationStampV2.nextValue();
}
long getGenerationStampV1Limit() {
return generationStampV1Limit;
}
/**
* Determine whether the block ID was randomly generated (legacy) or
* sequentially generated. The generation stamp value is used to
* make the distinction.
* @param block
* @return true if the block ID was randomly generated, false otherwise.
*/
boolean isLegacyBlock(Block block) {
return block.getGenerationStamp() < getGenerationStampV1Limit();
}
/**
* Increments, logs and then returns the block ID
*/
private long nextBlockId() throws SafeModeException {
assert hasWriteLock();
if (isInSafeMode()) {
throw new SafeModeException(
"Cannot get next block ID", safeMode);
}
final long blockId = blockIdGenerator.nextValue();
getEditLog().logAllocateBlockId(blockId);
// NB: callers sync the log
return blockId;
}
private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block, private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block,
String clientName) throws IOException { String clientName) throws IOException {
assert hasWriteLock(); assert hasWriteLock();
@ -5132,7 +5285,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkUCBlock(block, clientName); checkUCBlock(block, clientName);
// get a new generation stamp and an access token // get a new generation stamp and an access token
block.setGenerationStamp(nextGenerationStamp()); block.setGenerationStamp(
nextGenerationStamp(isLegacyBlock(block.getLocalBlock())));
locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]); locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
blockManager.setBlockToken(locatedBlock, AccessMode.WRITE); blockManager.setBlockToken(locatedBlock, AccessMode.WRITE);
} finally { } finally {
@ -5147,7 +5301,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* Update a pipeline for a block under construction * Update a pipeline for a block under construction
* *
* @param clientName the name of the client * @param clientName the name of the client
* @param oldblock and old block * @param oldBlock and old block
* @param newBlock a new block with a new generation stamp and length * @param newBlock a new block with a new generation stamp and length
* @param newNodes datanodes in the pipeline * @param newNodes datanodes in the pipeline
* @throws IOException if any error occurs * @throws IOException if any error occurs
@ -5934,9 +6088,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
@Override @Override
public boolean isGenStampInFuture(long genStamp) { public boolean isGenStampInFuture(Block block) {
return (genStamp > getGenerationStamp()); if (isLegacyBlock(block)) {
return block.getGenerationStamp() > getGenerationStampV1();
} else {
return block.getGenerationStamp() > getGenerationStampV2();
}
} }
@VisibleForTesting @VisibleForTesting
public EditLogTailer getEditLogTailer() { public EditLogTailer getEditLogTailer() {
return editLogTailer; return editLogTailer;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.util.RwLock; import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
@ -37,7 +38,7 @@ public interface Namesystem extends RwLock, SafeMode {
public boolean isInStandbyState(); public boolean isInStandbyState();
public boolean isGenStampInFuture(long generationStamp); public boolean isGenStampInFuture(Block block);
public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal); public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);

View File

@ -18,27 +18,41 @@
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.util.IdGenerator; import org.apache.hadoop.util.SequentialNumber;
/** /**
* Generator of random block IDs. * Generate the next valid block ID by incrementing the maximum block
* ID allocated so far, starting at 2^30+1.
*
* Block IDs used to be allocated randomly in the past. Hence we may
* find some conflicts while stepping through the ID space sequentially.
* However given the sparsity of the ID space, conflicts should be rare
* and can be skipped over when detected.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class RandomBlockIdGenerator implements IdGenerator { public class SequentialBlockIdGenerator extends SequentialNumber {
/**
* The last reserved block ID.
*/
public static final long LAST_RESERVED_BLOCK_ID = 1024L * 1024 * 1024;
private final BlockManager blockManager; private final BlockManager blockManager;
RandomBlockIdGenerator(FSNamesystem namesystem) { SequentialBlockIdGenerator(BlockManager blockManagerRef) {
this.blockManager = namesystem.getBlockManager(); super(LAST_RESERVED_BLOCK_ID);
this.blockManager = blockManagerRef;
} }
@Override // NumberGenerator @Override // NumberGenerator
public long nextValue() { public long nextValue() {
Block b = new Block(DFSUtil.getRandom().nextLong(), 0, 0); Block b = new Block(super.nextValue());
// There may be an occasional conflict with randomly generated
// block IDs. Skip over the conflicts.
while(isValidBlock(b)) { while(isValidBlock(b)) {
b.setBlockId(DFSUtil.getRandom().nextLong()); b.setBlockId(super.nextValue());
} }
return b.getBlockId(); return b.getBlockId();
} }

View File

@ -126,7 +126,7 @@ class ImageLoaderCurrent implements ImageLoader {
new SimpleDateFormat("yyyy-MM-dd HH:mm"); new SimpleDateFormat("yyyy-MM-dd HH:mm");
private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23, private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23,
-24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39,
-40, -41, -42, -43, -44, -45}; -40, -41, -42, -43, -44, -45, -46 };
private int imageVersion = 0; private int imageVersion = 0;
private final Map<Long, String> subtreeMap = new HashMap<Long, String>(); private final Map<Long, String> subtreeMap = new HashMap<Long, String>();
@ -165,6 +165,12 @@ class ImageLoaderCurrent implements ImageLoader {
v.visit(ImageElement.GENERATION_STAMP, in.readLong()); v.visit(ImageElement.GENERATION_STAMP, in.readLong());
if (LayoutVersion.supports(Feature.SEQUENTIAL_BLOCK_ID, imageVersion)) {
v.visit(ImageElement.GENERATION_STAMP_V2, in.readLong());
v.visit(ImageElement.GENERATION_STAMP_V1_LIMIT, in.readLong());
v.visit(ImageElement.LAST_ALLOCATED_BLOCK_ID, in.readLong());
}
if (LayoutVersion.supports(Feature.STORED_TXIDS, imageVersion)) { if (LayoutVersion.supports(Feature.STORED_TXIDS, imageVersion)) {
v.visit(ImageElement.TRANSACTION_ID, in.readLong()); v.visit(ImageElement.TRANSACTION_ID, in.readLong());
} }

View File

@ -38,6 +38,9 @@ abstract class ImageVisitor {
LAYOUT_VERSION, LAYOUT_VERSION,
NUM_INODES, NUM_INODES,
GENERATION_STAMP, GENERATION_STAMP,
GENERATION_STAMP_V2,
GENERATION_STAMP_V1_LIMIT,
LAST_ALLOCATED_BLOCK_ID,
INODES, INODES,
INODE, INODE,
INODE_PATH, INODE_PATH,

View File

@ -567,7 +567,7 @@ public class DFSTestUtil {
} }
public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException { public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException {
HdfsDataInputStream in = (HdfsDataInputStream)((DistributedFileSystem)fs).open(path); HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path);
in.readByte(); in.readByte();
return in.getCurrentBlock(); return in.getCurrentBlock();
} }
@ -577,6 +577,12 @@ public class DFSTestUtil {
return ((HdfsDataInputStream) in).getAllBlocks(); return ((HdfsDataInputStream) in).getAllBlocks();
} }
public static List<LocatedBlock> getAllBlocks(FileSystem fs, Path path)
throws IOException {
HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path);
return in.getAllBlocks();
}
public static Token<BlockTokenIdentifier> getBlockToken( public static Token<BlockTokenIdentifier> getBlockToken(
FSDataOutputStream out) { FSDataOutputStream out) {
return ((DFSOutputStream) out.getWrappedStream()).getBlockToken(); return ((DFSOutputStream) out.getWrappedStream()).getBlockToken();

View File

@ -53,7 +53,9 @@ import com.google.common.base.Joiner;
*/ */
public class TestDFSUpgrade { public class TestDFSUpgrade {
private static final int EXPECTED_TXID = 45; // TODO: Avoid hard-coding expected_txid. The test should be more robust.
private static final int EXPECTED_TXID = 61;
private static final Log LOG = LogFactory.getLog(TestDFSUpgrade.class.getName()); private static final Log LOG = LogFactory.getLog(TestDFSUpgrade.class.getName());
private Configuration conf; private Configuration conf;
private int testCounter = 0; private int testCounter = 0;

View File

@ -202,6 +202,7 @@ public class TestDataTransferProtocol {
@Test @Test
public void testOpWrite() throws IOException { public void testOpWrite() throws IOException {
int numDataNodes = 1; int numDataNodes = 1;
final long BLOCK_ID_FUDGE = 128;
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
try { try {
@ -252,8 +253,9 @@ public class TestDataTransferProtocol {
"Recover failed close to a finalized replica", false); "Recover failed close to a finalized replica", false);
firstBlock.setGenerationStamp(newGS); firstBlock.setGenerationStamp(newGS);
/* Test writing to a new block */ // Test writing to a new block. Don't choose the next sequential
long newBlockId = firstBlock.getBlockId() + 1; // block ID to avoid conflicting with IDs chosen by the NN.
long newBlockId = firstBlock.getBlockId() + BLOCK_ID_FUDGE;
ExtendedBlock newBlock = new ExtendedBlock(firstBlock.getBlockPoolId(), ExtendedBlock newBlock = new ExtendedBlock(firstBlock.getBlockPoolId(),
newBlockId, 0, firstBlock.getGenerationStamp()); newBlockId, 0, firstBlock.getGenerationStamp());
@ -284,7 +286,7 @@ public class TestDataTransferProtocol {
Path file1 = new Path("dataprotocol1.dat"); Path file1 = new Path("dataprotocol1.dat");
DFSTestUtil.createFile(fileSys, file1, 1L, (short)numDataNodes, 0L); DFSTestUtil.createFile(fileSys, file1, 1L, (short)numDataNodes, 0L);
DFSOutputStream out = (DFSOutputStream)(fileSys.append(file1). DFSOutputStream out = (DFSOutputStream)(fileSys.append(file1).
getWrappedStream()); getWrappedStream());
out.write(1); out.write(1);
out.hflush(); out.hflush();
FSDataInputStream in = fileSys.open(file1); FSDataInputStream in = fileSys.open(file1);

View File

@ -64,9 +64,6 @@ public class OfflineEditsViewerHelper {
/** /**
* Generates edits with all op codes and returns the edits filename * Generates edits with all op codes and returns the edits filename
*
* @param dfsDir DFS directory (where to setup MiniDFS cluster)
* @param editsFilename where to copy the edits
*/ */
public String generateEdits() throws IOException { public String generateEdits() throws IOException {
CheckpointSignature signature = runOperations(); CheckpointSignature signature = runOperations();
@ -142,7 +139,7 @@ public class OfflineEditsViewerHelper {
DistributedFileSystem dfs = DistributedFileSystem dfs =
(DistributedFileSystem)cluster.getFileSystem(); (DistributedFileSystem)cluster.getFileSystem();
FileContext fc = FileContext.getFileContext(cluster.getURI(0), config); FileContext fc = FileContext.getFileContext(cluster.getURI(0), config);
// OP_ADD 0, OP_SET_GENSTAMP 10 // OP_ADD 0
Path pathFileCreate = new Path("/file_create_u\1F431"); Path pathFileCreate = new Path("/file_create_u\1F431");
FSDataOutputStream s = dfs.create(pathFileCreate); FSDataOutputStream s = dfs.create(pathFileCreate);
// OP_CLOSE 9 // OP_CLOSE 9

View File

@ -1169,7 +1169,8 @@ public class TestCheckpoint {
throw new IOException(e); throw new IOException(e);
} }
final int EXPECTED_TXNS_FIRST_SEG = 11; // TODO: Fix the test to not require a hard-coded transaction count.
final int EXPECTED_TXNS_FIRST_SEG = 13;
// the following steps should have happened: // the following steps should have happened:
// edits_inprogress_1 -> edits_1-12 (finalized) // edits_inprogress_1 -> edits_1-12 (finalized)

View File

@ -1083,7 +1083,7 @@ public class TestEditLog {
editlog.initJournalsForWrite(); editlog.initJournalsForWrite();
editlog.openForWrite(); editlog.openForWrite();
for (int i = 2; i < TXNS_PER_ROLL; i++) { for (int i = 2; i < TXNS_PER_ROLL; i++) {
editlog.logGenerationStamp((long)0); editlog.logGenerationStampV2((long) 0);
} }
editlog.logSync(); editlog.logSync();
@ -1095,7 +1095,7 @@ public class TestEditLog {
for (int i = 0; i < numrolls; i++) { for (int i = 0; i < numrolls; i++) {
editlog.rollEditLog(); editlog.rollEditLog();
editlog.logGenerationStamp((long)i); editlog.logGenerationStampV2((long) i);
editlog.logSync(); editlog.logSync();
while (aborts.size() > 0 while (aborts.size() > 0
@ -1105,7 +1105,7 @@ public class TestEditLog {
} }
for (int j = 3; j < TXNS_PER_ROLL; j++) { for (int j = 3; j < TXNS_PER_ROLL; j++) {
editlog.logGenerationStamp((long)i); editlog.logGenerationStampV2((long) i);
} }
editlog.logSync(); editlog.logSync();
} }

View File

@ -17,7 +17,9 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
@ -51,10 +53,10 @@ public class TestEditLogFileInputStream {
// Read the edit log and verify that we got all of the data. // Read the edit log and verify that we got all of the data.
EnumMap<FSEditLogOpCodes, Holder<Integer>> counts = EnumMap<FSEditLogOpCodes, Holder<Integer>> counts =
FSImageTestUtil.countEditLogOpTypes(elis); FSImageTestUtil.countEditLogOpTypes(elis);
assertEquals(1L, (long)counts.get(FSEditLogOpCodes.OP_ADD).held); assertThat(counts.get(FSEditLogOpCodes.OP_ADD).held, is(1));
assertEquals(1L, (long)counts.get(FSEditLogOpCodes.OP_SET_GENSTAMP).held); assertThat(counts.get(FSEditLogOpCodes.OP_SET_GENSTAMP_V1).held, is(1));
assertEquals(1L, (long)counts.get(FSEditLogOpCodes.OP_CLOSE).held); assertThat(counts.get(FSEditLogOpCodes.OP_CLOSE).held, is(1));
// Check that length header was picked up. // Check that length header was picked up.
assertEquals(FAKE_LOG_DATA.length, elis.length()); assertEquals(FAKE_LOG_DATA.length, elis.length());
elis.close(); elis.close();

View File

@ -513,7 +513,7 @@ public class TestSaveNamespace {
FSNamesystem spyFsn = spy(fsn); FSNamesystem spyFsn = spy(fsn);
final FSNamesystem finalFsn = spyFsn; final FSNamesystem finalFsn = spyFsn;
DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG); DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
doAnswer(delayer).when(spyFsn).getGenerationStamp(); doAnswer(delayer).when(spyFsn).getGenerationStampV2();
ExecutorService pool = Executors.newFixedThreadPool(2); ExecutorService pool = Executors.newFixedThreadPool(2);

View File

@ -0,0 +1,209 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.util.DataChecksum;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Tests the sequential block ID generation mechanism and block ID
* collision handling.
*/
public class TestSequentialBlockId {
private static final Log LOG = LogFactory.getLog("TestSequentialBlockId");
private static final DataChecksum DEFAULT_CHECKSUM =
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
final int BLOCK_SIZE = 1024;
final int IO_SIZE = BLOCK_SIZE;
final short REPLICATION = 1;
final long SEED = 0;
DatanodeID datanode;
InetSocketAddress dnAddr;
/**
* Test that block IDs are generated sequentially.
*
* @throws IOException
*/
@Test
public void testBlockIdGeneration() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// Create a file that is 10 blocks long.
Path path = new Path("testBlockIdGeneration.dat");
DFSTestUtil.createFile(
fs, path, IO_SIZE, BLOCK_SIZE * 10, BLOCK_SIZE, REPLICATION, SEED);
List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs, path);
LOG.info("Block0 id is " + blocks.get(0).getBlock().getBlockId());
long nextBlockExpectedId = blocks.get(0).getBlock().getBlockId() + 1;
// Ensure that the block IDs are sequentially increasing.
for (int i = 1; i < blocks.size(); ++i) {
long nextBlockId = blocks.get(i).getBlock().getBlockId();
LOG.info("Block" + i + " id is " + nextBlockId);
assertThat(nextBlockId, is(nextBlockExpectedId));
++nextBlockExpectedId;
}
} finally {
cluster.shutdown();
}
}
/**
* Test that collisions in the block ID space are handled gracefully.
*
* @throws IOException
*/
@Test
public void testTriggerBlockIdCollision() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
FSNamesystem fsn = cluster.getNamesystem();
final int blockCount = 10;
// Create a file with a few blocks to rev up the global block ID
// counter.
Path path1 = new Path("testBlockIdCollisionDetection_file1.dat");
DFSTestUtil.createFile(
fs, path1, IO_SIZE, BLOCK_SIZE * blockCount,
BLOCK_SIZE, REPLICATION, SEED);
List<LocatedBlock> blocks1 = DFSTestUtil.getAllBlocks(fs, path1);
// Rewind the block ID counter in the name system object. This will result
// in block ID collisions when we try to allocate new blocks.
SequentialBlockIdGenerator blockIdGenerator = fsn.getBlockIdGenerator();
blockIdGenerator.setCurrentValue(blockIdGenerator.getCurrentValue() - 5);
// Trigger collisions by creating a new file.
Path path2 = new Path("testBlockIdCollisionDetection_file2.dat");
DFSTestUtil.createFile(
fs, path2, IO_SIZE, BLOCK_SIZE * blockCount,
BLOCK_SIZE, REPLICATION, SEED);
List<LocatedBlock> blocks2 = DFSTestUtil.getAllBlocks(fs, path2);
assertThat(blocks2.size(), is(blockCount));
// Make sure that file2 block IDs start immediately after file1
assertThat(blocks2.get(0).getBlock().getBlockId(),
is(blocks1.get(9).getBlock().getBlockId() + 1));
} finally {
cluster.shutdown();
}
}
/**
* Test that the block type (legacy or not) can be correctly detected
* based on its generation stamp.
*
* @throws IOException
*/
@Test
public void testBlockTypeDetection() throws IOException {
// Setup a mock object and stub out a few routines to
// retrieve the generation stamp counters.
FSNamesystem fsn = mock(FSNamesystem.class);
final long maxGenStampForLegacyBlocks = 10000;
when(fsn.getGenerationStampV1Limit())
.thenReturn(maxGenStampForLegacyBlocks);
Block legacyBlock = spy(new Block());
when(legacyBlock.getGenerationStamp())
.thenReturn(maxGenStampForLegacyBlocks/2);
Block newBlock = spy(new Block());
when(newBlock.getGenerationStamp())
.thenReturn(maxGenStampForLegacyBlocks+1);
// Make sure that isLegacyBlock() can correctly detect
// legacy and new blocks.
when(fsn.isLegacyBlock(any(Block.class))).thenCallRealMethod();
assertThat(fsn.isLegacyBlock(legacyBlock), is(true));
assertThat(fsn.isLegacyBlock(newBlock), is(false));
}
/**
* Test that the generation stamp for legacy and new blocks is updated
* as expected.
*
* @throws IOException
*/
@Test
public void testGenerationStampUpdate() throws IOException {
// Setup a mock object and stub out a few routines to
// retrieve the generation stamp counters.
FSNamesystem fsn = mock(FSNamesystem.class);
FSEditLog editLog = mock(FSEditLog.class);
final long nextGenerationStampV1 = 5000;
final long nextGenerationStampV2 = 20000;
when(fsn.getNextGenerationStampV1())
.thenReturn(nextGenerationStampV1);
when(fsn.getNextGenerationStampV2())
.thenReturn(nextGenerationStampV2);
// Make sure that the generation stamp is set correctly for both
// kinds of blocks.
when(fsn.nextGenerationStamp(anyBoolean())).thenCallRealMethod();
when(fsn.hasWriteLock()).thenReturn(true);
when(fsn.getEditLog()).thenReturn(editLog);
assertThat(fsn.nextGenerationStamp(true), is(nextGenerationStampV1));
assertThat(fsn.nextGenerationStamp(false), is(nextGenerationStampV2));
}
}

View File

@ -44,7 +44,13 @@ public class TestOfflineEditsViewer {
private static final Map<FSEditLogOpCodes, Boolean> obsoleteOpCodes = private static final Map<FSEditLogOpCodes, Boolean> obsoleteOpCodes =
new HashMap<FSEditLogOpCodes, Boolean>(); new HashMap<FSEditLogOpCodes, Boolean>();
static { initializeObsoleteOpCodes(); } private static final Map<FSEditLogOpCodes, Boolean> missingOpCodes =
new HashMap<FSEditLogOpCodes, Boolean>();
static {
initializeObsoleteOpCodes();
initializeMissingOpCodes();
}
private static String buildDir = private static String buildDir =
System.getProperty("test.build.data", "build/test/data"); System.getProperty("test.build.data", "build/test/data");
@ -74,6 +80,16 @@ public class TestOfflineEditsViewer {
obsoleteOpCodes.put(FSEditLogOpCodes.OP_CLEAR_NS_QUOTA, true); obsoleteOpCodes.put(FSEditLogOpCodes.OP_CLEAR_NS_QUOTA, true);
} }
/**
* Initialize missingOpcodes
*
* Opcodes that are not available except after uprade from
* an older version. We don't test these here.
*/
private static void initializeMissingOpCodes() {
obsoleteOpCodes.put(FSEditLogOpCodes.OP_SET_GENSTAMP_V1, true);
}
@Before @Before
public void setup() { public void setup() {
new File(cacheDir).mkdirs(); new File(cacheDir).mkdirs();
@ -103,6 +119,8 @@ public class TestOfflineEditsViewer {
assertTrue( assertTrue(
"Edits " + edits + " should have all op codes", "Edits " + edits + " should have all op codes",
hasAllOpCodes(edits)); hasAllOpCodes(edits));
LOG.info("Comparing generated file " + editsReparsed +
" with reference file " + edits);
assertTrue( assertTrue(
"Generated edits and reparsed (bin to XML to bin) should be same", "Generated edits and reparsed (bin to XML to bin) should be same",
filesEqualIgnoreTrailingZeros(edits, editsReparsed)); filesEqualIgnoreTrailingZeros(edits, editsReparsed));
@ -222,9 +240,12 @@ public class TestOfflineEditsViewer {
// don't need to test obsolete opCodes // don't need to test obsolete opCodes
if(obsoleteOpCodes.containsKey(opCode)) { if(obsoleteOpCodes.containsKey(opCode)) {
continue; continue;
} } else if (missingOpCodes.containsKey(opCode)) {
if (opCode == FSEditLogOpCodes.OP_INVALID)
continue; continue;
} else if (opCode == FSEditLogOpCodes.OP_INVALID) {
continue;
}
Long count = visitor.getStatistics().get(opCode); Long count = visitor.getStatistics().get(opCode);
if((count == null) || (count == 0)) { if((count == null) || (count == 0)) {
hasAllOpCodes = false; hasAllOpCodes = false;