HBASE-3899. Add ability for delayed RPC calls to set return value immediately at call return.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1154366 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-08-05 20:05:09 +00:00
parent 143acb2d9d
commit d8b463d176
4 changed files with 166 additions and 21 deletions

View File

@ -358,6 +358,8 @@ Release 0.91.0 - Unreleased
HBASE-3810 Registering a coprocessor in HTableDescriptor should be easier
(Mingjie Lai via garyh)
HBASE-4158 Upgrade pom.xml to surefire 2.9 (Aaron Kushner & Mikhail)
HBASE-3899 Add ability for delayed RPC calls to set return value
immediately at call return. (Vlad Dogaru via todd)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
* A call whose response can be delayed by the server.
*/
@ -30,8 +28,12 @@ public interface Delayable {
/**
* Signal that the call response should be delayed, thus freeing the RPC
* server to handle different requests.
*
* @param delayReturnValue Controls whether the return value of the call
* should be set when ending the delay or right away. There are cases when
* the return value can be set right away, even if the call is delayed.
*/
public void startDelay();
public void startDelay(boolean delayReturnValue);
/**
* @return is the call delayed?
@ -39,10 +41,31 @@ public interface Delayable {
public boolean isDelayed();
/**
* Signal that the response to the call is ready and the RPC server is now
* allowed to send the response.
* @param result The result to return to the caller.
* @return is the return value delayed?
*/
public boolean isReturnValueDelayed();
/**
* Signal that the RPC server is now allowed to send the response.
* @param result The value to return to the caller. If the corresponding
* {@link #delayResponse(boolean)} specified that the return value should
* not be delayed, this parameter must be null.
* @throws IOException
*/
public void endDelay(Object result) throws IOException;
/**
* Signal the end of a delayed RPC, without specifying the return value. Use
* this only if the return value was not delayed
* @throws IOException
*/
public void endDelay() throws IOException;
/**
* End the call, throwing and exception to the caller. This works regardless
* of the return value being delayed.
* @param t Object to throw to the client.
* @throws IOException
*/
public void endDelayThrowing(Throwable t) throws IOException;
}

View File

@ -256,6 +256,9 @@ public abstract class HBaseServer implements RpcServer {
protected ByteBuffer response; // the response for this call
protected boolean delayResponse;
protected Responder responder;
protected boolean delayReturnValue; // if the return value should be
// set at call completion
protected boolean isError;
public Call(int id, Writable param, Connection connection,
Responder responder) {
@ -266,6 +269,7 @@ public abstract class HBaseServer implements RpcServer {
this.response = null;
this.delayResponse = false;
this.responder = responder;
this.isError = false;
}
@Override
@ -275,6 +279,14 @@ public abstract class HBaseServer implements RpcServer {
private synchronized void setResponse(Object value, String errorClass,
String error) {
// Avoid overwriting an error value in the response. This can happen if
// endDelayThrowing is called by another thread before the actual call
// returning.
if (this.isError)
return;
if (errorClass != null) {
this.isError = true;
}
Writable result = null;
if (value instanceof Writable) {
result = (Writable) value;
@ -334,16 +346,24 @@ public abstract class HBaseServer implements RpcServer {
@Override
public synchronized void endDelay(Object result) throws IOException {
assert this.delayResponse;
assert this.delayReturnValue || result == null;
this.delayResponse = false;
delayedCalls.decrementAndGet();
this.setResponse(result, null, null);
if (this.delayReturnValue)
this.setResponse(result, null, null);
this.responder.doRespond(this);
}
@Override
public synchronized void startDelay() {
public synchronized void endDelay() throws IOException {
this.endDelay(null);
}
@Override
public synchronized void startDelay(boolean delayReturnValue) {
assert !this.delayResponse;
this.delayResponse = true;
this.delayReturnValue = delayReturnValue;
int numDelayed = delayedCalls.incrementAndGet();
if (numDelayed > warnDelayedCalls) {
LOG.warn("Too many delayed calls: limit " + warnDelayedCalls +
@ -351,11 +371,24 @@ public abstract class HBaseServer implements RpcServer {
}
}
@Override
public synchronized void endDelayThrowing(Throwable t) throws IOException {
this.setResponse(null, t.getClass().toString(),
StringUtils.stringifyException(t));
this.delayResponse = false;
this.sendResponseIfReady();
}
@Override
public synchronized boolean isDelayed() {
return this.delayResponse;
}
@Override
public synchronized boolean isReturnValueDelayed() {
return this.delayReturnValue;
}
/**
* If we have a response, and delay is not set, then respond
* immediately. Otherwise, do not respond to client. This is
@ -1194,7 +1227,9 @@ public abstract class HBaseServer implements RpcServer {
}
CurCall.set(null);
if (!call.isDelayed()) {
// Set the response for undelayed calls and delayed calls with
// undelayed responses.
if (!call.isDelayed() || !call.isReturnValueDelayed()) {
call.setResponse(value, errorClass, error);
}
call.sendResponseIfReady();

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -48,17 +49,26 @@ public class TestDelayedRpc {
public static final int DELAYED = 1;
@Test
public void testDelayedRpc() throws Exception {
public void testDelayedRpcImmediateReturnValue() throws Exception {
testDelayedRpc(false);
}
@Test
public void testDelayedRpcDelayedReturnValue() throws Exception {
testDelayedRpc(true);
}
private void testDelayedRpc(boolean delayReturnValue) throws Exception {
Configuration conf = HBaseConfiguration.create();
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
rpcServer = HBaseRPC.getServer(new TestRpcImpl(),
rpcServer = HBaseRPC.getServer(new TestRpcImpl(delayReturnValue),
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
rpcServer.getListenerAddress(), conf, 400);
rpcServer.getListenerAddress(), conf, 1000);
List<Integer> results = new ArrayList<Integer>();
@ -77,7 +87,8 @@ public class TestDelayedRpc {
assertEquals(results.get(0).intValue(), UNDELAYED);
assertEquals(results.get(1).intValue(), UNDELAYED);
assertEquals(results.get(2).intValue(), DELAYED);
assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :
0xDEADBEEF);
}
private static class ListAppender extends AppenderSkeleton {
@ -113,7 +124,7 @@ public class TestDelayedRpc {
log.addAppender(listAppender);
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
rpcServer = HBaseRPC.getServer(new TestRpcImpl(),
rpcServer = HBaseRPC.getServer(new TestRpcImpl(true),
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
@ -150,26 +161,41 @@ public class TestDelayedRpc {
}
private static class TestRpcImpl implements TestRpc {
/**
* Should the return value of delayed call be set at the end of the delay
* or at call return.
*/
private boolean delayReturnValue;
/**
* @param delayReturnValue Should the response to the delayed call be set
* at the start or the end of the delay.
* @param delay Amount of milliseconds to delay the call by
*/
public TestRpcImpl(boolean delayReturnValue) {
this.delayReturnValue = delayReturnValue;
}
@Override
public int test(boolean delay) {
public int test(final boolean delay) {
if (!delay) {
return UNDELAYED;
}
final Delayable call = rpcServer.getCurrentCall();
call.startDelay();
call.startDelay(delayReturnValue);
new Thread() {
public void run() {
try {
Thread.sleep(500);
call.endDelay(DELAYED);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
call.endDelay(delayReturnValue ? DELAYED : null);
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
return 0xDEADBEEF; // this return value should not go back to client
// This value should go back to client only if the response is set
// immediately at delay time.
return 0xDEADBEEF;
}
@Override
@ -199,4 +225,63 @@ public class TestDelayedRpc {
}
}
}
@Test
public void testEndDelayThrowing() throws IOException {
Configuration conf = HBaseConfiguration.create();
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
rpcServer = HBaseRPC.getServer(new FaultyTestRpc(),
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
rpcServer.getListenerAddress(), conf, 1000);
int result = 0xDEADBEEF;
try {
result = client.test(false);
} catch (Exception e) {
fail("No exception should have been thrown.");
}
assertEquals(result, UNDELAYED);
boolean caughtException = false;
try {
result = client.test(true);
} catch(Exception e) {
// Exception thrown by server is enclosed in a RemoteException.
if (e.getCause().getMessage().startsWith(
"java.lang.Exception: Something went wrong"))
caughtException = true;
}
assertTrue(caughtException);
}
/**
* Delayed calls to this class throw an exception.
*/
private static class FaultyTestRpc implements TestRpc {
@Override
public int test(boolean delay) {
if (!delay)
return UNDELAYED;
Delayable call = rpcServer.getCurrentCall();
call.startDelay(true);
try {
call.endDelayThrowing(new Exception("Something went wrong"));
} catch (IOException e) {
e.printStackTrace();
}
// Client will receive the Exception, not this value.
return DELAYED;
}
@Override
public long getProtocolVersion(String arg0, long arg1) throws IOException {
return 0;
}
}
}