HADOOP-16192. Fix CallQueue backoff bugs: perform backoff when add() is used and update backoff when refreshed.
(cherry-picked from8c95cb9d6b
) (cherry-picked from0de8b55a09
) (cherry-picked fromd4fbbc83ad
) (cherry-picked frome172fc62ce
)
This commit is contained in:
parent
525f4e52f3
commit
c3973c9ae6
|
@ -221,12 +221,21 @@ public class CallQueueManager<E extends Schedulable>
|
|||
} else if (shouldBackOff(e)) {
|
||||
throwBackoff();
|
||||
} else {
|
||||
add(e);
|
||||
// No need to re-check backoff criteria since they were just checked
|
||||
addInternal(e, false);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean add(E e) {
|
||||
return addInternal(e, true);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean addInternal(E e, boolean checkBackoff) {
|
||||
if (checkBackoff && isClientBackoffEnabled() && shouldBackOff(e)) {
|
||||
throwBackoff();
|
||||
}
|
||||
try {
|
||||
return putRef.get().add(e);
|
||||
} catch (CallQueueOverflowException ex) {
|
||||
|
|
|
@ -661,6 +661,7 @@ public abstract class Server {
|
|||
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
|
||||
callQueue.swapQueue(getSchedulerClass(prefix, conf),
|
||||
getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
|
||||
callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -434,5 +434,18 @@ public class TestCallQueueManager {
|
|||
}
|
||||
verify(queue, times(0)).put(call);
|
||||
verify(queue, times(0)).add(call);
|
||||
|
||||
// backoff is enabled, add + scheduler backoff = overflow exception.
|
||||
reset(queue);
|
||||
cqm.setClientBackoffEnabled(true);
|
||||
doReturn(Boolean.TRUE).when(cqm).shouldBackOff(call);
|
||||
try {
|
||||
cqm.add(call);
|
||||
fail("didn't fail");
|
||||
} catch (Exception ex) {
|
||||
assertTrue(ex.toString(), ex instanceof CallQueueOverflowException);
|
||||
}
|
||||
verify(queue, times(0)).put(call);
|
||||
verify(queue, times(0)).add(call);
|
||||
}
|
||||
}
|
|
@ -1135,7 +1135,8 @@ public class TestRPC extends TestRpcBase {
|
|||
return null;
|
||||
}
|
||||
}));
|
||||
verify(spy, timeout(500).times(i + 1)).add(Mockito.<Call>anyObject());
|
||||
verify(spy, timeout(500).times(i + 1)).addInternal(
|
||||
Mockito.<Call>anyObject(), Mockito.eq(false));
|
||||
}
|
||||
try {
|
||||
proxy.sleep(null, newSleepRequest(100));
|
||||
|
@ -1206,7 +1207,8 @@ public class TestRPC extends TestRpcBase {
|
|||
return null;
|
||||
}
|
||||
}));
|
||||
verify(spy, timeout(500).times(i + 1)).add(Mockito.<Call>anyObject());
|
||||
verify(spy, timeout(500).times(i + 1)).addInternal(
|
||||
Mockito.<Call>anyObject(), Mockito.eq(false));
|
||||
}
|
||||
// Start another sleep RPC call and verify the call is backed off due to
|
||||
// avg response time(3s) exceeds threshold (2s).
|
||||
|
|
Loading…
Reference in New Issue