HDFS-3391. Fix InvalidateBlocks to compare blocks including their generation stamps. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1339898 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c86a4cbe93
commit
e83cf11d92
|
@ -73,6 +73,9 @@ Release 2.0.1-alpha - UNRELEASED
|
||||||
HDFS-2800. Fix cancellation of checkpoints in the standby node to be more
|
HDFS-2800. Fix cancellation of checkpoints in the standby node to be more
|
||||||
reliable. (todd)
|
reliable. (todd)
|
||||||
|
|
||||||
|
HDFS-3391. Fix InvalidateBlocks to compare blocks including their
|
||||||
|
generation stamps. (todd)
|
||||||
|
|
||||||
Release 2.0.0-alpha - UNRELEASED
|
Release 2.0.0-alpha - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -56,10 +55,23 @@ class InvalidateBlocks {
|
||||||
return numBlocks;
|
return numBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Does this contain the block which is associated with the storage? */
|
/**
|
||||||
|
* @return true if the given storage has the given block listed for
|
||||||
|
* invalidation. Blocks are compared including their generation stamps:
|
||||||
|
* if a block is pending invalidation but with a different generation stamp,
|
||||||
|
* returns false.
|
||||||
|
* @param storageID the storage to check
|
||||||
|
* @param the block to look for
|
||||||
|
*
|
||||||
|
*/
|
||||||
synchronized boolean contains(final String storageID, final Block block) {
|
synchronized boolean contains(final String storageID, final Block block) {
|
||||||
final Collection<Block> s = node2blocks.get(storageID);
|
final LightWeightHashSet<Block> s = node2blocks.get(storageID);
|
||||||
return s != null && s.contains(block);
|
if (s == null) {
|
||||||
|
return false; // no invalidate blocks for this storage ID
|
||||||
|
}
|
||||||
|
Block blockInSet = s.getElement(block);
|
||||||
|
return blockInSet != null &&
|
||||||
|
block.getGenerationStamp() == blockInSet.getGenerationStamp();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -592,7 +592,8 @@ class BPOfferService {
|
||||||
processDistributedUpgradeCommand((UpgradeCommand)cmd);
|
processDistributedUpgradeCommand((UpgradeCommand)cmd);
|
||||||
break;
|
break;
|
||||||
case DatanodeProtocol.DNA_RECOVERBLOCK:
|
case DatanodeProtocol.DNA_RECOVERBLOCK:
|
||||||
dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
|
String who = "NameNode at " + actor.getNNSocketAddress();
|
||||||
|
dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
|
||||||
break;
|
break;
|
||||||
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
|
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
|
||||||
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
|
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
|
||||||
|
|
|
@ -165,6 +165,7 @@ import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.VersionInfo;
|
import org.apache.hadoop.util.VersionInfo;
|
||||||
import org.mortbay.util.ajax.JSON;
|
import org.mortbay.util.ajax.JSON;
|
||||||
|
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -1710,13 +1711,16 @@ public class DataNode extends Configured
|
||||||
secureMain(args, null);
|
secureMain(args, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) {
|
public Daemon recoverBlocks(
|
||||||
|
final String who,
|
||||||
|
final Collection<RecoveringBlock> blocks) {
|
||||||
|
|
||||||
Daemon d = new Daemon(threadGroup, new Runnable() {
|
Daemon d = new Daemon(threadGroup, new Runnable() {
|
||||||
/** Recover a list of blocks. It is run by the primary datanode. */
|
/** Recover a list of blocks. It is run by the primary datanode. */
|
||||||
public void run() {
|
public void run() {
|
||||||
for(RecoveringBlock b : blocks) {
|
for(RecoveringBlock b : blocks) {
|
||||||
try {
|
try {
|
||||||
logRecoverBlock("NameNode", b.getBlock(), b.getLocations());
|
logRecoverBlock(who, b);
|
||||||
recoverBlock(b);
|
recoverBlock(b);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("recoverBlocks FAILED: " + b, e);
|
LOG.warn("recoverBlocks FAILED: " + b, e);
|
||||||
|
@ -1977,14 +1981,13 @@ public class DataNode extends Configured
|
||||||
datanodes, storages);
|
datanodes, storages);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void logRecoverBlock(String who,
|
private static void logRecoverBlock(String who, RecoveringBlock rb) {
|
||||||
ExtendedBlock block, DatanodeID[] targets) {
|
ExtendedBlock block = rb.getBlock();
|
||||||
StringBuilder msg = new StringBuilder(targets[0].toString());
|
DatanodeInfo[] targets = rb.getLocations();
|
||||||
for (int i = 1; i < targets.length; i++) {
|
|
||||||
msg.append(", " + targets[i]);
|
|
||||||
}
|
|
||||||
LOG.info(who + " calls recoverBlock(block=" + block
|
LOG.info(who + " calls recoverBlock(block=" + block
|
||||||
+ ", targets=[" + msg + "])");
|
+ ", targets=[" + Joiner.on(", ").join(targets) + "]"
|
||||||
|
+ ", newGenerationStamp=" + rb.getNewGenerationStamp() + ")");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // ClientDataNodeProtocol
|
@Override // ClientDataNodeProtocol
|
||||||
|
|
|
@ -174,31 +174,42 @@ public class LightWeightHashSet<T> implements Collection<T> {
|
||||||
* @return true if element present, false otherwise.
|
* @return true if element present, false otherwise.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
@Override
|
||||||
public boolean contains(final Object key) {
|
public boolean contains(final Object key) {
|
||||||
|
return getElement((T)key) != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the element in this set which is equal to
|
||||||
|
* the given key, if such an element exists.
|
||||||
|
* Otherwise returns null.
|
||||||
|
*/
|
||||||
|
public T getElement(final T key) {
|
||||||
// validate key
|
// validate key
|
||||||
if (key == null) {
|
if (key == null) {
|
||||||
throw new IllegalArgumentException("Null element is not supported.");
|
throw new IllegalArgumentException("Null element is not supported.");
|
||||||
}
|
}
|
||||||
// find element
|
// find element
|
||||||
final int hashCode = ((T)key).hashCode();
|
final int hashCode = key.hashCode();
|
||||||
final int index = getIndex(hashCode);
|
final int index = getIndex(hashCode);
|
||||||
return containsElem(index, (T) key, hashCode);
|
return getContainedElem(index, key, hashCode);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the set contains given element at given index.
|
* Check if the set contains given element at given index. If it
|
||||||
|
* does, return that element.
|
||||||
*
|
*
|
||||||
* @return true if element present, false otherwise.
|
* @return the element, or null, if no element matches
|
||||||
*/
|
*/
|
||||||
protected boolean containsElem(int index, final T key, int hashCode) {
|
protected T getContainedElem(int index, final T key, int hashCode) {
|
||||||
for (LinkedElement<T> e = entries[index]; e != null; e = e.next) {
|
for (LinkedElement<T> e = entries[index]; e != null; e = e.next) {
|
||||||
// element found
|
// element found
|
||||||
if (hashCode == e.hashCode && e.element.equals(key)) {
|
if (hashCode == e.hashCode && e.element.equals(key)) {
|
||||||
return true;
|
return e.element;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// element not found
|
// element not found
|
||||||
return false;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -241,7 +252,7 @@ public class LightWeightHashSet<T> implements Collection<T> {
|
||||||
final int hashCode = element.hashCode();
|
final int hashCode = element.hashCode();
|
||||||
final int index = getIndex(hashCode);
|
final int index = getIndex(hashCode);
|
||||||
// return false if already present
|
// return false if already present
|
||||||
if (containsElem(index, element, hashCode)) {
|
if (getContainedElem(index, element, hashCode) != null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -88,7 +88,7 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
|
||||||
final int hashCode = element.hashCode();
|
final int hashCode = element.hashCode();
|
||||||
final int index = getIndex(hashCode);
|
final int index = getIndex(hashCode);
|
||||||
// return false if already present
|
// return false if already present
|
||||||
if (containsElem(index, element, hashCode)) {
|
if (getContainedElem(index, element, hashCode) != null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -425,7 +425,7 @@ public class TestBlockRecovery {
|
||||||
DataNode spyDN = spy(dn);
|
DataNode spyDN = spy(dn);
|
||||||
doThrow(new RecoveryInProgressException("Replica recovery is in progress")).
|
doThrow(new RecoveryInProgressException("Replica recovery is in progress")).
|
||||||
when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
|
when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
|
||||||
Daemon d = spyDN.recoverBlocks(initRecoveringBlocks());
|
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
|
||||||
d.join();
|
d.join();
|
||||||
verify(spyDN, never()).syncBlock(
|
verify(spyDN, never()).syncBlock(
|
||||||
any(RecoveringBlock.class), anyListOf(BlockRecord.class));
|
any(RecoveringBlock.class), anyListOf(BlockRecord.class));
|
||||||
|
@ -445,7 +445,7 @@ public class TestBlockRecovery {
|
||||||
DataNode spyDN = spy(dn);
|
DataNode spyDN = spy(dn);
|
||||||
doThrow(new IOException()).
|
doThrow(new IOException()).
|
||||||
when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
|
when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
|
||||||
Daemon d = spyDN.recoverBlocks(initRecoveringBlocks());
|
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
|
||||||
d.join();
|
d.join();
|
||||||
verify(spyDN, never()).syncBlock(
|
verify(spyDN, never()).syncBlock(
|
||||||
any(RecoveringBlock.class), anyListOf(BlockRecord.class));
|
any(RecoveringBlock.class), anyListOf(BlockRecord.class));
|
||||||
|
@ -465,7 +465,7 @@ public class TestBlockRecovery {
|
||||||
doReturn(new ReplicaRecoveryInfo(block.getBlockId(), 0,
|
doReturn(new ReplicaRecoveryInfo(block.getBlockId(), 0,
|
||||||
block.getGenerationStamp(), ReplicaState.FINALIZED)).when(spyDN).
|
block.getGenerationStamp(), ReplicaState.FINALIZED)).when(spyDN).
|
||||||
initReplicaRecovery(any(RecoveringBlock.class));
|
initReplicaRecovery(any(RecoveringBlock.class));
|
||||||
Daemon d = spyDN.recoverBlocks(initRecoveringBlocks());
|
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
|
||||||
d.join();
|
d.join();
|
||||||
DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID);
|
DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID);
|
||||||
verify(dnP).commitBlockSynchronization(
|
verify(dnP).commitBlockSynchronization(
|
||||||
|
|
|
@ -421,5 +421,48 @@ public class TestLightWeightHashSet{
|
||||||
|
|
||||||
LOG.info("Test other - DONE");
|
LOG.info("Test other - DONE");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetElement() {
|
||||||
|
LightWeightHashSet<TestObject> objSet = new LightWeightHashSet<TestObject>();
|
||||||
|
TestObject objA = new TestObject("object A");
|
||||||
|
TestObject equalToObjA = new TestObject("object A");
|
||||||
|
TestObject objB = new TestObject("object B");
|
||||||
|
objSet.add(objA);
|
||||||
|
objSet.add(objB);
|
||||||
|
|
||||||
|
assertSame(objA, objSet.getElement(objA));
|
||||||
|
assertSame(objA, objSet.getElement(equalToObjA));
|
||||||
|
assertSame(objB, objSet.getElement(objB));
|
||||||
|
assertNull(objSet.getElement(new TestObject("not in set")));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrapper class which is used in
|
||||||
|
* {@link TestLightWeightHashSet#testGetElement()}
|
||||||
|
*/
|
||||||
|
private static class TestObject {
|
||||||
|
private final String value;
|
||||||
|
|
||||||
|
public TestObject(String value) {
|
||||||
|
super();
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return value.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (this == obj) return true;
|
||||||
|
if (obj == null) return false;
|
||||||
|
if (getClass() != obj.getClass())
|
||||||
|
return false;
|
||||||
|
TestObject other = (TestObject) obj;
|
||||||
|
return this.value.equals(other.value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue