HDFS-4645. Move from randomly generated block ID to sequentially generated block ID. Contributed by Arpit Agarwal
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1501993 13f79535-47bb-0310-9956-ffa450edef68
parent 944401defd
commit 221014137b
@@ -206,6 +206,9 @@ Release 2.1.0-beta - 2013-07-02
     HDFS-4932. Avoid a wide line on the name node webUI if we have more Journal
     nodes. (Fengdong Yu via cnauroth)
 
+    HDFS-4645. Move from randomly generated block ID to sequentially generated
+    block ID. (Arpit Agarwal via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. (atm)
@@ -86,6 +86,10 @@ public class HdfsConstants {
   // An invalid transaction ID that will never be seen in a real namesystem.
   public static final long INVALID_TXID = -12345;
 
+  // Number of generation stamps reserved for legacy blocks.
+  public static final long RESERVED_GENERATION_STAMPS_V1 =
+      1024L * 1024 * 1024 * 1024;
+
   /**
    * URI Scheme for hdfs://namenode/ URIs.
    */
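Note: 1024L * 1024 * 1024 * 1024 is 2^40, i.e. 1,099,511,627,776 generation stamps set aside for blocks that predate the switch; a comment in the FSNamesystem changes below refers to this as the reserved "1T v1 generation stamps".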
@@ -102,9 +102,9 @@ public class LayoutVersion {
     RESERVED_REL1_3_0(-44, -41,
         "Reserved for release 1.3.0", true, ADD_INODE_ID, SNAPSHOT),
     OPTIMIZE_SNAPSHOT_INODES(-45, -43,
-        "Reduce snapshot inode memory footprint", false);
+        "Reduce snapshot inode memory footprint", false),
+    SEQUENTIAL_BLOCK_ID(-46, "Allocate block IDs sequentially and store " +
+        "block IDs in the edits log and image files");
 
     final int lv;
     final int ancestorLV;
@@ -1735,7 +1735,7 @@ public class BlockManager {
       ReplicaState reportedState = itBR.getCurrentReplicaState();
 
       if (shouldPostponeBlocksFromFuture &&
-          namesystem.isGenStampInFuture(iblk.getGenerationStamp())) {
+          namesystem.isGenStampInFuture(iblk)) {
         queueReportedBlock(node, iblk, reportedState,
             QUEUE_REASON_FUTURE_GENSTAMP);
         continue;
@@ -1857,7 +1857,7 @@ public class BlockManager {
     }
 
     if (shouldPostponeBlocksFromFuture &&
-        namesystem.isGenStampInFuture(block.getGenerationStamp())) {
+        namesystem.isGenStampInFuture(block)) {
       queueReportedBlock(dn, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
       return null;
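Note: these call sites now hand the whole Block to the namesystem instead of a bare generation stamp, because "in the future" is no longer a single comparison: the FSNamesystem change below compares legacy blocks against the V1 stamp counter and sequentially allocated blocks against the V2 counter.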
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This exception is thrown when the name node runs out of V1 generation
+ * stamps.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class OutOfV1GenerationStampsException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public OutOfV1GenerationStampsException() {
+    super("Out of V1 (legacy) generation stamps\n");
+  }
+}
@@ -60,7 +60,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
@@ -69,6 +68,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
@@ -800,15 +802,33 @@ public class FSEditLog implements LogsPurgeable {
     logEdit(op);
   }
 
+  /**
+   * Add legacy block generation stamp record to edit log
+   */
+  void logGenerationStampV1(long genstamp) {
+    SetGenstampV1Op op = SetGenstampV1Op.getInstance(cache.get())
+        .setGenerationStamp(genstamp);
+    logEdit(op);
+  }
+
   /**
    * Add generation stamp record to edit log
    */
-  void logGenerationStamp(long genstamp) {
-    SetGenstampOp op = SetGenstampOp.getInstance(cache.get())
+  void logGenerationStampV2(long genstamp) {
+    SetGenstampV2Op op = SetGenstampV2Op.getInstance(cache.get())
         .setGenerationStamp(genstamp);
     logEdit(op);
   }
 
+  /**
+   * Record a newly allocated block ID in the edit log
+   */
+  void logAllocateBlockId(long blockId) {
+    AllocateBlockIdOp op = AllocateBlockIdOp.getInstance(cache.get())
+        .setBlockId(blockId);
+    logEdit(op);
+  }
+
   /**
    * Add access time record to edit log
    */
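Note: together with the FSNamesystem changes below, every new block allocation is now persisted as an AllocateBlockIdOp followed by a SetGenstampV2Op (see createNewBlock()), while generation stamp updates for pre-upgrade (legacy) blocks keep flowing through logGenerationStampV1().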
@@ -57,7 +57,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetNSQuotaOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
@@ -67,6 +66,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.util.Holder;
@@ -406,9 +408,9 @@ public class FSEditLogLoader {
           mkdirOp.timestamp);
       break;
     }
-    case OP_SET_GENSTAMP: {
-      SetGenstampOp setGenstampOp = (SetGenstampOp)op;
-      fsNamesys.setGenerationStamp(setGenstampOp.genStamp);
+    case OP_SET_GENSTAMP_V1: {
+      SetGenstampV1Op setGenstampV1Op = (SetGenstampV1Op)op;
+      fsNamesys.setGenerationStampV1(setGenstampV1Op.genStampV1);
       break;
     }
     case OP_SET_PERMISSIONS: {
@@ -554,6 +556,16 @@ public class FSEditLogLoader {
           disallowSnapshotOp.snapshotRoot);
       break;
     }
+    case OP_SET_GENSTAMP_V2: {
+      SetGenstampV2Op setGenstampV2Op = (SetGenstampV2Op) op;
+      fsNamesys.setGenerationStampV2(setGenstampV2Op.genStampV2);
+      break;
+    }
+    case OP_ALLOCATE_BLOCK_ID: {
+      AllocateBlockIdOp allocateBlockIdOp = (AllocateBlockIdOp) op;
+      fsNamesys.setLastAllocatedBlockId(allocateBlockIdOp.blockId);
+      break;
+    }
     default:
       throw new IOException("Invalid operation read " + op.opCode);
     }
@@ -90,7 +90,7 @@ public abstract class FSEditLogOp {
       inst.put(OP_RENAME_OLD, new RenameOldOp());
       inst.put(OP_DELETE, new DeleteOp());
       inst.put(OP_MKDIR, new MkdirOp());
-      inst.put(OP_SET_GENSTAMP, new SetGenstampOp());
+      inst.put(OP_SET_GENSTAMP_V1, new SetGenstampV1Op());
       inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
       inst.put(OP_SET_OWNER, new SetOwnerOp());
       inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
@@ -116,6 +116,8 @@ public abstract class FSEditLogOp {
       inst.put(OP_CREATE_SNAPSHOT, new CreateSnapshotOp());
       inst.put(OP_DELETE_SNAPSHOT, new DeleteSnapshotOp());
       inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp());
+      inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
+      inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
     }
 
     public FSEditLogOp get(FSEditLogOpCodes opcode) {
@@ -1054,39 +1056,39 @@ public abstract class FSEditLogOp {
     }
   }
 
-  static class SetGenstampOp extends FSEditLogOp {
-    long genStamp;
+  static class SetGenstampV1Op extends FSEditLogOp {
+    long genStampV1;
 
-    private SetGenstampOp() {
-      super(OP_SET_GENSTAMP);
+    private SetGenstampV1Op() {
+      super(OP_SET_GENSTAMP_V1);
     }
 
-    static SetGenstampOp getInstance(OpInstanceCache cache) {
-      return (SetGenstampOp)cache.get(OP_SET_GENSTAMP);
+    static SetGenstampV1Op getInstance(OpInstanceCache cache) {
+      return (SetGenstampV1Op)cache.get(OP_SET_GENSTAMP_V1);
     }
 
-    SetGenstampOp setGenerationStamp(long genStamp) {
-      this.genStamp = genStamp;
+    SetGenstampV1Op setGenerationStamp(long genStamp) {
+      this.genStampV1 = genStamp;
       return this;
     }
 
     @Override
     public
     void writeFields(DataOutputStream out) throws IOException {
-      FSImageSerialization.writeLong(genStamp, out);
+      FSImageSerialization.writeLong(genStampV1, out);
     }
 
     @Override
     void readFields(DataInputStream in, int logVersion)
        throws IOException {
-      this.genStamp = FSImageSerialization.readLong(in);
+      this.genStampV1 = FSImageSerialization.readLong(in);
     }
 
     @Override
     public String toString() {
       StringBuilder builder = new StringBuilder();
-      builder.append("SetGenstampOp [genStamp=");
-      builder.append(genStamp);
+      builder.append("SetGenstampOp [GenStamp=");
+      builder.append(genStampV1);
       builder.append(", opCode=");
       builder.append(opCode);
       builder.append(", txid=");
@@ -1098,11 +1100,115 @@ public abstract class FSEditLogOp {
     @Override
     protected void toXml(ContentHandler contentHandler) throws SAXException {
       XMLUtils.addSaxString(contentHandler, "GENSTAMP",
-          Long.valueOf(genStamp).toString());
+          Long.valueOf(genStampV1).toString());
     }
 
     @Override void fromXml(Stanza st) throws InvalidXmlException {
-      this.genStamp = Long.valueOf(st.getValue("GENSTAMP"));
+      this.genStampV1 = Long.valueOf(st.getValue("GENSTAMP"));
     }
   }
 
+  static class SetGenstampV2Op extends FSEditLogOp {
+    long genStampV2;
+
+    private SetGenstampV2Op() {
+      super(OP_SET_GENSTAMP_V2);
+    }
+
+    static SetGenstampV2Op getInstance(OpInstanceCache cache) {
+      return (SetGenstampV2Op)cache.get(OP_SET_GENSTAMP_V2);
+    }
+
+    SetGenstampV2Op setGenerationStamp(long genStamp) {
+      this.genStampV2 = genStamp;
+      return this;
+    }
+
+    @Override
+    public
+    void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeLong(genStampV2, out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.genStampV2 = FSImageSerialization.readLong(in);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("SetGenstampV2Op [GenStampV2=");
+      builder.append(genStampV2);
+      builder.append(", opCode=");
+      builder.append(opCode);
+      builder.append(", txid=");
+      builder.append(txid);
+      builder.append("]");
+      return builder.toString();
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "GENSTAMPV2",
+          Long.valueOf(genStampV2).toString());
+    }
+
+    @Override void fromXml(Stanza st) throws InvalidXmlException {
+      this.genStampV2 = Long.valueOf(st.getValue("GENSTAMPV2"));
+    }
+  }
+
+  static class AllocateBlockIdOp extends FSEditLogOp {
+    long blockId;
+
+    private AllocateBlockIdOp() {
+      super(OP_ALLOCATE_BLOCK_ID);
+    }
+
+    static AllocateBlockIdOp getInstance(OpInstanceCache cache) {
+      return (AllocateBlockIdOp)cache.get(OP_ALLOCATE_BLOCK_ID);
+    }
+
+    AllocateBlockIdOp setBlockId(long blockId) {
+      this.blockId = blockId;
+      return this;
+    }
+
+    @Override
+    public
+    void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeLong(blockId, out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.blockId = FSImageSerialization.readLong(in);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("AllocateBlockIdOp [blockId=");
+      builder.append(blockId);
+      builder.append(", opCode=");
+      builder.append(opCode);
+      builder.append(", txid=");
+      builder.append(txid);
+      builder.append("]");
+      return builder.toString();
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "BLOCK_ID",
+          Long.valueOf(blockId).toString());
+    }
+
+    @Override void fromXml(Stanza st) throws InvalidXmlException {
+      this.blockId = Long.valueOf(st.getValue("BLOCK_ID"));
+    }
+  }
+
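Note: each new op implements both the binary writeFields/readFields pair and the toXml/fromXml pair, so the records survive edit-log replay as well as the offline edits viewer's binary-to-XML-to-binary round trip that the TestOfflineEditsViewer changes at the end of this patch exercise.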
@@ -41,7 +41,7 @@ public enum FSEditLogOpCodes {
   OP_SET_PERMISSIONS            ((byte)  7),
   OP_SET_OWNER                  ((byte)  8),
   OP_CLOSE                      ((byte)  9),
-  OP_SET_GENSTAMP               ((byte) 10),
+  OP_SET_GENSTAMP_V1            ((byte) 10),
   OP_SET_NS_QUOTA               ((byte) 11), // obsolete
   OP_CLEAR_NS_QUOTA             ((byte) 12), // obsolete
   OP_TIMES                      ((byte) 13), // set atime, mtime
@@ -61,8 +61,9 @@ public enum FSEditLogOpCodes {
   OP_DELETE_SNAPSHOT            ((byte) 27),
   OP_RENAME_SNAPSHOT            ((byte) 28),
   OP_ALLOW_SNAPSHOT             ((byte) 29),
-  OP_DISALLOW_SNAPSHOT          ((byte) 30);
-
+  OP_DISALLOW_SNAPSHOT          ((byte) 30),
+  OP_SET_GENSTAMP_V2            ((byte) 31),
+  OP_ALLOCATE_BLOCK_ID          ((byte) 32);
   private byte opCode;
 
   /**
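Note: opcode byte 10 is renamed in place (OP_SET_GENSTAMP becomes OP_SET_GENSTAMP_V1) rather than reassigned, so edit logs written before this change still deserialize; the two new operations take the next free byte values, 31 and 32.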
@@ -95,7 +95,6 @@ public class FSImage implements Closeable {
   final private Configuration conf;
 
   protected NNStorageRetentionManager archivalManager;
-  protected IdGenerator blockIdGenerator;
 
   /**
    * Construct an FSImage
@@ -141,9 +140,6 @@ public class FSImage implements Closeable {
     Preconditions.checkState(fileCount == 1,
         "FSImage.format should be called with an uninitialized namesystem, has " +
         fileCount + " files");
-    // BlockIdGenerator is defined during formatting
-    // currently there is only one BlockIdGenerator
-    blockIdGenerator = createBlockIdGenerator(fsn);
     NamespaceInfo ns = NNStorage.newNamespaceInfo();
     ns.clusterID = clusterId;
 
@@ -814,9 +810,6 @@ public class FSImage implements Closeable {
     FSImageFormat.Loader loader = new FSImageFormat.Loader(
         conf, target);
     loader.load(curFile);
-    // BlockIdGenerator is determined after loading image
-    // currently there is only one BlockIdGenerator
-    blockIdGenerator = createBlockIdGenerator(target);
     target.setBlockPoolId(this.getBlockPoolID());
 
     // Check that the image digest we loaded matches up with what
@@ -1249,12 +1242,4 @@ public class FSImage implements Closeable {
   public synchronized long getMostRecentCheckpointTxId() {
     return storage.getMostRecentCheckpointTxId();
   }
-
-  public long getUniqueBlockId() {
-    return blockIdGenerator.nextValue();
-  }
-
-  public IdGenerator createBlockIdGenerator(FSNamesystem fsn) {
-    return new RandomBlockIdGenerator(fsn);
-  }
 }
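Note: taken together, the FSImage changes remove block ID allocation from the image layer entirely; ownership of the ID space moves into FSNamesystem, and RandomBlockIdGenerator is rewritten further below as SequentialBlockIdGenerator.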
@@ -70,8 +70,10 @@ import org.apache.hadoop.io.Text;
 * <pre>
 * FSImage {
 *   layoutVersion: int, namespaceID: int, numberItemsInFSDirectoryTree: long,
- *   namesystemGenerationStamp: long, transactionID: long,
- *   snapshotCounter: int, numberOfSnapshots: int, numOfSnapshottableDirs: int,
+ *   namesystemGenerationStampV1: long, namesystemGenerationStampV2: long,
+ *   generationStampAtBlockIdSwitch:long, lastAllocatedBlockId:
+ *   long transactionID: long, snapshotCounter: int, numberOfSnapshots: int,
+ *   numOfSnapshottableDirs: int,
 *   {FSDirectoryTree, FilesUnderConstruction, SecretManagerState} (can be compressed)
 * }
 *
@@ -257,9 +259,29 @@ public class FSImageFormat {
 
       long numFiles = in.readLong();
 
-      // read in the last generation stamp.
+      // read in the last generation stamp for legacy blocks.
       long genstamp = in.readLong();
-      namesystem.setGenerationStamp(genstamp);
+      namesystem.setGenerationStampV1(genstamp);
+
+      if (LayoutVersion.supports(Feature.SEQUENTIAL_BLOCK_ID, imgVersion)) {
+        // read the starting generation stamp for sequential block IDs
+        genstamp = in.readLong();
+        namesystem.setGenerationStampV2(genstamp);
+
+        // read the last generation stamp for blocks created after
+        // the switch to sequential block IDs.
+        long stampAtIdSwitch = in.readLong();
+        namesystem.setGenerationStampV1Limit(stampAtIdSwitch);
+
+        // read the max sequential block ID.
+        long maxSequentialBlockId = in.readLong();
+        namesystem.setLastAllocatedBlockId(maxSequentialBlockId);
+      } else {
+        long startingGenStamp = namesystem.upgradeGenerationStampToV2();
+        // This is an upgrade.
+        LOG.info("Upgrading to sequential block IDs. Generation stamp " +
+                 "for new blocks set to " + startingGenStamp);
+      }
 
       // read the transaction ID of the last edit represented by
       // this image
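Note: for images written at layout version -46 (SEQUENTIAL_BLOCK_ID) or later, the loader reads three additional longs (V2 stamp, V1 limit, last allocated block ID) immediately after the legacy stamp, in exactly the order the saver below writes them; older images take the else branch once, seeding the V2 counter from the V1 value.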
@@ -884,10 +906,14 @@ public class FSImageFormat {
       out.writeInt(sourceNamesystem.unprotectedGetNamespaceInfo()
           .getNamespaceID());
       out.writeLong(fsDir.rootDir.numItemsInTree());
-      out.writeLong(sourceNamesystem.getGenerationStamp());
+      out.writeLong(sourceNamesystem.getGenerationStampV1());
+      out.writeLong(sourceNamesystem.getGenerationStampV2());
+      out.writeLong(sourceNamesystem.getGenerationStampAtblockIdSwitch());
+      out.writeLong(sourceNamesystem.getLastAllocatedBlockId());
       out.writeLong(context.getTxId());
       out.writeLong(sourceNamesystem.getLastInodeId());
 
       sourceNamesystem.getSnapshotManager().write(out);
 
       // write compression info and set up compressed stream
@@ -156,12 +156,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
+import org.apache.hadoop.hdfs.server.blockmanagement.*;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
@@ -363,10 +358,33 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private final long minBlockSize;         // minimum block size
   private final long maxBlocksPerFile;     // maximum # of blocks per file
 
+  /**
+   * The global generation stamp for legacy blocks with randomly
+   * generated block IDs.
+   */
+  private final GenerationStamp generationStampV1 = new GenerationStamp();
+
   /**
    * The global generation stamp for this file system.
    */
-  private final GenerationStamp generationStamp = new GenerationStamp();
+  private final GenerationStamp generationStampV2 = new GenerationStamp();
+
+  /**
+   * The value of the generation stamp when the first switch to sequential
+   * block IDs was made. Blocks with generation stamps below this value
+   * have randomly allocated block IDs. Blocks with generation stamps above
+   * this value had sequentially allocated block IDs. Read from the fsImage
+   * (or initialized as an offset from the V1 (legacy) generation stamp on
+   * upgrade).
+   */
+  private long generationStampV1Limit =
+      GenerationStamp.GRANDFATHER_GENERATION_STAMP;
+
+  /**
+   * The global block ID space for this file system.
+   */
+  @VisibleForTesting
+  private final SequentialBlockIdGenerator blockIdGenerator;
 
   // precision of access times.
   private final long accessTimePrecision;
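Note: GenerationStamp.GRANDFATHER_GENERATION_STAMP doubles as a "limit not set yet" sentinel here; setGenerationStampV1Limit() below asserts the field still holds it before accepting the value read from the image, so the switch point can only be established once.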
@@ -426,7 +444,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   void clear() {
     dir.reset();
     dtSecretManager.reset();
-    generationStamp.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP);
+    generationStampV1.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP);
+    generationStampV2.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP);
+    blockIdGenerator.setCurrentValue(
+        SequentialBlockIdGenerator.LAST_RESERVED_BLOCK_ID);
+    generationStampV1Limit = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
     leaseManager.removeAllLeases();
     inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID);
   }
@@ -528,9 +550,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    *
    * Note that this does not load any data off of disk -- if you would
    * like that behavior, use {@link #loadFromDisk(Configuration)}
-
-   * @param fnImage The FSImage to associate with
    *
+   * @param conf configuration
+   * @param fsImage The FSImage to associate with
    * @throws IOException on bad configuration
    */
  FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
@@ -541,6 +563,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
     this.blockManager = new BlockManager(this, this, conf);
     this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
+    this.blockIdGenerator = new SequentialBlockIdGenerator(this.blockManager);
 
     this.fsOwner = UserGroupInformation.getCurrentUser();
     this.fsOwnerShortUserName = fsOwner.getShortUserName();
@@ -2658,9 +2681,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   */
  Block createNewBlock() throws IOException {
    assert hasWriteLock();
-    Block b = new Block(getFSImage().getUniqueBlockId(), 0, 0);
+    Block b = new Block(nextBlockId(), 0, 0);
    // Increment the generation stamp for every new block.
-    b.setGenerationStamp(nextGenerationStamp());
+    b.setGenerationStamp(nextGenerationStamp(false));
    return b;
  }
 
@@ -3356,7 +3379,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
      uc.setExpectedLocations(blockManager.getNodes(lastBlock));
    }
    // start recovery of the last block for this file
-    long blockRecoveryId = nextGenerationStamp();
+    long blockRecoveryId = nextGenerationStamp(isLegacyBlock(uc));
    lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
    uc.initializeBlockRecovery(blockRecoveryId);
    leaseManager.renewLease(lease);
@@ -5007,34 +5030,164 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   /**
-   * Sets the generation stamp for this filesystem
+   * Sets the current generation stamp for legacy blocks
    */
-  void setGenerationStamp(long stamp) {
-    generationStamp.setCurrentValue(stamp);
+  void setGenerationStampV1(long stamp) {
+    generationStampV1.setCurrentValue(stamp);
   }
 
   /**
-   * Gets the generation stamp for this filesystem
+   * Gets the current generation stamp for legacy blocks
    */
-  long getGenerationStamp() {
-    return generationStamp.getCurrentValue();
+  long getGenerationStampV1() {
+    return generationStampV1.getCurrentValue();
   }
 
+  /**
+   * Gets the current generation stamp for this filesystem
+   */
+  void setGenerationStampV2(long stamp) {
+    generationStampV2.setCurrentValue(stamp);
+  }
+
+  /**
+   * Gets the current generation stamp for this filesystem
+   */
+  long getGenerationStampV2() {
+    return generationStampV2.getCurrentValue();
+  }
+
+  /**
+   * Upgrades the generation stamp for the filesystem
+   * by reserving a sufficient range for all existing blocks.
+   * Should be invoked only during the first upgrade to
+   * sequential block IDs.
+   */
+  long upgradeGenerationStampToV2() {
+    Preconditions.checkState(generationStampV2.getCurrentValue() ==
+        GenerationStamp.LAST_RESERVED_STAMP);
+
+    generationStampV2.skipTo(
+        generationStampV1.getCurrentValue() +
+        HdfsConstants.RESERVED_GENERATION_STAMPS_V1);
+
+    generationStampV1Limit = generationStampV2.getCurrentValue();
+    return generationStampV2.getCurrentValue();
+  }
+
+  /**
+   * Sets the generation stamp that delineates random and sequentially
+   * allocated block IDs.
+   * @param stamp
+   */
+  void setGenerationStampV1Limit(long stamp) {
+    Preconditions.checkState(generationStampV1Limit ==
+        GenerationStamp.GRANDFATHER_GENERATION_STAMP);
+    generationStampV1Limit = stamp;
+  }
+
+  /**
+   * Gets the value of the generation stamp that delineates sequential
+   * and random block IDs.
+   */
+  long getGenerationStampAtblockIdSwitch() {
+    return generationStampV1Limit;
+  }
+
+  @VisibleForTesting
+  SequentialBlockIdGenerator getBlockIdGenerator() {
+    return blockIdGenerator;
+  }
+
+  /**
+   * Sets the maximum allocated block ID for this filesystem. This is
+   * the basis for allocating new block IDs.
+   */
+  void setLastAllocatedBlockId(long blockId) {
+    blockIdGenerator.skipTo(blockId);
+  }
+
+  /**
+   * Gets the maximum sequentially allocated block ID for this filesystem
+   */
+  long getLastAllocatedBlockId() {
+    return blockIdGenerator.getCurrentValue();
+  }
+
   /**
    * Increments, logs and then returns the stamp
    */
-  private long nextGenerationStamp() throws SafeModeException {
+  long nextGenerationStamp(boolean legacyBlock)
+      throws IOException, SafeModeException {
     assert hasWriteLock();
     if (isInSafeMode()) {
       throw new SafeModeException(
           "Cannot get next generation stamp", safeMode);
     }
-    final long gs = generationStamp.nextValue();
-    getEditLog().logGenerationStamp(gs);
+
+    long gs;
+    if (legacyBlock) {
+      gs = getNextGenerationStampV1();
+      getEditLog().logGenerationStampV1(gs);
+    } else {
+      gs = getNextGenerationStampV2();
+      getEditLog().logGenerationStampV2(gs);
+    }
+
     // NB: callers sync the log
     return gs;
   }
 
+  @VisibleForTesting
+  long getNextGenerationStampV1() throws IOException {
+    long genStampV1 = generationStampV1.nextValue();
+
+    if (genStampV1 >= generationStampV1Limit) {
+      // We ran out of generation stamps for legacy blocks. In practice, it
+      // is extremely unlikely as we reserved 1T v1 generation stamps. The
+      // result is that we can no longer append to the legacy blocks that
+      // were created before the upgrade to sequential block IDs.
+      throw new OutOfV1GenerationStampsException();
+    }
+
+    return genStampV1;
+  }
+
+  @VisibleForTesting
+  long getNextGenerationStampV2() {
+    return generationStampV2.nextValue();
+  }
+
+  long getGenerationStampV1Limit() {
+    return generationStampV1Limit;
+  }
+
+  /**
+   * Determine whether the block ID was randomly generated (legacy) or
+   * sequentially generated. The generation stamp value is used to
+   * make the distinction.
+   * @param block
+   * @return true if the block ID was randomly generated, false otherwise.
+   */
+  boolean isLegacyBlock(Block block) {
+    return block.getGenerationStamp() < getGenerationStampV1Limit();
+  }
+
+  /**
+   * Increments, logs and then returns the block ID
+   */
+  private long nextBlockId() throws SafeModeException {
+    assert hasWriteLock();
+    if (isInSafeMode()) {
+      throw new SafeModeException(
+          "Cannot get next block ID", safeMode);
+    }
+    final long blockId = blockIdGenerator.nextValue();
+    getEditLog().logAllocateBlockId(blockId);
+    // NB: callers sync the log
+    return blockId;
+  }
+
   private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block,
       String clientName) throws IOException {
     assert hasWriteLock();
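A worked example of the V1/V2 split above, with illustrative numbers. This sketch is not code from the patch; the class name and stamp values are made up, and only the reserved-range constant mirrors HdfsConstants.RESERVED_GENERATION_STAMPS_V1:

// A minimal, self-contained sketch of the isLegacyBlock() delineation rule.
public class GenStampSwitchExample {
  static final long RESERVED_V1 = 1024L * 1024 * 1024 * 1024; // 2^40

  public static void main(String[] args) {
    long v1StampAtUpgrade = 5000;                   // hypothetical V1 counter at upgrade time
    long v1Limit = v1StampAtUpgrade + RESERVED_V1;  // what upgradeGenerationStampToV2() computes

    // Stamps below the limit mark randomly-allocated (legacy) blocks.
    System.out.println(6000 < v1Limit);             // true  -> legacy block
    // Blocks created after the switch are stamped from the V2 counter,
    // which starts at the limit, so they always classify as non-legacy.
    System.out.println(v1Limit + 1 < v1Limit);      // false -> sequentially-allocated block
  }
}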
@@ -5115,7 +5268,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
      checkUCBlock(block, clientName);
 
      // get a new generation stamp and an access token
-      block.setGenerationStamp(nextGenerationStamp());
+      block.setGenerationStamp(
+          nextGenerationStamp(isLegacyBlock(block.getLocalBlock())));
      locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
      blockManager.setBlockToken(locatedBlock, AccessMode.WRITE);
    } finally {
@@ -5130,7 +5284,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   * Update a pipeline for a block under construction
   *
   * @param clientName the name of the client
-   * @param oldblock and old block
+   * @param oldBlock and old block
   * @param newBlock a new block with a new generation stamp and length
   * @param newNodes datanodes in the pipeline
   * @throws IOException if any error occurs
@@ -5917,9 +6071,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   @Override
-  public boolean isGenStampInFuture(long genStamp) {
-    return (genStamp > getGenerationStamp());
+  public boolean isGenStampInFuture(Block block) {
+    if (isLegacyBlock(block)) {
+      return block.getGenerationStamp() > getGenerationStampV1();
+    } else {
+      return block.getGenerationStamp() > getGenerationStampV2();
+    }
   }
 
   @VisibleForTesting
   public EditLogTailer getEditLogTailer() {
     return editLogTailer;
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.ipc.StandbyException;
@@ -37,7 +38,7 @@ public interface Namesystem extends RwLock, SafeMode {
 
   public boolean isInStandbyState();
 
-  public boolean isGenStampInFuture(long generationStamp);
+  public boolean isGenStampInFuture(Block block);
 
   public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
 
@@ -18,27 +18,41 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.util.IdGenerator;
+import org.apache.hadoop.util.SequentialNumber;
 
 /**
- * Generator of random block IDs.
+ * Generate the next valid block ID by incrementing the maximum block
+ * ID allocated so far, starting at 2^30+1.
+ *
+ * Block IDs used to be allocated randomly in the past. Hence we may
+ * find some conflicts while stepping through the ID space sequentially.
+ * However given the sparsity of the ID space, conflicts should be rare
+ * and can be skipped over when detected.
 */
 @InterfaceAudience.Private
-public class RandomBlockIdGenerator implements IdGenerator {
+public class SequentialBlockIdGenerator extends SequentialNumber {
+  /**
+   * The last reserved block ID.
+   */
+  public static final long LAST_RESERVED_BLOCK_ID = 1024L * 1024 * 1024;
+
   private final BlockManager blockManager;
 
-  RandomBlockIdGenerator(FSNamesystem namesystem) {
-    this.blockManager = namesystem.getBlockManager();
+  SequentialBlockIdGenerator(BlockManager blockManagerRef) {
+    super(LAST_RESERVED_BLOCK_ID);
+    this.blockManager = blockManagerRef;
   }
 
   @Override // NumberGenerator
   public long nextValue() {
-    Block b = new Block(DFSUtil.getRandom().nextLong(), 0, 0);
+    Block b = new Block(super.nextValue());
+
+    // There may be an occasional conflict with randomly generated
+    // block IDs. Skip over the conflicts.
     while(isValidBlock(b)) {
-      b.setBlockId(DFSUtil.getRandom().nextLong());
+      b.setBlockId(super.nextValue());
     }
     return b.getBlockId();
   }
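Note: LAST_RESERVED_BLOCK_ID is 1024^3 = 2^30, so the first sequentially allocated ID is 2^30 + 1, matching the class comment; the while loop's conflict check is backed by the BlockManager reference kept here, letting the generator simply skip over IDs already taken by old randomly-generated blocks.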
@@ -126,7 +126,7 @@ class ImageLoaderCurrent implements ImageLoader {
      new SimpleDateFormat("yyyy-MM-dd HH:mm");
  private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23,
      -24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39,
-      -40, -41, -42, -43, -44, -45};
+      -40, -41, -42, -43, -44, -45, -46 };
  private int imageVersion = 0;
 
  private final Map<Long, String> subtreeMap = new HashMap<Long, String>();
@@ -165,6 +165,12 @@ class ImageLoaderCurrent implements ImageLoader {
 
      v.visit(ImageElement.GENERATION_STAMP, in.readLong());
 
+      if (LayoutVersion.supports(Feature.SEQUENTIAL_BLOCK_ID, imageVersion)) {
+        v.visit(ImageElement.GENERATION_STAMP_V2, in.readLong());
+        v.visit(ImageElement.GENERATION_STAMP_V1_LIMIT, in.readLong());
+        v.visit(ImageElement.LAST_ALLOCATED_BLOCK_ID, in.readLong());
+      }
+
      if (LayoutVersion.supports(Feature.STORED_TXIDS, imageVersion)) {
        v.visit(ImageElement.TRANSACTION_ID, in.readLong());
      }
@@ -38,6 +38,9 @@ abstract class ImageVisitor {
    LAYOUT_VERSION,
    NUM_INODES,
    GENERATION_STAMP,
+    GENERATION_STAMP_V2,
+    GENERATION_STAMP_V1_LIMIT,
+    LAST_ALLOCATED_BLOCK_ID,
    INODES,
    INODE,
    INODE_PATH,
@@ -242,6 +242,39 @@ public class DFSTestUtil {
    }
  }
 
+  public static void createFile(FileSystem fs, Path fileName, int bufferLen,
+      long fileLen, long blockSize, short replFactor, long seed)
+      throws IOException {
+    assert bufferLen > 0;
+    if (!fs.mkdirs(fileName.getParent())) {
+      throw new IOException("Mkdirs failed to create " +
+          fileName.getParent().toString());
+    }
+    FSDataOutputStream out = null;
+    try {
+      out = fs.create(fileName, true, fs.getConf()
+          .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+          replFactor, blockSize);
+      if (fileLen > 0) {
+        byte[] toWrite = new byte[bufferLen];
+        Random rb = new Random(seed);
+        long bytesToWrite = fileLen;
+        while (bytesToWrite>0) {
+          rb.nextBytes(toWrite);
+          int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen
+              : (int) bytesToWrite;
+
+          out.write(toWrite, 0, bytesToWriteNext);
+          bytesToWrite -= bytesToWriteNext;
+        }
+      }
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
  /** check if the files have been copied correctly. */
  public boolean checkFiles(FileSystem fs, String topdir) throws IOException {
    Path root = new Path(topdir);
@@ -554,7 +587,7 @@ public class DFSTestUtil {
  }
 
  public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException {
-    HdfsDataInputStream in = (HdfsDataInputStream)((DistributedFileSystem)fs).open(path);
+    HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path);
    in.readByte();
    return in.getCurrentBlock();
  }
@@ -564,6 +597,12 @@ public class DFSTestUtil {
    return ((HdfsDataInputStream) in).getAllBlocks();
  }
 
+  public static List<LocatedBlock> getAllBlocks(FileSystem fs, Path path)
+      throws IOException {
+    HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path);
+    return in.getAllBlocks();
+  }
+
  public static Token<BlockTokenIdentifier> getBlockToken(
      FSDataOutputStream out) {
    return ((DFSOutputStream) out.getWrappedStream()).getBlockToken();
@@ -53,7 +53,9 @@ import com.google.common.base.Joiner;
 */
 public class TestDFSUpgrade {
 
-  private static final int EXPECTED_TXID = 45;
+  // TODO: Avoid hard-coding expected_txid. The test should be more robust.
+  private static final int EXPECTED_TXID = 61;
+
  private static final Log LOG = LogFactory.getLog(TestDFSUpgrade.class.getName());
  private Configuration conf;
  private int testCounter = 0;
@@ -210,6 +210,7 @@ public class TestDataTransferProtocol {
  @Test
  public void testOpWrite() throws IOException {
    int numDataNodes = 1;
+    final long BLOCK_ID_FUDGE = 128;
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
    try {
@@ -260,8 +261,9 @@ public class TestDataTransferProtocol {
        "Recover failed close to a finalized replica", false);
    firstBlock.setGenerationStamp(newGS);
 
-    /* Test writing to a new block */
-    long newBlockId = firstBlock.getBlockId() + 1;
+    // Test writing to a new block. Don't choose the next sequential
+    // block ID to avoid conflicting with IDs chosen by the NN.
+    long newBlockId = firstBlock.getBlockId() + BLOCK_ID_FUDGE;
    ExtendedBlock newBlock = new ExtendedBlock(firstBlock.getBlockPoolId(),
        newBlockId, 0, firstBlock.getGenerationStamp());
 
@@ -64,9 +64,6 @@ public class OfflineEditsViewerHelper {
 
  /**
   * Generates edits with all op codes and returns the edits filename
-   *
-   * @param dfsDir DFS directory (where to setup MiniDFS cluster)
-   * @param editsFilename where to copy the edits
   */
  public String generateEdits() throws IOException {
    CheckpointSignature signature = runOperations();
@@ -142,7 +139,7 @@ public class OfflineEditsViewerHelper {
    DistributedFileSystem dfs =
        (DistributedFileSystem)cluster.getFileSystem();
    FileContext fc = FileContext.getFileContext(cluster.getURI(0), config);
-    // OP_ADD 0, OP_SET_GENSTAMP 10
+    // OP_ADD 0
    Path pathFileCreate = new Path("/file_create_u\1F431");
    FSDataOutputStream s = dfs.create(pathFileCreate);
    // OP_CLOSE 9
@@ -1171,7 +1171,8 @@ public class TestCheckpoint {
      throw new IOException(e);
    }
 
-    final int EXPECTED_TXNS_FIRST_SEG = 11;
+    // TODO: Fix the test to not require a hard-coded transaction count.
+    final int EXPECTED_TXNS_FIRST_SEG = 13;
 
    // the following steps should have happened:
    //   edits_inprogress_1 -> edits_1-12  (finalized)
@@ -1083,7 +1083,7 @@ public class TestEditLog {
    editlog.initJournalsForWrite();
    editlog.openForWrite();
    for (int i = 2; i < TXNS_PER_ROLL; i++) {
-      editlog.logGenerationStamp((long)0);
+      editlog.logGenerationStampV2((long) 0);
    }
    editlog.logSync();
 
@@ -1095,7 +1095,7 @@ public class TestEditLog {
    for (int i = 0; i < numrolls; i++) {
      editlog.rollEditLog();
 
-      editlog.logGenerationStamp((long)i);
+      editlog.logGenerationStampV2((long) i);
      editlog.logSync();
 
      while (aborts.size() > 0
@@ -1105,7 +1105,7 @@ public class TestEditLog {
      }
 
      for (int j = 3; j < TXNS_PER_ROLL; j++) {
-        editlog.logGenerationStamp((long)i);
+        editlog.logGenerationStampV2((long) i);
      }
      editlog.logSync();
    }
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.is;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -51,9 +52,9 @@ public class TestEditLogFileInputStream {
    // Read the edit log and verify that we got all of the data.
    EnumMap<FSEditLogOpCodes, Holder<Integer>> counts =
        FSImageTestUtil.countEditLogOpTypes(elis);
-    assertEquals(1L, (long)counts.get(FSEditLogOpCodes.OP_ADD).held);
-    assertEquals(1L, (long)counts.get(FSEditLogOpCodes.OP_SET_GENSTAMP).held);
-    assertEquals(1L, (long)counts.get(FSEditLogOpCodes.OP_CLOSE).held);
+    assertThat(counts.get(FSEditLogOpCodes.OP_ADD).held, is(1));
+    assertThat(counts.get(FSEditLogOpCodes.OP_SET_GENSTAMP_V1).held, is(1));
+    assertThat(counts.get(FSEditLogOpCodes.OP_CLOSE).held, is(1));
 
    // Check that length header was picked up.
    assertEquals(FAKE_LOG_DATA.length, elis.length());
@@ -541,7 +541,7 @@ public class TestSaveNamespace {
    FSNamesystem spyFsn = spy(fsn);
    final FSNamesystem finalFsn = spyFsn;
    DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
-    doAnswer(delayer).when(spyFsn).getGenerationStamp();
+    doAnswer(delayer).when(spyFsn).getGenerationStampV2();
 
    ExecutorService pool = Executors.newFixedThreadPool(2);
 
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.util.DataChecksum;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests the sequential block ID generation mechanism and block ID
+ * collision handling.
+ */
+public class TestSequentialBlockId {
+
+  private static final Log LOG = LogFactory.getLog("TestSequentialBlockId");
+
+  private static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
+
+  final int BLOCK_SIZE = 1024;
+  final int IO_SIZE = BLOCK_SIZE;
+  final short REPLICATION = 1;
+  final long SEED = 0;
+
+  DatanodeID datanode;
+  InetSocketAddress dnAddr;
+
+  /**
+   * Test that block IDs are generated sequentially.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testBlockIdGeneration() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      // Create a file that is 10 blocks long.
+      Path path = new Path("testBlockIdGeneration.dat");
+      DFSTestUtil.createFile(
+          fs, path, IO_SIZE, BLOCK_SIZE * 10, BLOCK_SIZE, REPLICATION, SEED);
+      List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs, path);
+      LOG.info("Block0 id is " + blocks.get(0).getBlock().getBlockId());
+      long nextBlockExpectedId = blocks.get(0).getBlock().getBlockId() + 1;
+
+      // Ensure that the block IDs are sequentially increasing.
+      for (int i = 1; i < blocks.size(); ++i) {
+        long nextBlockId = blocks.get(i).getBlock().getBlockId();
+        LOG.info("Block" + i + " id is " + nextBlockId);
+        assertThat(nextBlockId, is(nextBlockExpectedId));
+        ++nextBlockExpectedId;
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test that collisions in the block ID space are handled gracefully.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testTriggerBlockIdCollision() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      FSNamesystem fsn = cluster.getNamesystem();
+      final int blockCount = 10;
+
+      // Create a file with a few blocks to rev up the global block ID
+      // counter.
+      Path path1 = new Path("testBlockIdCollisionDetection_file1.dat");
+      DFSTestUtil.createFile(
+          fs, path1, IO_SIZE, BLOCK_SIZE * blockCount,
+          BLOCK_SIZE, REPLICATION, SEED);
+      List<LocatedBlock> blocks1 = DFSTestUtil.getAllBlocks(fs, path1);
+
+      // Rewind the block ID counter in the name system object. This will result
+      // in block ID collisions when we try to allocate new blocks.
+      SequentialBlockIdGenerator blockIdGenerator = fsn.getBlockIdGenerator();
+      blockIdGenerator.setCurrentValue(blockIdGenerator.getCurrentValue() - 5);
+
+      // Trigger collisions by creating a new file.
+      Path path2 = new Path("testBlockIdCollisionDetection_file2.dat");
+      DFSTestUtil.createFile(
+          fs, path2, IO_SIZE, BLOCK_SIZE * blockCount,
+          BLOCK_SIZE, REPLICATION, SEED);
+      List<LocatedBlock> blocks2 = DFSTestUtil.getAllBlocks(fs, path2);
+      assertThat(blocks2.size(), is(blockCount));
+
+      // Make sure that file2 block IDs start immediately after file1
+      assertThat(blocks2.get(0).getBlock().getBlockId(),
+          is(blocks1.get(9).getBlock().getBlockId() + 1));
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test that the block type (legacy or not) can be correctly detected
+   * based on its generation stamp.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testBlockTypeDetection() throws IOException {
+
+    // Setup a mock object and stub out a few routines to
+    // retrieve the generation stamp counters.
+    FSNamesystem fsn = mock(FSNamesystem.class);
+    final long maxGenStampForLegacyBlocks = 10000;
+
+    when(fsn.getGenerationStampV1Limit())
+        .thenReturn(maxGenStampForLegacyBlocks);
+
+    Block legacyBlock = spy(new Block());
+    when(legacyBlock.getGenerationStamp())
+        .thenReturn(maxGenStampForLegacyBlocks/2);
+
+    Block newBlock = spy(new Block());
+    when(newBlock.getGenerationStamp())
+        .thenReturn(maxGenStampForLegacyBlocks+1);
+
+    // Make sure that isLegacyBlock() can correctly detect
+    // legacy and new blocks.
+    when(fsn.isLegacyBlock(any(Block.class))).thenCallRealMethod();
+    assertThat(fsn.isLegacyBlock(legacyBlock), is(true));
+    assertThat(fsn.isLegacyBlock(newBlock), is(false));
+  }
+
+  /**
+   * Test that the generation stamp for legacy and new blocks is updated
+   * as expected.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testGenerationStampUpdate() throws IOException {
+
+    // Setup a mock object and stub out a few routines to
+    // retrieve the generation stamp counters.
+    FSNamesystem fsn = mock(FSNamesystem.class);
+    FSEditLog editLog = mock(FSEditLog.class);
+    final long nextGenerationStampV1 = 5000;
+    final long nextGenerationStampV2 = 20000;
+
+    when(fsn.getNextGenerationStampV1())
+        .thenReturn(nextGenerationStampV1);
+    when(fsn.getNextGenerationStampV2())
+        .thenReturn(nextGenerationStampV2);
+
+    // Make sure that the generation stamp is set correctly for both
+    // kinds of blocks.
+    when(fsn.nextGenerationStamp(anyBoolean())).thenCallRealMethod();
+    when(fsn.hasWriteLock()).thenReturn(true);
+    when(fsn.getEditLog()).thenReturn(editLog);
+    assertThat(fsn.nextGenerationStamp(true), is(nextGenerationStampV1));
+    assertThat(fsn.nextGenerationStamp(false), is(nextGenerationStampV2));
+  }
+}
@@ -44,7 +44,13 @@ public class TestOfflineEditsViewer {
  private static final Map<FSEditLogOpCodes, Boolean> obsoleteOpCodes =
      new HashMap<FSEditLogOpCodes, Boolean>();
 
-  static { initializeObsoleteOpCodes(); }
+  private static final Map<FSEditLogOpCodes, Boolean> missingOpCodes =
+      new HashMap<FSEditLogOpCodes, Boolean>();
+
+  static {
+    initializeObsoleteOpCodes();
+    initializeMissingOpCodes();
+  }
 
  private static String buildDir =
      System.getProperty("test.build.data", "build/test/data");
@@ -74,6 +80,16 @@ public class TestOfflineEditsViewer {
    obsoleteOpCodes.put(FSEditLogOpCodes.OP_CLEAR_NS_QUOTA, true);
  }
 
+  /**
+   * Initialize missingOpcodes
+   *
+   * Opcodes that are not available except after uprade from
+   * an older version. We don't test these here.
+   */
+  private static void initializeMissingOpCodes() {
+    obsoleteOpCodes.put(FSEditLogOpCodes.OP_SET_GENSTAMP_V1, true);
+  }
+
  @Before
  public void setup() {
    new File(cacheDir).mkdirs();
@@ -103,6 +119,8 @@ public class TestOfflineEditsViewer {
    assertTrue(
        "Edits " + edits + " should have all op codes",
        hasAllOpCodes(edits));
+    LOG.info("Comparing generated file " + editsReparsed +
+        " with reference file " + edits);
    assertTrue(
        "Generated edits and reparsed (bin to XML to bin) should be same",
        filesEqualIgnoreTrailingZeros(edits, editsReparsed));
@@ -222,9 +240,12 @@ public class TestOfflineEditsViewer {
      // don't need to test obsolete opCodes
      if(obsoleteOpCodes.containsKey(opCode)) {
        continue;
-      }
-      if (opCode == FSEditLogOpCodes.OP_INVALID)
+      } else if (missingOpCodes.containsKey(opCode)) {
        continue;
+      } else if (opCode == FSEditLogOpCodes.OP_INVALID) {
+        continue;
+      }
 
      Long count = visitor.getStatistics().get(opCode);
      if((count == null) || (count == 0)) {
        hasAllOpCodes = false;
Binary file not shown.
File diff suppressed because it is too large.