HDFS-9466. TestShortCircuitCache#testDataXceiverCleansUpSlotsOnFailure is flaky (Wei-Chiu Chuang via cmccabe)
This commit is contained in:
parent
29d6cadc52
commit
c7921c9bdd
|
@ -394,12 +394,12 @@ public class ShortCircuitRegistry {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static interface Visitor {
|
public static interface Visitor {
|
||||||
void accept(HashMap<ShmId, RegisteredShm> segments,
|
boolean accept(HashMap<ShmId, RegisteredShm> segments,
|
||||||
HashMultimap<ExtendedBlockId, Slot> slots);
|
HashMultimap<ExtendedBlockId, Slot> slots);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public synchronized void visit(Visitor visitor) {
|
public synchronized boolean visit(Visitor visitor) {
|
||||||
visitor.accept(segments, slots);
|
return visitor.accept(segments, slots);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -624,16 +625,23 @@ public class TestShortCircuitCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
static private void checkNumberOfSegmentsAndSlots(final int expectedSegments,
|
static private void checkNumberOfSegmentsAndSlots(final int expectedSegments,
|
||||||
final int expectedSlots, ShortCircuitRegistry registry) {
|
final int expectedSlots, final ShortCircuitRegistry registry)
|
||||||
registry.visit(new ShortCircuitRegistry.Visitor() {
|
throws InterruptedException, TimeoutException {
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
@Override
|
@Override
|
||||||
public void accept(HashMap<ShmId, RegisteredShm> segments,
|
public Boolean get() {
|
||||||
|
return registry.visit(new ShortCircuitRegistry.Visitor() {
|
||||||
|
@Override
|
||||||
|
public boolean accept(HashMap<ShmId, RegisteredShm> segments,
|
||||||
HashMultimap<ExtendedBlockId, Slot> slots) {
|
HashMultimap<ExtendedBlockId, Slot> slots) {
|
||||||
Assert.assertEquals(expectedSegments, segments.size());
|
return (expectedSegments == segments.size()) &&
|
||||||
Assert.assertEquals(expectedSlots, slots.size());
|
(expectedSlots == slots.size());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
}, 100, 10000);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public static class TestCleanupFailureInjector
|
public static class TestCleanupFailureInjector
|
||||||
extends BlockReaderFactory.FailureInjector {
|
extends BlockReaderFactory.FailureInjector {
|
||||||
|
@ -774,16 +782,8 @@ public class TestShortCircuitCache {
|
||||||
DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2);
|
DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2);
|
||||||
DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
|
DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
|
||||||
DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
|
DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
|
||||||
ShortCircuitRegistry registry =
|
checkNumberOfSegmentsAndSlots(1, 2,
|
||||||
cluster.getDataNodes().get(0).getShortCircuitRegistry();
|
cluster.getDataNodes().get(0).getShortCircuitRegistry());
|
||||||
registry.visit(new ShortCircuitRegistry.Visitor() {
|
|
||||||
@Override
|
|
||||||
public void accept(HashMap<ShmId, RegisteredShm> segments,
|
|
||||||
HashMultimap<ExtendedBlockId, Slot> slots) {
|
|
||||||
Assert.assertEquals(1, segments.size());
|
|
||||||
Assert.assertEquals(2, slots.size());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
sockDir.close();
|
sockDir.close();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue