HADOOP-16192. Fix CallQueue backoff bugs: perform backoff when add() is used and update backoff when refreshed.
(cherry-picked from 8c95cb9d6b
)
This commit is contained in:
parent
4eb0497091
commit
0de8b55a09
|
@ -222,12 +222,21 @@ public class CallQueueManager<E extends Schedulable>
|
||||||
} else if (shouldBackOff(e)) {
|
} else if (shouldBackOff(e)) {
|
||||||
throwBackoff();
|
throwBackoff();
|
||||||
} else {
|
} else {
|
||||||
add(e);
|
// No need to re-check backoff criteria since they were just checked
|
||||||
|
addInternal(e, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean add(E e) {
|
public boolean add(E e) {
|
||||||
|
return addInternal(e, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
boolean addInternal(E e, boolean checkBackoff) {
|
||||||
|
if (checkBackoff && isClientBackoffEnabled() && shouldBackOff(e)) {
|
||||||
|
throwBackoff();
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
return putRef.get().add(e);
|
return putRef.get().add(e);
|
||||||
} catch (CallQueueOverflowException ex) {
|
} catch (CallQueueOverflowException ex) {
|
||||||
|
|
|
@ -666,6 +666,7 @@ public abstract class Server {
|
||||||
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
|
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
|
||||||
callQueue.swapQueue(getSchedulerClass(prefix, conf),
|
callQueue.swapQueue(getSchedulerClass(prefix, conf),
|
||||||
getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
|
getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
|
||||||
|
callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -434,5 +434,18 @@ public class TestCallQueueManager {
|
||||||
}
|
}
|
||||||
verify(queue, times(0)).put(call);
|
verify(queue, times(0)).put(call);
|
||||||
verify(queue, times(0)).add(call);
|
verify(queue, times(0)).add(call);
|
||||||
|
|
||||||
|
// backoff is enabled, add + scheduler backoff = overflow exception.
|
||||||
|
reset(queue);
|
||||||
|
cqm.setClientBackoffEnabled(true);
|
||||||
|
doReturn(Boolean.TRUE).when(cqm).shouldBackOff(call);
|
||||||
|
try {
|
||||||
|
cqm.add(call);
|
||||||
|
fail("didn't fail");
|
||||||
|
} catch (Exception ex) {
|
||||||
|
assertTrue(ex.toString(), ex instanceof CallQueueOverflowException);
|
||||||
|
}
|
||||||
|
verify(queue, times(0)).put(call);
|
||||||
|
verify(queue, times(0)).add(call);
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -52,7 +52,6 @@ import org.apache.hadoop.test.Whitebox;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.slf4j.event.Level;
|
import org.slf4j.event.Level;
|
||||||
|
@ -95,6 +94,8 @@ import static org.junit.Assert.assertNotSame;
|
||||||
import static org.junit.Assert.assertSame;
|
import static org.junit.Assert.assertSame;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.timeout;
|
import static org.mockito.Mockito.timeout;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
@ -1131,7 +1132,7 @@ public class TestRPC extends TestRpcBase {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
verify(spy, timeout(500).times(i + 1)).add(Mockito.<Call>anyObject());
|
verify(spy, timeout(500).times(i + 1)).addInternal(any(), eq(false));
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
proxy.sleep(null, newSleepRequest(100));
|
proxy.sleep(null, newSleepRequest(100));
|
||||||
|
@ -1202,7 +1203,7 @@ public class TestRPC extends TestRpcBase {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
verify(spy, timeout(500).times(i + 1)).add(Mockito.<Call>anyObject());
|
verify(spy, timeout(500).times(i + 1)).addInternal(any(), eq(false));
|
||||||
}
|
}
|
||||||
// Start another sleep RPC call and verify the call is backed off due to
|
// Start another sleep RPC call and verify the call is backed off due to
|
||||||
// avg response time(3s) exceeds threshold (2s).
|
// avg response time(3s) exceeds threshold (2s).
|
||||||
|
|
Loading…
Reference in New Issue