HBASE-3899 enhance HBase RPC to support free-ing up server handler threads even if response is not ready

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1151227 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-07-26 20:30:57 +00:00
parent 2349ab2491
commit 1f9315ff09
5 changed files with 412 additions and 56 deletions

View File

@ -332,6 +332,8 @@ Release 0.91.0 - Unreleased
HBASE-4081 Issues with HRegion.compactStores methods (Ming Ma)
HBASE-3465 Hbase should use a HADOOP_HOME environment variable if available
(Alejandro Abdelnur)
HBASE-3899 enhance HBase RPC to support free-ing up server handler threads
even if response is not ready (Vlad Dogaru)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -0,0 +1,48 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
* A call whose response can be delayed by the server.
*/
public interface Delayable {
/**
* Signal that the call response should be delayed, thus freeing the RPC
* server to handle different requests.
*/
public void startDelay();
/**
* @return is the call delayed?
*/
public boolean isDelayed();
/**
* Signal that the response to the call is ready and the RPC server is now
* allowed to send the response.
* @param result The result to return to the caller.
* @throws IOException
*/
public void endDelay(Object result) throws IOException;
}

View File

@ -47,6 +47,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@ -56,6 +57,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.io.WritableWithSize;
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
import org.apache.hadoop.hbase.util.Bytes;
@ -97,8 +99,19 @@ public abstract class HBaseServer implements RpcServer {
/** Default value for above param */
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
static final int BUFFER_INITIAL_SIZE = 1024;
private final int warnResponseSize;
private static final String WARN_DELAYED_CALLS =
"hbase.ipc.warn.delayedrpc.number";
private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;
private final int warnDelayedCalls;
private AtomicInteger delayedCalls;
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
@ -234,20 +247,25 @@ public abstract class HBaseServer implements RpcServer {
}
/** A call queued for handling. */
private static class Call {
private class Call implements Delayable {
protected int id; // the client's call id
protected Writable param; // the parameter passed
protected Connection connection; // connection to client
protected long timestamp; // the time received when response is null
// the time served when response is not null
protected ByteBuffer response; // the response for this call
protected boolean delayResponse;
protected Responder responder;
public Call(int id, Writable param, Connection connection) {
public Call(int id, Writable param, Connection connection,
Responder responder) {
this.id = id;
this.param = param;
this.connection = connection;
this.timestamp = System.currentTimeMillis();
this.response = null;
this.delayResponse = false;
this.responder = responder;
}
@Override
@ -255,8 +273,98 @@ public abstract class HBaseServer implements RpcServer {
return param.toString() + " from " + connection.toString();
}
public void setResponse(ByteBuffer response) {
this.response = response;
private synchronized void setResponse(Object value, String errorClass,
String error) {
Writable result = null;
if (value instanceof Writable) {
result = (Writable) value;
} else {
/* We might have a null value and errors. Avoid creating a
* HbaseObjectWritable, because the constructor fails on null. */
if (value != null) {
result = new HbaseObjectWritable(value);
}
}
int size = BUFFER_INITIAL_SIZE;
if (result instanceof WritableWithSize) {
// get the size hint.
WritableWithSize ohint = (WritableWithSize) result;
long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
if (hint > Integer.MAX_VALUE) {
// oops, new problem.
IOException ioe =
new IOException("Result buffer size too large: " + hint);
errorClass = ioe.getClass().getName();
error = StringUtils.stringifyException(ioe);
} else {
size = (int)hint;
}
}
ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
DataOutputStream out = new DataOutputStream(buf);
try {
out.writeInt(this.id); // write call id
out.writeBoolean(error != null); // write error flag
} catch (IOException e) {
errorClass = e.getClass().getName();
error = StringUtils.stringifyException(e);
}
try {
if (error == null) {
result.write(out);
} else {
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
} catch (IOException e) {
LOG.warn("Error sending response to call: ", e);
}
if (buf.size() > warnResponseSize) {
LOG.warn("responseTooLarge for: "+this+": Size: "
+ StringUtils.humanReadableInt(buf.size()));
}
this.response = buf.getByteBuffer();
}
@Override
public synchronized void endDelay(Object result) throws IOException {
assert this.delayResponse;
this.delayResponse = false;
delayedCalls.decrementAndGet();
this.setResponse(result, null, null);
this.responder.doRespond(this);
}
@Override
public synchronized void startDelay() {
assert !this.delayResponse;
this.delayResponse = true;
int numDelayed = delayedCalls.incrementAndGet();
if (numDelayed > warnDelayedCalls) {
LOG.warn("Too many delayed calls: limit " + warnDelayedCalls +
" current " + numDelayed);
}
}
@Override
public synchronized boolean isDelayed() {
return this.delayResponse;
}
/**
* If we have a response, and delay is not set, then respond
* immediately. Otherwise, do not respond to client. This is
* called the by the RPC code in the context of the Handler thread.
*/
public synchronized void sendResponseIfReady() throws IOException {
if (!this.delayResponse) {
this.responder.doRespond(this);
}
}
}
@ -767,19 +875,8 @@ public abstract class HBaseServer implements RpcServer {
if (inHandler) {
// set the serve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();
incPending();
try {
// Wakeup the thread blocked on select, only then can the call
// to channel.register() complete.
writeSelector.wakeup();
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
//Its ok. channel might be closed else where.
if (enqueueInSelector(call))
done = true;
} finally {
decPending();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": responding to #" + call.id + " from " +
@ -799,16 +896,44 @@ public abstract class HBaseServer implements RpcServer {
return done;
}
//
// Enqueue for background thread to send responses out later.
//
private boolean enqueueInSelector(Call call) throws IOException {
boolean done = false;
incPending();
try {
// Wake up the thread blocked on select, only then can the call
// to channel.register() complete.
SocketChannel channel = call.connection.channel;
writeSelector.wakeup();
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
//It's OK. Channel might be closed else where.
done = true;
} finally {
decPending();
}
return done;
}
//
// Enqueue a response from the application.
//
void doRespond(Call call) throws IOException {
// set the serve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();
boolean doRegister = false;
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1) {
processResponse(call.connection.responseQueue, true);
doRegister = !processResponse(call.connection.responseQueue, false);
}
}
if (doRegister) {
enqueueInSelector(call);
}
}
private synchronized void incPending() { // call waiting to be enqueued.
@ -1003,7 +1128,7 @@ public abstract class HBaseServer implements RpcServer {
Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param
param.readFields(dis);
Call call = new Call(id, param, this);
Call call = new Call(id, param, this, responder);
if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
priorityCallQueue.put(call);
@ -1028,7 +1153,6 @@ public abstract class HBaseServer implements RpcServer {
/** Handles queued calls . */
private class Handler extends Thread {
private final BlockingQueue<Call> myCallQueue;
static final int BUFFER_INITIAL_SIZE = 1024;
public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
this.myCallQueue = cq;
@ -1070,43 +1194,10 @@ public abstract class HBaseServer implements RpcServer {
}
CurCall.set(null);
int size = BUFFER_INITIAL_SIZE;
if (value instanceof WritableWithSize) {
// get the size hint.
WritableWithSize ohint = (WritableWithSize)value;
long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
if (hint > 0) {
if ((hint) > Integer.MAX_VALUE) {
// oops, new problem.
IOException ioe =
new IOException("Result buffer size too large: " + hint);
errorClass = ioe.getClass().getName();
error = StringUtils.stringifyException(ioe);
} else {
size = (int)hint;
}
}
if (!call.isDelayed()) {
call.setResponse(value, errorClass, error);
}
ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
DataOutputStream out = new DataOutputStream(buf);
out.writeInt(call.id); // write call id
out.writeBoolean(error != null); // write error flag
if (error == null) {
value.write(out);
} else {
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
if (buf.size() > warnResponseSize) {
LOG.warn(getName()+", responseTooLarge for: "+call+": Size: "
+ StringUtils.humanReadableInt(buf.size()));
}
call.setResponse(buf.getByteBuffer());
responder.doRespond(call);
call.sendResponseIfReady();
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
LOG.info(getName() + " caught: " +
@ -1201,6 +1292,9 @@ public abstract class HBaseServer implements RpcServer {
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
DEFAULT_WARN_RESPONSE_SIZE);
this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS,
DEFAULT_WARN_DELAYED_CALLS);
this.delayedCalls = new AtomicInteger(0);
// Create the responder here
@ -1428,4 +1522,8 @@ public abstract class HBaseServer implements RpcServer {
int nBytes = initialRemaining - buf.remaining();
return (nBytes > 0) ? nBytes : ret;
}
public Delayable getCurrentCall() {
return CurCall.get();
}
}

View File

@ -26,7 +26,6 @@ import org.apache.hadoop.ipc.VersionedProtocol;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
/**
*/
@ -64,6 +63,13 @@ public interface RpcServer {
void startThreads();
/**
* Needed for delayed calls. We need to be able to store the current call
* so that we can complete it later.
* @return Call the server is currently handling.
*/
Delayable getCurrentCall();
/**
* Returns the metrics instance for reporting RPC call statistics
*/

View File

@ -0,0 +1,202 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Test;
/**
* Test that delayed RPCs work. Fire up three calls, the first of which should
* be delayed. Check that the last two, which are undelayed, return before the
* first one.
*/
public class TestDelayedRpc {
public static RpcServer rpcServer;
public static final int UNDELAYED = 0;
public static final int DELAYED = 1;
@Test
public void testDelayedRpc() throws Exception {
Configuration conf = HBaseConfiguration.create();
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
rpcServer = HBaseRPC.getServer(new TestRpcImpl(),
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
rpcServer.getListenerAddress(), conf, 400);
List<Integer> results = new ArrayList<Integer>();
TestThread th1 = new TestThread(client, true, results);
TestThread th2 = new TestThread(client, false, results);
TestThread th3 = new TestThread(client, false, results);
th1.start();
Thread.sleep(100);
th2.start();
Thread.sleep(200);
th3.start();
th1.join();
th2.join();
th3.join();
assertEquals(results.get(0).intValue(), UNDELAYED);
assertEquals(results.get(1).intValue(), UNDELAYED);
assertEquals(results.get(2).intValue(), DELAYED);
}
private static class ListAppender extends AppenderSkeleton {
private List<String> messages = new ArrayList<String>();
@Override
protected void append(LoggingEvent event) {
messages.add(event.getMessage().toString());
}
@Override
public void close() {
}
@Override
public boolean requiresLayout() {
return false;
}
public List<String> getMessages() {
return messages;
}
}
@Test
public void testTooManyDelayedRpcs() throws Exception {
Configuration conf = HBaseConfiguration.create();
final int MAX_DELAYED_RPC = 10;
conf.setInt("hbase.ipc.warn.delayedrpc.number", MAX_DELAYED_RPC);
ListAppender listAppender = new ListAppender();
Logger log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer");
log.addAppender(listAppender);
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
rpcServer = HBaseRPC.getServer(new TestRpcImpl(),
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
rpcServer.getListenerAddress(), conf, 1000);
Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
for (int i = 0; i < MAX_DELAYED_RPC; i++) {
threads[i] = new TestThread(client, true, null);
threads[i].start();
}
/* No warnings till here. */
assertTrue(listAppender.getMessages().isEmpty());
/* This should give a warning. */
threads[MAX_DELAYED_RPC] = new TestThread(client, true, null);
threads[MAX_DELAYED_RPC].start();
for (int i = 0; i < MAX_DELAYED_RPC; i++) {
threads[i].join();
}
assertFalse(listAppender.getMessages().isEmpty());
assertTrue(listAppender.getMessages().get(0).startsWith(
"Too many delayed calls"));
log.removeAppender(listAppender);
}
public interface TestRpc extends VersionedProtocol {
int test(boolean delay);
}
private static class TestRpcImpl implements TestRpc {
@Override
public int test(boolean delay) {
if (!delay) {
return UNDELAYED;
}
final Delayable call = rpcServer.getCurrentCall();
call.startDelay();
new Thread() {
public void run() {
try {
Thread.sleep(500);
call.endDelay(DELAYED);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
return 0xDEADBEEF; // this return value should not go back to client
}
@Override
public long getProtocolVersion(String arg0, long arg1) throws IOException {
return 0;
}
}
private static class TestThread extends Thread {
private TestRpc server;
private boolean delay;
private List<Integer> results;
public TestThread(TestRpc server, boolean delay, List<Integer> results) {
this.server = server;
this.delay = delay;
this.results = results;
}
@Override
public void run() {
Integer result = new Integer(server.test(delay));
if (results != null) {
synchronized (results) {
results.add(result);
}
}
}
}
}