HDFS-3391. Fix InvalidateBlocks to compare blocks including their generation stamps. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1339897 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
15ddb6634f
commit
5258d6bf3f
|
@ -220,6 +220,9 @@ Release 2.0.1-alpha - UNRELEASED
|
|||
HDFS-2800. Fix cancellation of checkpoints in the standby node to be more
|
||||
reliable. (todd)
|
||||
|
||||
HDFS-3391. Fix InvalidateBlocks to compare blocks including their
|
||||
generation stamps. (todd)
|
||||
|
||||
Release 2.0.0-alpha - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
|||
|
||||
import java.io.PrintWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
@ -54,10 +53,23 @@ class InvalidateBlocks {
|
|||
return numBlocks;
|
||||
}
|
||||
|
||||
/** Does this contain the block which is associated with the storage? */
|
||||
/**
|
||||
* @return true if the given storage has the given block listed for
|
||||
* invalidation. Blocks are compared including their generation stamps:
|
||||
* if a block is pending invalidation but with a different generation stamp,
|
||||
* returns false.
|
||||
* @param storageID the storage to check
|
||||
* @param the block to look for
|
||||
*
|
||||
*/
|
||||
synchronized boolean contains(final String storageID, final Block block) {
|
||||
final Collection<Block> s = node2blocks.get(storageID);
|
||||
return s != null && s.contains(block);
|
||||
final LightWeightHashSet<Block> s = node2blocks.get(storageID);
|
||||
if (s == null) {
|
||||
return false; // no invalidate blocks for this storage ID
|
||||
}
|
||||
Block blockInSet = s.getElement(block);
|
||||
return blockInSet != null &&
|
||||
block.getGenerationStamp() == blockInSet.getGenerationStamp();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -591,7 +591,8 @@ class BPOfferService {
|
|||
processDistributedUpgradeCommand((UpgradeCommand)cmd);
|
||||
break;
|
||||
case DatanodeProtocol.DNA_RECOVERBLOCK:
|
||||
dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
|
||||
String who = "NameNode at " + actor.getNNSocketAddress();
|
||||
dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
|
||||
break;
|
||||
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
|
||||
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
|
||||
|
|
|
@ -163,6 +163,7 @@ import org.apache.hadoop.util.StringUtils;
|
|||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
@ -1706,13 +1707,16 @@ public class DataNode extends Configured
|
|||
secureMain(args, null);
|
||||
}
|
||||
|
||||
public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) {
|
||||
public Daemon recoverBlocks(
|
||||
final String who,
|
||||
final Collection<RecoveringBlock> blocks) {
|
||||
|
||||
Daemon d = new Daemon(threadGroup, new Runnable() {
|
||||
/** Recover a list of blocks. It is run by the primary datanode. */
|
||||
public void run() {
|
||||
for(RecoveringBlock b : blocks) {
|
||||
try {
|
||||
logRecoverBlock("NameNode", b.getBlock(), b.getLocations());
|
||||
logRecoverBlock(who, b);
|
||||
recoverBlock(b);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("recoverBlocks FAILED: " + b, e);
|
||||
|
@ -1973,14 +1977,13 @@ public class DataNode extends Configured
|
|||
datanodes, storages);
|
||||
}
|
||||
|
||||
private static void logRecoverBlock(String who,
|
||||
ExtendedBlock block, DatanodeID[] targets) {
|
||||
StringBuilder msg = new StringBuilder(targets[0].toString());
|
||||
for (int i = 1; i < targets.length; i++) {
|
||||
msg.append(", " + targets[i]);
|
||||
}
|
||||
private static void logRecoverBlock(String who, RecoveringBlock rb) {
|
||||
ExtendedBlock block = rb.getBlock();
|
||||
DatanodeInfo[] targets = rb.getLocations();
|
||||
|
||||
LOG.info(who + " calls recoverBlock(block=" + block
|
||||
+ ", targets=[" + msg + "])");
|
||||
+ ", targets=[" + Joiner.on(", ").join(targets) + "]"
|
||||
+ ", newGenerationStamp=" + rb.getNewGenerationStamp() + ")");
|
||||
}
|
||||
|
||||
@Override // ClientDataNodeProtocol
|
||||
|
|
|
@ -173,31 +173,42 @@ public class LightWeightHashSet<T> implements Collection<T> {
|
|||
* @return true if element present, false otherwise.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public boolean contains(final Object key) {
|
||||
return getElement((T)key) != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the element in this set which is equal to
|
||||
* the given key, if such an element exists.
|
||||
* Otherwise returns null.
|
||||
*/
|
||||
public T getElement(final T key) {
|
||||
// validate key
|
||||
if (key == null) {
|
||||
throw new IllegalArgumentException("Null element is not supported.");
|
||||
}
|
||||
// find element
|
||||
final int hashCode = ((T)key).hashCode();
|
||||
final int hashCode = key.hashCode();
|
||||
final int index = getIndex(hashCode);
|
||||
return containsElem(index, (T) key, hashCode);
|
||||
return getContainedElem(index, key, hashCode);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the set contains given element at given index.
|
||||
* Check if the set contains given element at given index. If it
|
||||
* does, return that element.
|
||||
*
|
||||
* @return true if element present, false otherwise.
|
||||
* @return the element, or null, if no element matches
|
||||
*/
|
||||
protected boolean containsElem(int index, final T key, int hashCode) {
|
||||
protected T getContainedElem(int index, final T key, int hashCode) {
|
||||
for (LinkedElement<T> e = entries[index]; e != null; e = e.next) {
|
||||
// element found
|
||||
if (hashCode == e.hashCode && e.element.equals(key)) {
|
||||
return true;
|
||||
return e.element;
|
||||
}
|
||||
}
|
||||
// element not found
|
||||
return false;
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -240,7 +251,7 @@ public class LightWeightHashSet<T> implements Collection<T> {
|
|||
final int hashCode = element.hashCode();
|
||||
final int index = getIndex(hashCode);
|
||||
// return false if already present
|
||||
if (containsElem(index, element, hashCode)) {
|
||||
if (getContainedElem(index, element, hashCode) != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -88,7 +88,7 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
|
|||
final int hashCode = element.hashCode();
|
||||
final int index = getIndex(hashCode);
|
||||
// return false if already present
|
||||
if (containsElem(index, element, hashCode)) {
|
||||
if (getContainedElem(index, element, hashCode) != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -425,7 +425,7 @@ public class TestBlockRecovery {
|
|||
DataNode spyDN = spy(dn);
|
||||
doThrow(new RecoveryInProgressException("Replica recovery is in progress")).
|
||||
when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
|
||||
Daemon d = spyDN.recoverBlocks(initRecoveringBlocks());
|
||||
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
|
||||
d.join();
|
||||
verify(spyDN, never()).syncBlock(
|
||||
any(RecoveringBlock.class), anyListOf(BlockRecord.class));
|
||||
|
@ -445,7 +445,7 @@ public class TestBlockRecovery {
|
|||
DataNode spyDN = spy(dn);
|
||||
doThrow(new IOException()).
|
||||
when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
|
||||
Daemon d = spyDN.recoverBlocks(initRecoveringBlocks());
|
||||
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
|
||||
d.join();
|
||||
verify(spyDN, never()).syncBlock(
|
||||
any(RecoveringBlock.class), anyListOf(BlockRecord.class));
|
||||
|
@ -465,7 +465,7 @@ public class TestBlockRecovery {
|
|||
doReturn(new ReplicaRecoveryInfo(block.getBlockId(), 0,
|
||||
block.getGenerationStamp(), ReplicaState.FINALIZED)).when(spyDN).
|
||||
initReplicaRecovery(any(RecoveringBlock.class));
|
||||
Daemon d = spyDN.recoverBlocks(initRecoveringBlocks());
|
||||
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
|
||||
d.join();
|
||||
DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID);
|
||||
verify(dnP).commitBlockSynchronization(
|
||||
|
|
|
@ -422,4 +422,47 @@ public class TestLightWeightHashSet{
|
|||
LOG.info("Test other - DONE");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetElement() {
|
||||
LightWeightHashSet<TestObject> objSet = new LightWeightHashSet<TestObject>();
|
||||
TestObject objA = new TestObject("object A");
|
||||
TestObject equalToObjA = new TestObject("object A");
|
||||
TestObject objB = new TestObject("object B");
|
||||
objSet.add(objA);
|
||||
objSet.add(objB);
|
||||
|
||||
assertSame(objA, objSet.getElement(objA));
|
||||
assertSame(objA, objSet.getElement(equalToObjA));
|
||||
assertSame(objB, objSet.getElement(objB));
|
||||
assertNull(objSet.getElement(new TestObject("not in set")));
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper class which is used in
|
||||
* {@link TestLightWeightHashSet#testGetElement()}
|
||||
*/
|
||||
private static class TestObject {
|
||||
private final String value;
|
||||
|
||||
public TestObject(String value) {
|
||||
super();
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return value.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) return true;
|
||||
if (obj == null) return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
TestObject other = (TestObject) obj;
|
||||
return this.value.equals(other.value);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue