HDFS-15204. TestRetryCacheWithHA testRemoveCacheDescriptor fails intermittently. Contributed by Ahmed Hussein.
(cherry picked from commit 1d4d0fcbe1)

parent 369f4f9c58
commit 1d89ba72f6

@@ -34,8 +34,10 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.hadoop.test.GenericTestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -1296,7 +1298,7 @@ public class TestRetryCacheWithHA {
    */
   public void testClientRetryWithFailover(final AtMostOnceOp op)
       throws Exception {
-    final Map<String, Object> results = new HashMap<String, Object>();
+    final Map<String, Object> results = new ConcurrentHashMap<>();
 
     op.prepare();
     // set DummyRetryInvocationHandler#block to true
@@ -1309,10 +1311,7 @@ public class TestRetryCacheWithHA {
           op.invoke();
           Object result = op.getResult();
           LOG.info("Operation " + op.name + " finished");
-          synchronized (TestRetryCacheWithHA.this) {
-            results.put(op.name, result == null ? "SUCCESS" : result);
-            TestRetryCacheWithHA.this.notifyAll();
-          }
+          results.put(op.name, result == null ? "SUCCESS" : result);
         } catch (Exception e) {
           LOG.info("Got Exception while calling " + op.name, e);
         } finally {
@@ -1333,39 +1332,47 @@ public class TestRetryCacheWithHA {
     LOG.info("Setting block to false");
     DummyRetryInvocationHandler.block.set(false);
 
-    synchronized (this) {
-      while (!results.containsKey(op.name)) {
-        this.wait();
-      }
-      LOG.info("Got the result of " + op.name + ": "
-          + results.get(op.name));
-    }
+    GenericTestUtils.waitFor(() -> results.containsKey(op.name), 5, 10000);
+    LOG.info("Got the result of " + op.name + ": "
+        + results.get(op.name));
 
     // Waiting for failover.
-    while (cluster.getNamesystem(1).isInStandbyState()) {
-      Thread.sleep(10);
-    }
+    GenericTestUtils
+        .waitFor(() -> !cluster.getNamesystem(1).isInStandbyState(), 5, 10000);
 
-    long hitNN0 = cluster.getNamesystem(0).getRetryCache().getMetricsForTests()
-        .getCacheHit();
-    long hitNN1 = cluster.getNamesystem(1).getRetryCache().getMetricsForTests()
-        .getCacheHit();
-    assertTrue("CacheHit: " + hitNN0 + ", " + hitNN1,
-        hitNN0 + hitNN1 > 0);
-    long updatedNN0 = cluster.getNamesystem(0).getRetryCache()
-        .getMetricsForTests().getCacheUpdated();
-    long updatedNN1 = cluster.getNamesystem(1).getRetryCache()
-        .getMetricsForTests().getCacheUpdated();
+    final long[] hitsNN = new long[]{0, 0};
+    GenericTestUtils.waitFor(() -> {
+      hitsNN[0] = cluster.getNamesystem(0).getRetryCache()
+          .getMetricsForTests()
+          .getCacheHit();
+      hitsNN[1] = cluster.getNamesystem(1).getRetryCache()
+          .getMetricsForTests()
+          .getCacheHit();
+      return (hitsNN[0] + hitsNN[1]) > 0;
+    }, 5, 10000);
+
+    assertTrue("CacheHit: " + hitsNN[0] + ", " + hitsNN[1],
+        hitsNN[0] + hitsNN[1] > 0);
+    final long[] updatesNN = new long[]{0, 0};
+    GenericTestUtils.waitFor(() -> {
+      updatesNN[0] = cluster.getNamesystem(0).getRetryCache()
+          .getMetricsForTests()
+          .getCacheUpdated();
+      updatesNN[1] = cluster.getNamesystem(1).getRetryCache()
+          .getMetricsForTests()
+          .getCacheUpdated();
+      return updatesNN[0] > 0 && updatesNN[1] > 0;
+    }, 5, 10000);
     // Cache updated metrics on NN0 should be >0 since the op was process on NN0
-    assertTrue("CacheUpdated on NN0: " + updatedNN0, updatedNN0 > 0);
+    assertTrue("CacheUpdated on NN0: " + updatesNN[0], updatesNN[0] > 0);
     // Cache updated metrics on NN0 should be >0 since NN1 applied the editlog
-    assertTrue("CacheUpdated on NN1: " + updatedNN1, updatedNN1 > 0);
+    assertTrue("CacheUpdated on NN1: " + updatesNN[1], updatesNN[1] > 0);
     long expectedUpdateCount = op.getExpectedCacheUpdateCount();
     if (expectedUpdateCount > 0) {
-      assertEquals("CacheUpdated on NN0: " + updatedNN0, expectedUpdateCount,
-          updatedNN0);
-      assertEquals("CacheUpdated on NN0: " + updatedNN1, expectedUpdateCount,
-          updatedNN1);
+      assertEquals("CacheUpdated on NN0: " + updatesNN[0], expectedUpdateCount,
+          updatesNN[0]);
+      assertEquals("CacheUpdated on NN0: " + updatesNN[1], expectedUpdateCount,
+          updatesNN[1]);
     }
   }
 
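The change removes the synchronized/wait/notifyAll handshake and the plain HashMap, and instead publishes results through a ConcurrentHashMap while the waiter polls with GenericTestUtils.waitFor. Below is a minimal, self-contained sketch of that pattern, not the test itself: the class WaitForSketch, the worker thread, and the key "exampleOp" are invented for illustration; only GenericTestUtils.waitFor(check, checkEveryMillis, waitForMillis) and ConcurrentHashMap come from the patch. It assumes the hadoop-common test artifact is on the classpath.

// Illustrative sketch only; names below are not from TestRetryCacheWithHA.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.test.GenericTestUtils;

public class WaitForSketch {

  public static void main(String[] args) throws Exception {
    // ConcurrentHashMap lets the worker publish its result safely without
    // the synchronized/notifyAll handshake the old test relied on.
    final Map<String, Object> results = new ConcurrentHashMap<>();
    final String opName = "exampleOp"; // hypothetical operation name

    Thread worker = new Thread(() -> {
      try {
        Thread.sleep(200); // stand-in for the RPC the real test issues
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      results.put(opName, "SUCCESS");
    });
    worker.start();

    // Poll every 5 ms, for at most 10 s, until the worker has published a
    // result; waitFor throws a TimeoutException if the condition never
    // becomes true, so a hung operation fails instead of blocking forever.
    GenericTestUtils.waitFor(() -> results.containsKey(opName), 5, 10000);

    System.out.println("Got the result of " + opName + ": "
        + results.get(opName));
    worker.join();
  }
}

Polling with a bounded timeout removes the missed-notification race of wait/notifyAll (a notification fired before the waiter starts waiting is simply picked up on the next poll), which is why the same waitFor pattern is also applied to the failover check and the retry-cache metric assertions in the diff above.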