HDFS-7435. PB encoding of block reports is very inefficient. Contributed by Daryn Sharp.

(cherry picked from commit d324164a51)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

(cherry picked from commit 464271a5ed)

parent cdeb1079ea
commit 116a7f1a16
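
Background for the diffs below: the old wire format shipped each block report as a protobuf "repeated uint64 blocks" field, which forces an ArrayList<Long> with per-element boxing on both ends; this commit instead streams replicas into varint-encoded ByteString chunks and negotiates the new format through a namespace capability flag. A minimal sketch (not part of the patch; the class and method names here are invented for illustration) of the two wire forms, using only the BlockListAsLongs API added in this commit:

import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import com.google.protobuf.ByteString;

class BlockReportEncodingSketch {
  static List<ByteString> encodeNewStyle(Collection<? extends Replica> replicas) {
    // New path: replicas are written as varints into ByteString chunks
    // of at most 64K each.
    BlockListAsLongs report = BlockListAsLongs.encode(replicas);
    return report.getBlocksBuffers();
  }

  static long[] encodeOldStyle(Collection<? extends Replica> replicas) {
    // Legacy path: the same report transcoded to the old long[] layout,
    // used when the NameNode does not advertise STORAGE_BLOCK_REPORT_BUFFERS.
    return BlockListAsLongs.encode(replicas).getBlockListAsLongs();
  }
}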
CHANGES.txt
@@ -426,6 +426,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7491. Add incremental blockreport latency to DN metrics.
     (Ming Ma via cnauroth)
 
+    HDFS-7435. PB encoding of block reports is very inefficient.
+    (Daryn Sharp via kihwal)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
BlockListAsLongs.java
@@ -17,21 +17,146 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Random;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 
-/**
- * This class provides an interface for accessing list of blocks that
- * has been implemented as long[].
- * This class is useful for block report. Rather than send block reports
- * as a Block[] we can send it as a long[].
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
+  private final static int CHUNK_SIZE = 64*1024; // 64K
+  private static long[] EMPTY_LONGS = new long[]{0, 0};
+
+  public static BlockListAsLongs EMPTY = new BlockListAsLongs() {
+    @Override
+    public int getNumberOfBlocks() {
+      return 0;
+    }
+    @Override
+    public ByteString getBlocksBuffer() {
+      return ByteString.EMPTY;
+    }
+    @Override
+    public long[] getBlockListAsLongs() {
+      return EMPTY_LONGS;
+    }
+    @Override
+    public Iterator<BlockReportReplica> iterator() {
+      return Collections.emptyIterator();
+    }
+  };
+
+  /**
+   * Prepare an instance to in-place decode the given ByteString buffer
+   * @param numBlocks - blocks in the buffer
+   * @param blocksBuf - ByteString encoded varints
+   * @return BlockListAsLongs
+   */
+  public static BlockListAsLongs decodeBuffer(final int numBlocks,
+      final ByteString blocksBuf) {
+    return new BufferDecoder(numBlocks, blocksBuf);
+  }
+
+  /**
+   * Prepare an instance to in-place decode the given ByteString buffers
+   * @param numBlocks - blocks in the buffers
+   * @param blocksBufs - list of ByteString encoded varints
+   * @return BlockListAsLongs
+   */
+  public static BlockListAsLongs decodeBuffers(final int numBlocks,
+      final List<ByteString> blocksBufs) {
+    // this doesn't actually copy the data
+    return decodeBuffer(numBlocks, ByteString.copyFrom(blocksBufs));
+  }
+
+  /**
+   * Prepare an instance to in-place decode the given list of Longs.  Note
+   * it's much more efficient to decode ByteString buffers and only exists
+   * for compatibility.
+   * @param blocksList - list of longs
+   * @return BlockListAsLongs
+   */
+  public static BlockListAsLongs decodeLongs(List<Long> blocksList) {
+    return blocksList.isEmpty() ? EMPTY : new LongsDecoder(blocksList);
+  }
+
+  /**
+   * Prepare an instance to encode the collection of replicas into an
+   * efficient ByteString.
+   * @param replicas - replicas to encode
+   * @return BlockListAsLongs
+   */
+  public static BlockListAsLongs encode(
+      final Collection<? extends Replica> replicas) {
+    BlockListAsLongs.Builder builder = builder();
+    for (Replica replica : replicas) {
+      builder.add(replica);
+    }
+    return builder.build();
+  }
+
+  public static Builder builder() {
+    return new BlockListAsLongs.Builder();
+  }
+
+  /**
+   * The number of blocks
+   * @return - the number of blocks
+   */
+  abstract public int getNumberOfBlocks();
+
+  /**
+   * Very efficient encoding of the block report into a ByteString to avoid
+   * the overhead of protobuf repeating fields.  Primitive repeating fields
+   * require re-allocs of an ArrayList<Long> and the associated (un)boxing
+   * overhead which puts pressure on GC.
+   *
+   * The structure of the buffer is as follows:
+   * - each replica is represented by 4 longs:
+   *   blockId, block length, genstamp, replica state
+   *
+   * @return ByteString encoded block report
+   */
+  abstract public ByteString getBlocksBuffer();
+
+  /**
+   * List of ByteStrings that encode this block report
+   *
+   * @return ByteStrings
+   */
+  public List<ByteString> getBlocksBuffers() {
+    final ByteString blocksBuf = getBlocksBuffer();
+    final List<ByteString> buffers;
+    final int size = blocksBuf.size();
+    if (size <= CHUNK_SIZE) {
+      buffers = Collections.singletonList(blocksBuf);
+    } else {
+      buffers = new ArrayList<ByteString>();
+      for (int pos=0; pos < size; pos += CHUNK_SIZE) {
+        // this doesn't actually copy the data
+        buffers.add(blocksBuf.substring(pos, Math.min(pos+CHUNK_SIZE, size)));
+      }
+    }
+    return buffers;
+  }
+
+  /**
+   * Convert block report to old-style list of longs.  Only used to
+   * re-encode the block report when the DN detects an older NN. This is
+   * inefficient, but in practice a DN is unlikely to be upgraded first
  *
  * The structure of the array is as follows:
  * 0: the length of the finalized replica list;
@@ -43,316 +168,307 @@ import org.apache.hadoop.hdfs.server.datanode.Replica;
 * - followed by the under-construction replica list where each replica is
 *   represented by 4 longs: three for the block id, length, generation
 *   stamp, and the fourth for the replica state.
+   * @return list of longs
  */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class BlockListAsLongs implements Iterable<Block> {
-  /**
-   * A finalized block as 3 longs
-   *   block-id and block length and generation stamp
-   */
-  private static final int LONGS_PER_FINALIZED_BLOCK = 3;
-
-  /**
-   * An under-construction block as 4 longs
-   *   block-id and block length, generation stamp and replica state
-   */
-  private static final int LONGS_PER_UC_BLOCK = 4;
-
-  /** Number of longs in the header */
-  private static final int HEADER_SIZE = 2;
-
-  /**
-   * Returns the index of the first long in blockList
-   * belonging to the specified block.
-   * The first long contains the block id.
-   */
-  private int index2BlockId(int blockIndex) {
-    if(blockIndex < 0 || blockIndex > getNumberOfBlocks())
-      return -1;
-    int finalizedSize = getNumberOfFinalizedReplicas();
-    if(blockIndex < finalizedSize)
-      return HEADER_SIZE + blockIndex * LONGS_PER_FINALIZED_BLOCK;
-    return HEADER_SIZE + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
-        + (blockIndex - finalizedSize) * LONGS_PER_UC_BLOCK;
-  }
-
-  private final long[] blockList;
-
-  /**
-   * Create block report from finalized and under construction lists of blocks.
-   *
-   * @param finalized - list of finalized blocks
-   * @param uc - list of under construction blocks
-   */
-  public BlockListAsLongs(final List<? extends Replica> finalized,
-      final List<? extends Replica> uc) {
-    int finalizedSize = finalized == null ? 0 : finalized.size();
-    int ucSize = uc == null ? 0 : uc.size();
-    int len = HEADER_SIZE
-        + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
-        + ucSize * LONGS_PER_UC_BLOCK;
-
-    blockList = new long[len];
-
-    // set the header
-    blockList[0] = finalizedSize;
-    blockList[1] = ucSize;
-
-    // set finalized blocks
-    for (int i = 0; i < finalizedSize; i++) {
-      setBlock(i, finalized.get(i));
-    }
-
-    // set invalid delimiting block
-    setDelimitingBlock(finalizedSize);
-
-    // set under construction blocks
-    for (int i = 0; i < ucSize; i++) {
-      setBlock(finalizedSize + i, uc.get(i));
-    }
-  }
-
-  /**
-   * Create block report from a list of finalized blocks.  Used by
-   * NNThroughputBenchmark.
-   *
-   * @param blocks - list of finalized blocks
-   */
-  public BlockListAsLongs(final List<? extends Block> blocks) {
-    int finalizedSize = blocks == null ? 0 : blocks.size();
-    int len = HEADER_SIZE
-        + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK;
-
-    blockList = new long[len];
-
-    // set the header
-    blockList[0] = finalizedSize;
-    blockList[1] = 0;
-
-    // set finalized blocks
-    for (int i = 0; i < finalizedSize; i++) {
-      setBlock(i, blocks.get(i));
-    }
-
-    // set invalid delimiting block
-    setDelimitingBlock(finalizedSize);
-  }
-
-  public BlockListAsLongs() {
-    this((long[])null);
-  }
-
-  /**
-   * Constructor
-   * @param iBlockList - BlockListALongs create from this long[] parameter
-   */
-  public BlockListAsLongs(final long[] iBlockList) {
-    if (iBlockList == null) {
-      blockList = new long[HEADER_SIZE];
-      return;
-    }
-    blockList = iBlockList;
-  }
-
-  public long[] getBlockListAsLongs() {
-    return blockList;
-  }
-
-  /**
-   * Iterates over blocks in the block report.
-   * Avoids object allocation on each iteration.
-   */
-  @InterfaceAudience.Private
-  @InterfaceStability.Evolving
-  public class BlockReportIterator implements Iterator<Block> {
-    private int currentBlockIndex;
-    private final Block block;
-    private ReplicaState currentReplicaState;
-
-    BlockReportIterator() {
-      this.currentBlockIndex = 0;
-      this.block = new Block();
-      this.currentReplicaState = null;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return currentBlockIndex < getNumberOfBlocks();
-    }
-
-    @Override
-    public Block next() {
-      block.set(blockId(currentBlockIndex),
-                blockLength(currentBlockIndex),
-                blockGenerationStamp(currentBlockIndex));
-      currentReplicaState = blockReplicaState(currentBlockIndex);
-      currentBlockIndex++;
-      return block;
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("Sorry. can't remove.");
-    }
-
-    /**
-     * Get the state of the current replica.
-     * The state corresponds to the replica returned
-     * by the latest {@link #next()}.
-     */
-    public ReplicaState getCurrentReplicaState() {
-      return currentReplicaState;
-    }
-  }
-
-  /**
-   * Returns an iterator over blocks in the block report.
-   */
-  @Override
-  public Iterator<Block> iterator() {
-    return getBlockReportIterator();
-  }
-
-  /**
-   * Returns {@link BlockReportIterator}.
-   */
-  public BlockReportIterator getBlockReportIterator() {
-    return new BlockReportIterator();
-  }
-
-  /**
-   * The number of blocks
-   * @return - the number of blocks
-   */
-  public int getNumberOfBlocks() {
-    assert blockList.length == HEADER_SIZE +
-        (blockList[0] + 1) * LONGS_PER_FINALIZED_BLOCK +
-        blockList[1] * LONGS_PER_UC_BLOCK :
-          "Number of blocks is inconcistent with the array length";
-    return getNumberOfFinalizedReplicas() + getNumberOfUCReplicas();
-  }
-
-  /**
-   * Returns the number of finalized replicas in the block report.
-   */
-  private int getNumberOfFinalizedReplicas() {
-    return (int)blockList[0];
-  }
-
-  /**
-   * Returns the number of under construction replicas in the block report.
-   */
-  private int getNumberOfUCReplicas() {
-    return (int)blockList[1];
-  }
-
-  /**
-   * Returns the id of the specified replica of the block report.
-   */
-  private long blockId(int index) {
-    return blockList[index2BlockId(index)];
-  }
-
-  /**
-   * Returns the length of the specified replica of the block report.
-   */
-  private long blockLength(int index) {
-    return blockList[index2BlockId(index) + 1];
-  }
-
-  /**
-   * Returns the generation stamp of the specified replica of the block report.
-   */
-  private long blockGenerationStamp(int index) {
-    return blockList[index2BlockId(index) + 2];
-  }
-
-  /**
-   * Returns the state of the specified replica of the block report.
-   */
-  private ReplicaState blockReplicaState(int index) {
-    if(index < getNumberOfFinalizedReplicas())
-      return ReplicaState.FINALIZED;
-    return ReplicaState.getState((int)blockList[index2BlockId(index) + 3]);
-  }
-
-  /**
-   * Corrupt the generation stamp of the block with the given index.
-   * Not meant to be used outside of tests.
-   */
-  @VisibleForTesting
-  public long corruptBlockGSForTesting(final int blockIndex, Random rand) {
-    long oldGS = blockList[index2BlockId(blockIndex) + 2];
-    while (blockList[index2BlockId(blockIndex) + 2] == oldGS) {
-      blockList[index2BlockId(blockIndex) + 2] = rand.nextInt();
-    }
-    return oldGS;
-  }
-
-  /**
-   * Corrupt the length of the block with the given index by truncation.
-   * Not meant to be used outside of tests.
-   */
-  @VisibleForTesting
-  public long corruptBlockLengthForTesting(final int blockIndex, Random rand) {
-    long oldLength = blockList[index2BlockId(blockIndex) + 1];
-    blockList[index2BlockId(blockIndex) + 1] =
-        rand.nextInt((int) oldLength - 1);
-    return oldLength;
-  }
-
-  /**
-   * Set the indexTh block
-   * @param index - the index of the block to set
-   * @param r - the block is set to the value of the this Replica
-   */
-  private void setBlock(final int index, final Replica r) {
-    int pos = index2BlockId(index);
-    blockList[pos] = r.getBlockId();
-    blockList[pos + 1] = r.getNumBytes();
-    blockList[pos + 2] = r.getGenerationStamp();
-    if(index < getNumberOfFinalizedReplicas())
-      return;
-    assert r.getState() != ReplicaState.FINALIZED :
-      "Must be under-construction replica.";
-    blockList[pos + 3] = r.getState().getValue();
-  }
-
-  /**
-   * Set the indexTh block
-   * @param index - the index of the block to set
-   * @param b - the block is set to the value of the this Block
-   */
-  private void setBlock(final int index, final Block b) {
-    int pos = index2BlockId(index);
-    blockList[pos] = b.getBlockId();
-    blockList[pos + 1] = b.getNumBytes();
-    blockList[pos + 2] = b.getGenerationStamp();
-  }
-
-  /**
-   * Set the invalid delimiting block between the finalized and
-   * the under-construction lists.
-   * The invalid block has all three fields set to -1.
-   * @param finalizedSzie - the size of the finalized list
-   */
-  private void setDelimitingBlock(final int finalizedSzie) {
-    int idx = HEADER_SIZE + finalizedSzie * LONGS_PER_FINALIZED_BLOCK;
-    blockList[idx] = -1;
-    blockList[idx+1] = -1;
-    blockList[idx+2] = -1;
-  }
-
-  public long getMaxGsInBlockList() {
-    long maxGs = -1;
-    Iterator<Block> iter = getBlockReportIterator();
-    while (iter.hasNext()) {
-      Block b = iter.next();
-      if (b.getGenerationStamp() > maxGs) {
-        maxGs = b.getGenerationStamp();
-      }
-    }
-    return maxGs;
+  abstract public long[] getBlockListAsLongs();
+
+  /**
+   * Returns a singleton iterator over blocks in the block report.  Do not
+   * add the returned blocks to a collection.
+   * @return Iterator
+   */
+  abstract public Iterator<BlockReportReplica> iterator();
+
+  public static class Builder {
+    private final ByteString.Output out;
+    private final CodedOutputStream cos;
+    private int numBlocks = 0;
+    private int numFinalized = 0;
+
+    Builder() {
+      out = ByteString.newOutput(64*1024);
+      cos = CodedOutputStream.newInstance(out);
+    }
+
+    public void add(Replica replica) {
+      try {
+        // zig-zag to reduce size of legacy blocks
+        cos.writeSInt64NoTag(replica.getBlockId());
+        cos.writeRawVarint64(replica.getBytesOnDisk());
+        cos.writeRawVarint64(replica.getGenerationStamp());
+        ReplicaState state = replica.getState();
+        // although state is not a 64-bit value, using a long varint to
+        // allow for future use of the upper bits
+        cos.writeRawVarint64(state.getValue());
+        if (state == ReplicaState.FINALIZED) {
+          numFinalized++;
+        }
+        numBlocks++;
+      } catch (IOException ioe) {
+        // shouldn't happen, ByteString.Output doesn't throw IOE
+        throw new IllegalStateException(ioe);
+      }
+    }
+
+    public int getNumberOfBlocks() {
+      return numBlocks;
+    }
+
+    public BlockListAsLongs build() {
+      try {
+        cos.flush();
+      } catch (IOException ioe) {
+        // shouldn't happen, ByteString.Output doesn't throw IOE
+        throw new IllegalStateException(ioe);
+      }
+      return new BufferDecoder(numBlocks, numFinalized, out.toByteString());
+    }
+  }
+
+  // decode new-style ByteString buffer based block report
+  private static class BufferDecoder extends BlockListAsLongs {
+    // reserve upper bits for future use.  decoding masks off these bits to
+    // allow compatibility for the current through future release that may
+    // start using the bits
+    private static long NUM_BYTES_MASK = (-1L) >>> (64 - 48);
+    private static long REPLICA_STATE_MASK = (-1L) >>> (64 - 4);
+
+    private final ByteString buffer;
+    private final int numBlocks;
+    private int numFinalized;
+
+    BufferDecoder(final int numBlocks, final ByteString buf) {
+      this(numBlocks, -1, buf);
+    }
+
+    BufferDecoder(final int numBlocks, final int numFinalized,
+        final ByteString buf) {
+      this.numBlocks = numBlocks;
+      this.numFinalized = numFinalized;
+      this.buffer = buf;
+    }
+
+    @Override
+    public int getNumberOfBlocks() {
+      return numBlocks;
+    }
+
+    @Override
+    public ByteString getBlocksBuffer() {
+      return buffer;
+    }
+
+    @Override
+    public long[] getBlockListAsLongs() {
+      // terribly inefficient but only occurs if server tries to transcode
+      // an undecoded buffer into longs - ie. it will never happen but let's
+      // handle it anyway
+      if (numFinalized == -1) {
+        int n = 0;
+        for (Replica replica : this) {
+          if (replica.getState() == ReplicaState.FINALIZED) {
+            n++;
+          }
+        }
+        numFinalized = n;
+      }
+      int numUc = numBlocks - numFinalized;
+      int size = 2 + 3*(numFinalized+1) + 4*(numUc);
+      long[] longs = new long[size];
+      longs[0] = numFinalized;
+      longs[1] = numUc;
+
+      int idx = 2;
+      int ucIdx = idx + 3*numFinalized;
+      // delimiter block
+      longs[ucIdx++] = -1;
+      longs[ucIdx++] = -1;
+      longs[ucIdx++] = -1;
+
+      for (BlockReportReplica block : this) {
+        switch (block.getState()) {
+          case FINALIZED: {
+            longs[idx++] = block.getBlockId();
+            longs[idx++] = block.getNumBytes();
+            longs[idx++] = block.getGenerationStamp();
+            break;
+          }
+          default: {
+            longs[ucIdx++] = block.getBlockId();
+            longs[ucIdx++] = block.getNumBytes();
+            longs[ucIdx++] = block.getGenerationStamp();
+            longs[ucIdx++] = block.getState().getValue();
+            break;
+          }
+        }
+      }
+      return longs;
+    }
+
+    @Override
+    public Iterator<BlockReportReplica> iterator() {
+      return new Iterator<BlockReportReplica>() {
+        final BlockReportReplica block = new BlockReportReplica();
+        final CodedInputStream cis = buffer.newCodedInput();
+        private int currentBlockIndex = 0;
+
+        @Override
+        public boolean hasNext() {
+          return currentBlockIndex < numBlocks;
+        }
+
+        @Override
+        public BlockReportReplica next() {
+          currentBlockIndex++;
+          try {
+            // zig-zag to reduce size of legacy blocks and mask off bits
+            // we don't (yet) understand
+            block.setBlockId(cis.readSInt64());
+            block.setNumBytes(cis.readRawVarint64() & NUM_BYTES_MASK);
+            block.setGenerationStamp(cis.readRawVarint64());
+            long state = cis.readRawVarint64() & REPLICA_STATE_MASK;
+            block.setState(ReplicaState.getState((int)state));
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+          return block;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    }
+  }
+
+  // decode old style block report of longs
+  private static class LongsDecoder extends BlockListAsLongs {
+    private final List<Long> values;
+    private final int finalizedBlocks;
+    private final int numBlocks;
+
+    // set the header
+    LongsDecoder(List<Long> values) {
+      this.values = values.subList(2, values.size());
+      this.finalizedBlocks = values.get(0).intValue();
+      this.numBlocks = finalizedBlocks + values.get(1).intValue();
+    }
+
+    @Override
+    public int getNumberOfBlocks() {
+      return numBlocks;
+    }
+
+    @Override
+    public ByteString getBlocksBuffer() {
+      Builder builder = builder();
+      for (Replica replica : this) {
+        builder.add(replica);
+      }
+      return builder.build().getBlocksBuffer();
+    }
+
+    @Override
+    public long[] getBlockListAsLongs() {
+      long[] longs = new long[2+values.size()];
+      longs[0] = finalizedBlocks;
+      longs[1] = numBlocks - finalizedBlocks;
+      for (int i=0; i < longs.length; i++) {
+        longs[i] = values.get(i);
+      }
+      return longs;
+    }
+
+    @Override
+    public Iterator<BlockReportReplica> iterator() {
+      return new Iterator<BlockReportReplica>() {
+        private final BlockReportReplica block = new BlockReportReplica();
+        final Iterator<Long> iter = values.iterator();
+        private int currentBlockIndex = 0;
+
+        @Override
+        public boolean hasNext() {
+          return currentBlockIndex < numBlocks;
+        }
+
+        @Override
+        public BlockReportReplica next() {
+          if (currentBlockIndex == finalizedBlocks) {
+            // verify the presence of the delimiter block
+            readBlock();
+            Preconditions.checkArgument(block.getBlockId() == -1 &&
+                block.getNumBytes() == -1 &&
+                block.getGenerationStamp() == -1,
+                "Invalid delimiter block");
+          }
+
+          readBlock();
+          if (currentBlockIndex++ < finalizedBlocks) {
+            block.setState(ReplicaState.FINALIZED);
+          } else {
+            block.setState(ReplicaState.getState(iter.next().intValue()));
+          }
+          return block;
+        }
+
+        private void readBlock() {
+          block.setBlockId(iter.next());
+          block.setNumBytes(iter.next());
+          block.setGenerationStamp(iter.next());
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    }
+  }
+
+  @InterfaceAudience.Private
+  public static class BlockReportReplica extends Block implements Replica {
+    private ReplicaState state;
+    private BlockReportReplica() {
+    }
+    public BlockReportReplica(Block block) {
+      super(block);
+      if (block instanceof BlockReportReplica) {
+        this.state = ((BlockReportReplica)block).getState();
+      } else {
+        this.state = ReplicaState.FINALIZED;
+      }
+    }
+    public void setState(ReplicaState state) {
+      this.state = state;
+    }
+    @Override
+    public ReplicaState getState() {
+      return state;
+    }
+    @Override
+    public long getBytesOnDisk() {
+      return getNumBytes();
+    }
+    @Override
+    public long getVisibleLength() {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public String getStorageUuid() {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public boolean isOnTransientStorage() {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public boolean equals(Object o) {
+      return super.equals(o);
+    }
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
   }
 }
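
For orientation, a small usage sketch (not part of the patch; the wrapper class and method are invented) of the decode path above. BufferDecoder walks the varints in place and reuses a single BlockReportReplica instance, which is why the iterator() javadoc warns against adding the returned blocks to a collection:

import java.util.List;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import com.google.protobuf.ByteString;

class BlockReportDecodingSketch {
  // Compute the max generation stamp without materializing per-block objects.
  static long maxGenStamp(int numBlocks, List<ByteString> buffers) {
    BlockListAsLongs blocks = BlockListAsLongs.decodeBuffers(numBlocks, buffers);
    long maxGs = -1;
    for (BlockReportReplica replica : blocks) { // same instance each iteration
      maxGs = Math.max(maxGs, replica.getGenerationStamp());
    }
    return maxGs;
  }
}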
DatanodeProtocolClientSideTranslatorPB.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@@ -64,6 +66,7 @@ import org.apache.hadoop.ipc.RpcClientUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
@@ -83,6 +86,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       VersionRequestProto.newBuilder().build();
   private final static RpcController NULL_CONTROLLER = null;
 
+  @VisibleForTesting
+  public DatanodeProtocolClientSideTranslatorPB(DatanodeProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }
+
   public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
       Configuration conf) throws IOException {
     RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
@@ -166,12 +174,20 @@ public class DatanodeProtocolClientSideTranslatorPB implements
         .newBuilder().setRegistration(PBHelper.convert(registration))
         .setBlockPoolId(poolId);
 
+    boolean useBlocksBuffer = registration.getNamespaceInfo()
+        .isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS);
+
     for (StorageBlockReport r : reports) {
       StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
           .newBuilder().setStorage(PBHelper.convert(r.getStorage()));
-      long[] blocks = r.getBlocks();
-      for (int i = 0; i < blocks.length; i++) {
-        reportBuilder.addBlocks(blocks[i]);
+      BlockListAsLongs blocks = r.getBlocks();
+      if (useBlocksBuffer) {
+        reportBuilder.setNumberOfBlocks(blocks.getNumberOfBlocks());
+        reportBuilder.addAllBlocksBuffers(blocks.getBlocksBuffers());
+      } else {
+        for (long value : blocks.getBlockListAsLongs()) {
+          reportBuilder.addBlocks(value);
+        }
       }
       builder.addReports(reportBuilder.build());
     }
DatanodeProtocolServerSideTranslatorPB.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.protocolPB;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
@@ -58,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
@@ -145,10 +147,14 @@ public class DatanodeProtocolServerSideTranslatorPB implements
 
     int index = 0;
     for (StorageBlockReportProto s : request.getReportsList()) {
-      List<Long> blockIds = s.getBlocksList();
-      long[] blocks = new long[blockIds.size()];
-      for (int i = 0; i < blockIds.size(); i++) {
-        blocks[i] = blockIds.get(i);
+      final BlockListAsLongs blocks;
+      if (s.hasNumberOfBlocks()) { // new style buffer based reports
+        int num = (int)s.getNumberOfBlocks();
+        Preconditions.checkState(s.getBlocksCount() == 0,
+            "cannot send both blocks list and buffers");
+        blocks = BlockListAsLongs.decodeBuffers(num, s.getBlocksBuffersList());
+      } else {
+        blocks = BlockListAsLongs.decodeLongs(s.getBlocksList());
       }
       report[index++] = new StorageBlockReport(PBHelper.convert(s.getStorage()),
           blocks);
PBHelper.java
@@ -574,7 +574,7 @@ public class PBHelper {
     StorageInfoProto storage = info.getStorageInfo();
     return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
         info.getBlockPoolID(), storage.getCTime(), info.getBuildVersion(),
-        info.getSoftwareVersion());
+        info.getSoftwareVersion(), info.getCapabilities());
   }
 
   public static NamenodeCommand convert(NamenodeCommandProto cmd) {
@@ -1234,7 +1234,9 @@ public class PBHelper {
         .setBuildVersion(info.getBuildVersion())
         .setUnused(0)
         .setStorageInfo(PBHelper.convert((StorageInfo)info))
-        .setSoftwareVersion(info.getSoftwareVersion()).build();
+        .setSoftwareVersion(info.getSoftwareVersion())
+        .setCapabilities(info.getCapabilities())
+        .build();
   }
 
   // Located Block Arrays and Lists
BlockManager.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -1971,11 +1971,9 @@ public class BlockManager {
     if (report == null) return;
     assert (namesystem.hasWriteLock());
     assert (storageInfo.numBlocks() == 0);
-    BlockReportIterator itBR = report.getBlockReportIterator();
 
-    while(itBR.hasNext()) {
-      Block iblk = itBR.next();
-      ReplicaState reportedState = itBR.getCurrentReplicaState();
+    for (BlockReportReplica iblk : report) {
+      ReplicaState reportedState = iblk.getState();
 
       if (shouldPostponeBlocksFromFuture &&
           namesystem.isGenStampInFuture(iblk)) {
@@ -2045,13 +2043,11 @@
     int curIndex;
 
     if (newReport == null) {
-      newReport = new BlockListAsLongs();
+      newReport = BlockListAsLongs.EMPTY;
     }
     // scan the report and process newly reported blocks
-    BlockReportIterator itBR = newReport.getBlockReportIterator();
-    while(itBR.hasNext()) {
-      Block iblk = itBR.next();
-      ReplicaState iState = itBR.getCurrentReplicaState();
+    for (BlockReportReplica iblk : newReport) {
+      ReplicaState iState = iblk.getState();
       BlockInfoContiguous storedBlock = processReportedBlock(storageInfo,
           iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
 
BPServiceActor.java
@@ -228,7 +228,7 @@ class BPServiceActor implements Runnable {
     bpos.verifyAndSetNamespaceInfo(nsInfo);
 
     // Second phase of the handshake with the NN.
-    register();
+    register(nsInfo);
   }
 
   // This is useful to make sure NN gets Heartbeat before Blockreport
@@ -468,8 +468,7 @@ class BPServiceActor implements Runnable {
 
     for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
       BlockListAsLongs blockList = kvPair.getValue();
-      reports[i++] = new StorageBlockReport(
-          kvPair.getKey(), blockList.getBlockListAsLongs());
+      reports[i++] = new StorageBlockReport(kvPair.getKey(), blockList);
       totalBlockCount += blockList.getNumberOfBlocks();
     }
 
@@ -774,10 +773,11 @@ class BPServiceActor implements Runnable {
    *
    * issued by the namenode to recognize registered datanodes.
    *
+   * @param nsInfo current NamespaceInfo
    * @see FSNamesystem#registerDatanode(DatanodeRegistration)
    * @throws IOException
    */
-  void register() throws IOException {
+  void register(NamespaceInfo nsInfo) throws IOException {
     // The handshake() phase loaded the block pool storage
     // off disk - so update the bpRegistration object from that info
     bpRegistration = bpos.createRegistration();
@@ -788,6 +788,7 @@ class BPServiceActor implements Runnable {
     try {
       // Use returned registration from namenode with updated fields
       bpRegistration = bpNamenode.registerDatanode(bpRegistration);
+      bpRegistration.setNamespaceInfo(nsInfo);
       break;
     } catch(EOFException e) {  // namenode might have just restarted
       LOG.info("Problem connecting to server: " + nnAddr + " :"
@@ -915,9 +916,9 @@ class BPServiceActor implements Runnable {
       if (shouldRun()) {
         // re-retrieve namespace info to make sure that, if the NN
         // was restarted, we still match its version (HDFS-2120)
-        retrieveNamespaceInfo();
+        NamespaceInfo nsInfo = retrieveNamespaceInfo();
         // and re-register
-        register();
+        register(nsInfo);
         scheduleHeartbeat();
       }
     }
FsDatasetImpl.java
@@ -1571,30 +1571,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     Map<DatanodeStorage, BlockListAsLongs> blockReportsMap =
         new HashMap<DatanodeStorage, BlockListAsLongs>();
 
-    Map<String, ArrayList<ReplicaInfo>> finalized =
-        new HashMap<String, ArrayList<ReplicaInfo>>();
-    Map<String, ArrayList<ReplicaInfo>> uc =
-        new HashMap<String, ArrayList<ReplicaInfo>>();
+    Map<String, BlockListAsLongs.Builder> builders =
+        new HashMap<String, BlockListAsLongs.Builder>();
 
     List<FsVolumeImpl> curVolumes = getVolumes();
     for (FsVolumeSpi v : curVolumes) {
-      finalized.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
-      uc.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
+      builders.put(v.getStorageID(), BlockListAsLongs.builder());
     }
 
     synchronized(this) {
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
         switch(b.getState()) {
           case FINALIZED:
-            finalized.get(b.getVolume().getStorageID()).add(b);
-            break;
           case RBW:
           case RWR:
-            uc.get(b.getVolume().getStorageID()).add(b);
+            builders.get(b.getVolume().getStorageID()).add(b);
             break;
           case RUR:
             ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
-            uc.get(rur.getVolume().getStorageID()).add(rur.getOriginalReplica());
+            builders.get(rur.getVolume().getStorageID())
+                .add(rur.getOriginalReplica());
             break;
           case TEMPORARY:
             break;
@@ -1605,10 +1601,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     for (FsVolumeImpl v : curVolumes) {
-      ArrayList<ReplicaInfo> finalizedList = finalized.get(v.getStorageID());
-      ArrayList<ReplicaInfo> ucList = uc.get(v.getStorageID());
-      blockReportsMap.put(((FsVolumeImpl) v).toDatanodeStorage(),
-                          new BlockListAsLongs(finalizedList, ucList));
+      blockReportsMap.put(v.toDatanodeStorage(),
+                          builders.get(v.getStorageID()).build());
     }
 
     return blockReportsMap;
NameNodeRpcServer.java
@@ -1296,7 +1296,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
     final BlockManager bm = namesystem.getBlockManager(); 
     boolean noStaleStorages = false;
     for(StorageBlockReport r : reports) {
-      final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
+      final BlockListAsLongs blocks = r.getBlocks();
       //
       // BlockManager.processReport accumulates information of prior calls
       // for the same node and storage, so the value returned by the last
DatanodeRegistration.java
@@ -40,6 +40,7 @@ public class DatanodeRegistration extends DatanodeID
   private final StorageInfo storageInfo;
   private ExportedBlockKeys exportedKeys;
   private final String softwareVersion;
+  private NamespaceInfo nsInfo;
 
   @VisibleForTesting
   public DatanodeRegistration(String uuid, DatanodeRegistration dnr) {
@@ -78,6 +79,14 @@ public class DatanodeRegistration extends DatanodeID
     return storageInfo.getLayoutVersion();
   }
 
+  public void setNamespaceInfo(NamespaceInfo nsInfo) {
+    this.nsInfo = nsInfo;
+  }
+
+  public NamespaceInfo getNamespaceInfo() {
+    return nsInfo;
+  }
+
   @Override // NodeRegistration
   public String getRegistrationID() {
     return Storage.getRegistrationID(storageInfo);
NamespaceInfo.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.util.VersionInfo;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * NamespaceInfo is returned by the name-node in reply
  * to a data-node handshake.
@@ -40,19 +43,52 @@ public class NamespaceInfo extends StorageInfo {
   final String buildVersion;
   String blockPoolID = "";    // id of the block pool
   String softwareVersion;
+  long capabilities;
+
+  // only authoritative on the server-side to determine advertisement to
+  // clients.  enum will update the supported values
+  private static long CAPABILITIES_SUPPORTED = 0;
+
+  public enum Capability {
+    UNKNOWN(false),
+    STORAGE_BLOCK_REPORT_BUFFERS(true); // use optimized ByteString buffers
+    private final long mask;
+    Capability(boolean isSupported) {
+      int bits = ordinal() - 1;
+      mask = (bits < 0) ? 0 : (1L << bits);
+      if (isSupported) {
+        CAPABILITIES_SUPPORTED |= mask;
+      }
+    }
+    public long getMask() {
+      return mask;
+    }
+  }
 
+  // defaults to enabled capabilites since this ctor is for server
   public NamespaceInfo() {
     super(NodeType.NAME_NODE);
     buildVersion = null;
+    capabilities = CAPABILITIES_SUPPORTED;
   }
 
+  // defaults to enabled capabilites since this ctor is for server
   public NamespaceInfo(int nsID, String clusterID, String bpID,
       long cT, String buildVersion, String softwareVersion) {
+    this(nsID, clusterID, bpID, cT, buildVersion, softwareVersion,
+        CAPABILITIES_SUPPORTED);
+  }
+
+  // for use by server and/or client
+  public NamespaceInfo(int nsID, String clusterID, String bpID,
+      long cT, String buildVersion, String softwareVersion,
+      long capabilities) {
     super(HdfsConstants.NAMENODE_LAYOUT_VERSION, nsID, clusterID, cT,
         NodeType.NAME_NODE);
     blockPoolID = bpID;
     this.buildVersion = buildVersion;
     this.softwareVersion = softwareVersion;
+    this.capabilities = capabilities;
   }
 
   public NamespaceInfo(int nsID, String clusterID, String bpID,
@@ -61,6 +97,22 @@ public class NamespaceInfo extends StorageInfo {
         VersionInfo.getVersion());
   }
 
+  public long getCapabilities() {
+    return capabilities;
+  }
+
+  @VisibleForTesting
+  public void setCapabilities(long capabilities) {
+    this.capabilities = capabilities;
+  }
+
+  public boolean isCapabilitySupported(Capability capability) {
+    Preconditions.checkArgument(capability != Capability.UNKNOWN,
+        "cannot test for unknown capability");
+    long mask = capability.getMask();
+    return (capabilities & mask) == mask;
+  }
+
   public String getBuildVersion() {
     return buildVersion;
   }
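
A worked example (not from the patch; the wrapper class is invented) of the Capability bit layout above: UNKNOWN has ordinal 0 and mask 0, STORAGE_BLOCK_REPORT_BUFFERS has ordinal 1 and therefore mask 1L << 0 == 1, and any future constant would claim the next bit; CAPABILITIES_SUPPORTED is the OR of every constant declared with isSupported == true:

import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;

class CapabilityCheckSketch {
  // Decide whether the NN from the handshake accepts buffer-based reports;
  // equivalent to testing bit 0 of the advertised capabilities field.
  static boolean canUseBuffers(NamespaceInfo nsInfoFromHandshake) {
    return nsInfoFromHandshake.isCapabilitySupported(
        Capability.STORAGE_BLOCK_REPORT_BUFFERS);
  }
}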
StorageBlockReport.java
@@ -18,14 +18,16 @@
 
 package org.apache.hadoop.hdfs.server.protocol;
 
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+
 /**
  * Block report for a Datanode storage
  */
 public class StorageBlockReport {
   private final DatanodeStorage storage;
-  private final long[] blocks;
+  private final BlockListAsLongs blocks;
 
-  public StorageBlockReport(DatanodeStorage storage, long[] blocks) {
+  public StorageBlockReport(DatanodeStorage storage, BlockListAsLongs blocks) {
     this.storage = storage;
     this.blocks = blocks;
   }
@@ -34,7 +36,7 @@ public class StorageBlockReport {
     return storage;
   }
 
-  public long[] getBlocks() {
+  public BlockListAsLongs getBlocks() {
     return blocks;
   }
 }
DatanodeProtocol.proto
@@ -240,6 +240,8 @@ message BlockReportRequestProto {
 message StorageBlockReportProto {
   required DatanodeStorageProto storage = 1;    // Storage
   repeated uint64 blocks = 2 [packed=true];
+  optional uint64 numberOfBlocks = 3;
+  repeated bytes blocksBuffers = 4;
 }
 
 /**
hdfs.proto
@@ -517,6 +517,7 @@ message NamespaceInfoProto {
   required string blockPoolID = 3;       // block pool used by the namespace
   required StorageInfoProto storageInfo = 4;// Node information
   required string softwareVersion = 5;   // Software version number (e.g. 2.0.0)
+  optional uint64 capabilities = 6 [default = 0]; // feature flags
 }
 
 /**
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class TestBlockListAsLongs {
+  static Block b1 = new Block(1, 11, 111);
+  static Block b2 = new Block(2, 22, 222);
+  static Block b3 = new Block(3, 33, 333);
+  static Block b4 = new Block(4, 44, 444);
+
+  @Test
+  public void testEmptyReport() {
+    BlockListAsLongs blocks = checkReport();
+    assertArrayEquals(
+        new long[] {
+            0, 0,
+            -1, -1, -1 },
+        blocks.getBlockListAsLongs());
+  }
+
+  @Test
+  public void testFinalized() {
+    BlockListAsLongs blocks = checkReport(
+        new FinalizedReplica(b1, null, null));
+    assertArrayEquals(
+        new long[] {
+            1, 0,
+            1, 11, 111,
+            -1, -1, -1 },
+        blocks.getBlockListAsLongs());
+  }
+
+  @Test
+  public void testUc() {
+    BlockListAsLongs blocks = checkReport(
+        new ReplicaBeingWritten(b1, null, null, null));
+    assertArrayEquals(
+        new long[] {
+            0, 1,
+            -1, -1, -1,
+            1, 11, 111, ReplicaState.RBW.getValue() },
+        blocks.getBlockListAsLongs());
+  }
+
+  @Test
+  public void testMix() {
+    BlockListAsLongs blocks = checkReport(
+        new FinalizedReplica(b1, null, null),
+        new FinalizedReplica(b2, null, null),
+        new ReplicaBeingWritten(b3, null, null, null),
+        new ReplicaWaitingToBeRecovered(b4, null, null));
+    assertArrayEquals(
+        new long[] {
+            2, 2,
+            1, 11, 111,
+            2, 22, 222,
+            -1, -1, -1,
+            3, 33, 333, ReplicaState.RBW.getValue(),
+            4, 44, 444, ReplicaState.RWR.getValue() },
+        blocks.getBlockListAsLongs());
+  }
+
+  @Test
+  public void testFuzz() throws InterruptedException {
+    Replica[] replicas = new Replica[100000];
+    Random rand = new Random(0);
+    for (int i=0; i<replicas.length; i++) {
+      Block b = new Block(rand.nextLong(), i, i<<4);
+      switch (rand.nextInt(2)) {
+        case 0:
+          replicas[i] = new FinalizedReplica(b, null, null);
+          break;
+        case 1:
+          replicas[i] = new ReplicaBeingWritten(b, null, null, null);
+          break;
+        case 2:
+          replicas[i] = new ReplicaWaitingToBeRecovered(b, null, null);
+          break;
+      }
+    }
+    checkReport(replicas);
+  }
+
+  private BlockListAsLongs checkReport(Replica...replicas) {
+    Map<Long, Replica> expectedReplicas = new HashMap<>();
+    for (Replica replica : replicas) {
+      expectedReplicas.put(replica.getBlockId(), replica);
+    }
+    expectedReplicas = Collections.unmodifiableMap(expectedReplicas);
+
+    // encode the blocks and extract the buffers
+    BlockListAsLongs blocks =
+        BlockListAsLongs.encode(expectedReplicas.values());
+    List<ByteString> buffers = blocks.getBlocksBuffers();
+
+    // convert to old-style list of longs
+    List<Long> longs = new ArrayList<Long>();
+    for (long value : blocks.getBlockListAsLongs()) {
+      longs.add(value);
+    }
+
+    // decode the buffers and verify its contents
+    BlockListAsLongs decodedBlocks =
+        BlockListAsLongs.decodeBuffers(expectedReplicas.size(), buffers);
+    checkReplicas(expectedReplicas, decodedBlocks);
+
+    // decode the long and verify its contents
+    BlockListAsLongs decodedList = BlockListAsLongs.decodeLongs(longs);
+    checkReplicas(expectedReplicas, decodedList);
+    return blocks;
+  }
+
+  private void checkReplicas(Map<Long,Replica> expectedReplicas,
+      BlockListAsLongs decodedBlocks) {
+    assertEquals(expectedReplicas.size(), decodedBlocks.getNumberOfBlocks());
+
+    Map<Long, Replica> reportReplicas = new HashMap<>(expectedReplicas);
+    for (BlockReportReplica replica : decodedBlocks) {
+      assertNotNull(replica);
+      Replica expected = reportReplicas.remove(replica.getBlockId());
+      assertNotNull(expected);
+      assertEquals("wrong bytes",
+          expected.getNumBytes(), replica.getNumBytes());
+      assertEquals("wrong genstamp",
+          expected.getGenerationStamp(), replica.getGenerationStamp());
+      assertEquals("wrong replica state",
+          expected.getState(), replica.getState());
+    }
+    assertTrue(reportReplicas.isEmpty());
+  }
+
+  @Test
+  public void testDatanodeDetect() throws ServiceException, IOException {
+    final AtomicReference<BlockReportRequestProto> request =
+        new AtomicReference<>();
+
+    // just capture the outgoing PB
+    DatanodeProtocolPB mockProxy = mock(DatanodeProtocolPB.class);
+    doAnswer(new Answer<BlockReportResponseProto>() {
+      public BlockReportResponseProto answer(InvocationOnMock invocation) {
+        Object[] args = invocation.getArguments();
+        request.set((BlockReportRequestProto) args[1]);
+        return BlockReportResponseProto.newBuilder().build();
+      }
+    }).when(mockProxy).blockReport(any(RpcController.class),
+        any(BlockReportRequestProto.class));
+
+    @SuppressWarnings("resource")
+    DatanodeProtocolClientSideTranslatorPB nn =
+        new DatanodeProtocolClientSideTranslatorPB(mockProxy);
+
+    DatanodeRegistration reg = DFSTestUtil.getLocalDatanodeRegistration();
+    NamespaceInfo nsInfo = new NamespaceInfo(1, "cluster", "bp", 1);
+    reg.setNamespaceInfo(nsInfo);
+
+    Replica r = new FinalizedReplica(new Block(1, 2, 3), null, null);
+    BlockListAsLongs bbl = BlockListAsLongs.encode(Collections.singleton(r));
+    DatanodeStorage storage = new DatanodeStorage("s1");
+    StorageBlockReport[] sbr = { new StorageBlockReport(storage, bbl) };
+
+    // check DN sends new-style BR
+    request.set(null);
+    nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
+    nn.blockReport(reg, "pool", sbr);
+    BlockReportRequestProto proto = request.get();
+    assertNotNull(proto);
+    assertTrue(proto.getReports(0).getBlocksList().isEmpty());
+    assertFalse(proto.getReports(0).getBlocksBuffersList().isEmpty());
+
+    // back up to prior version and check DN sends old-style BR
+    request.set(null);
+    nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
+    nn.blockReport(reg, "pool", sbr);
+    proto = request.get();
+    assertNotNull(proto);
+    assertFalse(proto.getReports(0).getBlocksList().isEmpty());
+    assertTrue(proto.getReports(0).getBlocksBuffersList().isEmpty());
+  }
+}
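The asserted arrays in testEmptyReport, testFinalized, testUc, and testMix pin down the legacy long[] layout that the new encoding must still emit for old NameNodes: a two-long header of {finalized count, under-construction count}, then (id, length, genstamp) triplets for finalized replicas, a -1,-1,-1 delimiter, then (id, length, genstamp, state) quads for under-construction replicas. A small decoder sketch of that layout (illustrative only; the real decoding lives in BlockListAsLongs.decodeLongs):

class LegacyLongsSketch {
  // Walk the legacy long[] block report layout pinned down by the asserts
  // above: {#finalized, #uc}, finalized (id,len,gs) triplets, a -1,-1,-1
  // delimiter, then under-construction (id,len,gs,state) quads.
  static void walk(long[] longs) {
    int numFinalized = (int) longs[0];
    int numUc = (int) longs[1];
    int i = 2;
    for (int n = 0; n < numFinalized; n++, i += 3) {
      System.out.printf("finalized id=%d len=%d gs=%d%n",
          longs[i], longs[i + 1], longs[i + 2]);
    }
    i += 3;  // skip the -1,-1,-1 delimiter
    for (int n = 0; n < numUc; n++, i += 4) {
      System.out.printf("uc id=%d len=%d gs=%d state=%d%n",
          longs[i], longs[i + 1], longs[i + 2], longs[i + 3]);
    }
  }
}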
@@ -555,12 +555,12 @@ public class TestBlockManager {
     reset(node);
 
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        new BlockListAsLongs(null, null));
+        BlockListAsLongs.EMPTY);
     assertEquals(1, ds.getBlockReportCount());
     // send block report again, should NOT be processed
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        new BlockListAsLongs(null, null));
+        BlockListAsLongs.EMPTY);
     assertEquals(1, ds.getBlockReportCount());
 
     // re-register as if node restarted, should update existing node
@@ -571,7 +571,7 @@ public class TestBlockManager {
     // send block report, should be processed after restart
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        new BlockListAsLongs(null, null));
+        BlockListAsLongs.EMPTY);
     // Reinitialize as registration with empty storage list pruned
     // node.storageMap.
     ds = node.getStorageInfos()[0];
@@ -600,7 +600,7 @@ public class TestBlockManager {
     reset(node);
     doReturn(1).when(node).numBlocks();
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        new BlockListAsLongs(null, null));
+        BlockListAsLongs.EMPTY);
     assertEquals(1, ds.getBlockReportCount());
   }
 
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
@@ -146,22 +147,32 @@ public abstract class BlockReportTestBase {
 
       // Walk the list of blocks until we find one each to corrupt the
       // generation stamp and length, if so requested.
-      for (int i = 0; i < blockList.getNumberOfBlocks(); ++i) {
+      BlockListAsLongs.Builder builder = BlockListAsLongs.builder();
+      for (BlockReportReplica block : blockList) {
         if (corruptOneBlockGs && !corruptedGs) {
-          blockList.corruptBlockGSForTesting(i, rand);
-          LOG.info("Corrupted the GS for block ID " + i);
+          long gsOld = block.getGenerationStamp();
+          long gsNew;
+          do {
+            gsNew = rand.nextInt();
+          } while (gsNew == gsOld);
+          block.setGenerationStamp(gsNew);
+          LOG.info("Corrupted the GS for block ID " + block);
           corruptedGs = true;
         } else if (corruptOneBlockLen && !corruptedLen) {
-          blockList.corruptBlockLengthForTesting(i, rand);
-          LOG.info("Corrupted the length for block ID " + i);
+          long lenOld = block.getNumBytes();
+          long lenNew;
+          do {
+            lenNew = rand.nextInt((int)lenOld - 1);
+          } while (lenNew == lenOld);
+          block.setNumBytes(lenNew);
+          LOG.info("Corrupted the length for block ID " + block);
           corruptedLen = true;
-        } else {
-          break;
         }
+        builder.add(new BlockReportReplica(block));
       }
 
       reports[reportIndex++] =
-          new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
+          new StorageBlockReport(dnStorage, builder.build());
     }
 
     return reports;
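With the long[]-level corruptBlock*ForTesting mutators gone, the test now corrupts a report by iterating the decoded replicas, mutating the mutable BlockReportReplica objects in place, and re-encoding through the builder. The same decode-mutate-re-encode round trip works for any per-replica rewrite; a compact sketch (the genstamp bump is a stand-in for the corruption logic above, not the committed code):

import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;

class ReportRewriteSketch {
  // Decode a report, mutate each replica in place, and re-encode it.
  static BlockListAsLongs rewrite(BlockListAsLongs original) {
    BlockListAsLongs.Builder builder = BlockListAsLongs.builder();
    for (BlockReportReplica replica : original) {
      replica.setGenerationStamp(replica.getGenerationStamp() + 1);
      builder.add(new BlockReportReplica(replica));  // copy into new report
    }
    return builder.build();
  }
}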
@@ -51,7 +51,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -270,7 +269,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
     @Override
     public ReplicaState getState() {
-      return null;
+      return finalized ? ReplicaState.FINALIZED : ReplicaState.RBW;
     }
 
     @Override
@@ -528,7 +527,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   public synchronized void injectBlocks(String bpid,
-      Iterable<Block> injectBlocks) throws IOException {
+      Iterable<? extends Block> injectBlocks) throws IOException {
     ExtendedBlock blk = new ExtendedBlock();
     if (injectBlocks != null) {
       for (Block b: injectBlocks) { // if any blocks in list is bad, reject list
@@ -581,16 +580,16 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   synchronized BlockListAsLongs getBlockReport(String bpid) {
-    final List<Replica> blocks = new ArrayList<Replica>();
+    BlockListAsLongs.Builder report = BlockListAsLongs.builder();
     final Map<Block, BInfo> map = blockMap.get(bpid);
     if (map != null) {
       for (BInfo b : map.values()) {
        if (b.isFinalized()) {
-          blocks.add(b);
+          report.add(b);
         }
       }
     }
-    return new BlockListAsLongs(blocks, null);
+    return report.build();
   }
 
   @Override
@@ -107,17 +107,18 @@ public class TestBlockHasMultipleReplicasOnSameDN {
     StorageBlockReport reports[] =
         new StorageBlockReport[cluster.getStoragesPerDatanode()];
 
-    ArrayList<Block> blocks = new ArrayList<Block>();
+    ArrayList<Replica> blocks = new ArrayList<Replica>();
 
     for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
-      blocks.add(locatedBlock.getBlock().getLocalBlock());
+      Block localBlock = locatedBlock.getBlock().getLocalBlock();
+      blocks.add(new FinalizedReplica(localBlock, null, null));
     }
 
+    BlockListAsLongs bll = BlockListAsLongs.encode(blocks);
     for (int i = 0; i < cluster.getStoragesPerDatanode(); ++i) {
-      BlockListAsLongs bll = new BlockListAsLongs(blocks);
       FsVolumeSpi v = dn.getFSDataset().getVolumes().get(i);
       DatanodeStorage dns = new DatanodeStorage(v.getStorageID());
-      reports[i] = new StorageBlockReport(dns, bll.getBlockListAsLongs());
+      reports[i] = new StorageBlockReport(dns, bll);
     }
 
     // Should not assert!
@@ -23,11 +23,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -181,7 +179,7 @@ public class TestDataNodeVolumeFailure {
       DatanodeStorage dnStorage = kvPair.getKey();
       BlockListAsLongs blockList = kvPair.getValue();
       reports[reportIndex++] =
-          new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
+          new StorageBlockReport(dnStorage, blockList);
     }
 
     cluster.getNameNodeRpc().blockReport(dnR, bpid, reports);
@@ -98,7 +98,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     assertThat(reports.length, is(expectedReportsPerCall));
 
     for (StorageBlockReport report : reports) {
-      BlockListAsLongs blockList = new BlockListAsLongs(report.getBlocks());
+      BlockListAsLongs blockList = report.getBlocks();
       numBlocksReported += blockList.getNumberOfBlocks();
     }
   }
@@ -196,7 +196,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
     final Map<DatanodeStorage, BlockListAsLongs> result =
         new HashMap<DatanodeStorage, BlockListAsLongs>();
 
-    result.put(storage, new BlockListAsLongs(null, null));
+    result.put(storage, BlockListAsLongs.EMPTY);
     return result;
   }
 
@@ -26,6 +26,7 @@ import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -891,9 +893,9 @@ public class NNThroughputBenchmark implements Tool {
     NamespaceInfo nsInfo;
     DatanodeRegistration dnRegistration;
     DatanodeStorage storage; //only one storage
-    final ArrayList<Block> blocks;
+    final ArrayList<BlockReportReplica> blocks;
     int nrBlocks; // actual number of blocks
-    long[] blockReportList;
+    BlockListAsLongs blockReportList;
     final int dnIdx;
 
     private static int getNodePort(int num) throws IOException {
@@ -904,7 +906,7 @@ public class NNThroughputBenchmark implements Tool {
 
     TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
       this.dnIdx = dnIdx;
-      this.blocks = new ArrayList<Block>(blockCapacity);
+      this.blocks = new ArrayList<BlockReportReplica>(blockCapacity);
       this.nrBlocks = 0;
     }
 
@@ -934,8 +936,7 @@ public class NNThroughputBenchmark implements Tool {
       //first block reports
       storage = new DatanodeStorage(DatanodeStorage.generateUuid());
       final StorageBlockReport[] reports = {
-        new StorageBlockReport(storage,
-            new BlockListAsLongs(null, null).getBlockListAsLongs())
+        new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
       };
       nameNodeProto.blockReport(dnRegistration,
           nameNode.getNamesystem().getBlockPoolId(), reports);
@@ -968,19 +969,21 @@ public class NNThroughputBenchmark implements Tool {
        }
        return false;
      }
-      blocks.set(nrBlocks, blk);
+      blocks.set(nrBlocks, new BlockReportReplica(blk));
       nrBlocks++;
       return true;
     }
 
     void formBlockReport() {
       // fill remaining slots with blocks that do not exist
-      for(int idx = blocks.size()-1; idx >= nrBlocks; idx--)
-        blocks.set(idx, new Block(blocks.size() - idx, 0, 0));
-      blockReportList = new BlockListAsLongs(blocks).getBlockListAsLongs();
+      for (int idx = blocks.size()-1; idx >= nrBlocks; idx--) {
+        Block block = new Block(blocks.size() - idx, 0, 0);
+        blocks.set(idx, new BlockReportReplica(block));
+      }
+      blockReportList = BlockListAsLongs.EMPTY;
     }
 
-    long[] getBlockReportList() {
+    BlockListAsLongs getBlockReportList() {
       return blockReportList;
     }
 
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -104,7 +105,7 @@ public class TestDeadDatanode {
     // Ensure blockReport from dead datanode is rejected with IOException
     StorageBlockReport[] report = { new StorageBlockReport(
         new DatanodeStorage(reg.getDatanodeUuid()),
-        new long[] { 0L, 0L, 0L }) };
+        BlockListAsLongs.EMPTY) };
     try {
       dnp.blockReport(reg, poolId, report);
       fail("Expected IOException is not thrown");
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -219,6 +220,7 @@ public class TestFSImage {
         .manageDataDfsDirs(false)
         .manageNameDfsDirs(false)
         .waitSafeMode(false)
+        .startupOption(StartupOption.UPGRADE)
         .build();
     try {
       FileSystem fs = cluster.getFileSystem();
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.namenode.OfflineEditsViewerHelper;
 import org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer.Flags;
 import org.apache.hadoop.test.PathUtils;
@@ -140,8 +141,8 @@ public class TestOfflineEditsViewer {
     assertEquals(0, runOev(editsReparsed, editsParsedXml2, "xml", false));
 
     // judgment time
-    assertTrue("Test round trip",
-        filesEqualIgnoreTrailingZeros(editsParsedXml, editsParsedXml2));
+    assertTrue("Test round trip", FileUtils.contentEqualsIgnoreEOL(
+        new File(editsParsedXml), new File(editsParsedXml2), "UTF-8"));
 
     os.close();
   }
@@ -238,6 +239,10 @@ public class TestOfflineEditsViewer {
 
     ByteBuffer small = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameSmall));
     ByteBuffer large = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameLarge));
+    // OEV outputs with the latest layout version, so tweak the old file's
+    // contents to have latest version so checkedin binary files don't
+    // require frequent updates
+    small.put(3, (byte)NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
 
     // now correct if it's otherwise
     if (small.capacity() > large.capacity()) {
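The single-byte `small.put(3, ...)` tweak works because the edits file begins with the layout version as a 4-byte big-endian int, and NameNode layout versions are small negative numbers: every byte of the header except the last is 0xFF, so rewriting index 3 alone retargets the version. A sketch of that byte layout (the example version constant is illustrative):

class LayoutVersionByteSketch {
  public static void main(String[] args) {
    int version = -63;  // an example layout version, not the actual constant
    byte[] header = java.nio.ByteBuffer.allocate(4).putInt(version).array();
    // prints "ff ff ff c1": only the low-order byte (index 3) varies
    for (byte b : header) {
      System.out.printf("%02x ", b);
    }
  }
}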