HADOOP-16192. Fix CallQueue backoff bugs: perform backoff when add() is used and update backoff when refreshed.

(cherry-picked from 8c95cb9d6b)
(cherry-picked from 0de8b55a09)
(cherry-picked from d4fbbc83ad)
This commit is contained in:
Erik Krogen 2019-03-14 10:12:58 -07:00
parent 5a8445789b
commit e172fc62ce
4 changed files with 28 additions and 4 deletions

View File

@ -221,12 +221,21 @@ public class CallQueueManager<E extends Schedulable>
} else if (shouldBackOff(e)) { } else if (shouldBackOff(e)) {
throwBackoff(); throwBackoff();
} else { } else {
add(e); // No need to re-check backoff criteria since they were just checked
addInternal(e, false);
} }
} }
@Override @Override
public boolean add(E e) { public boolean add(E e) {
return addInternal(e, true);
}
@VisibleForTesting
boolean addInternal(E e, boolean checkBackoff) {
if (checkBackoff && isClientBackoffEnabled() && shouldBackOff(e)) {
throwBackoff();
}
try { try {
return putRef.get().add(e); return putRef.get().add(e);
} catch (CallQueueOverflowException ex) { } catch (CallQueueOverflowException ex) {

View File

@ -662,6 +662,7 @@ public abstract class Server {
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
callQueue.swapQueue(getSchedulerClass(prefix, conf), callQueue.swapQueue(getSchedulerClass(prefix, conf),
getQueueClass(prefix, conf), maxQueueSize, prefix, conf); getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf));
} }
/** /**

View File

@ -434,5 +434,18 @@ public class TestCallQueueManager {
} }
verify(queue, times(0)).put(call); verify(queue, times(0)).put(call);
verify(queue, times(0)).add(call); verify(queue, times(0)).add(call);
// backoff is enabled, add + scheduler backoff = overflow exception.
reset(queue);
cqm.setClientBackoffEnabled(true);
doReturn(Boolean.TRUE).when(cqm).shouldBackOff(call);
try {
cqm.add(call);
fail("didn't fail");
} catch (Exception ex) {
assertTrue(ex.toString(), ex instanceof CallQueueOverflowException);
}
verify(queue, times(0)).put(call);
verify(queue, times(0)).add(call);
} }
} }

View File

@ -50,7 +50,6 @@ import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.MockitoUtil; import org.apache.hadoop.test.MockitoUtil;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox; import org.mockito.internal.util.reflection.Whitebox;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -96,6 +95,8 @@ import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -1123,7 +1124,7 @@ public class TestRPC extends TestRpcBase {
return null; return null;
} }
})); }));
verify(spy, timeout(500).times(i + 1)).add(Mockito.<Call>anyObject()); verify(spy, timeout(500).times(i + 1)).addInternal(any(), eq(false));
} }
try { try {
proxy.sleep(null, newSleepRequest(100)); proxy.sleep(null, newSleepRequest(100));
@ -1194,7 +1195,7 @@ public class TestRPC extends TestRpcBase {
return null; return null;
} }
})); }));
verify(spy, timeout(500).times(i + 1)).add(Mockito.<Call>anyObject()); verify(spy, timeout(500).times(i + 1)).addInternal(any(), eq(false));
} }
// Start another sleep RPC call and verify the call is backed off due to // Start another sleep RPC call and verify the call is backed off due to
// avg response time(3s) exceeds threshold (2s). // avg response time(3s) exceeds threshold (2s).