HBASE-3899 HBase RPC to support free-ing up server handler threads even if response is not ready
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1126857 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cc53904c16
commit
222f6836df
|
@ -264,6 +264,9 @@ Release 0.91.0 - Unreleased
|
||||||
RS web UIs
|
RS web UIs
|
||||||
HBASE-3691 Add compressor support for 'snappy', google's compressor
|
HBASE-3691 Add compressor support for 'snappy', google's compressor
|
||||||
(Nichole Treadway and Nicholas Telford)
|
(Nichole Treadway and Nicholas Telford)
|
||||||
|
HBASE-3899 enhance HBase RPC to support free-ing up server handler threads
|
||||||
|
even if response is not ready (Dhruba Borthakur)
|
||||||
|
|
||||||
|
|
||||||
Release 0.90.4 - Unreleased
|
Release 0.90.4 - Unreleased
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,66 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2010 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.io;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An optional interface to indicate the the values are not immediately
|
||||||
|
* readable.
|
||||||
|
*/
|
||||||
|
public interface WritableDelayed {
|
||||||
|
/**
|
||||||
|
* Provide a hint to the caller to indicate that
|
||||||
|
* data is not ready yet.
|
||||||
|
*/
|
||||||
|
public boolean isDelayed();
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
* Copyright 2010 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.io;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An optional interface to indicate the the values are not immediately
|
||||||
|
* readable.
|
||||||
|
*/
|
||||||
|
public interface WritableDelayed {
|
||||||
|
/**
|
||||||
|
* Provide a hint to the caller to indicate that
|
||||||
|
* data is not ready yet.
|
||||||
|
*/
|
||||||
|
public boolean isDelayed();
|
||||||
|
}
|
|
@ -57,6 +57,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.io.WritableWithSize;
|
import org.apache.hadoop.hbase.io.WritableWithSize;
|
||||||
|
import org.apache.hadoop.hbase.io.WritableDelayed;
|
||||||
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
|
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
|
@ -97,7 +98,7 @@ public abstract class HBaseServer implements RpcServer {
|
||||||
/** Default value for above param */
|
/** Default value for above param */
|
||||||
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
|
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
|
||||||
|
|
||||||
private final int warnResponseSize;
|
private static int warnResponseSize;
|
||||||
|
|
||||||
public static final Log LOG =
|
public static final Log LOG =
|
||||||
LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
|
LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
|
||||||
|
@ -150,6 +151,25 @@ public abstract class HBaseServer implements RpcServer {
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* If invoked from inside an RPC, startDelay tells the current call to not
|
||||||
|
* send out * responses back to the client until endDelay() is invoked.
|
||||||
|
*/
|
||||||
|
public static void startDelay(Object o) {
|
||||||
|
Call call = (Call) o;
|
||||||
|
if (call != null) {
|
||||||
|
call.startDelay();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static void endDelay(Object o) throws IOException {
|
||||||
|
Call call = (Call) o;
|
||||||
|
if (call != null) {
|
||||||
|
call.endDelay();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static Object getCall() {
|
||||||
|
return CurCall.get();
|
||||||
|
}
|
||||||
/** Returns remote address as a string when invoked inside an RPC.
|
/** Returns remote address as a string when invoked inside an RPC.
|
||||||
* Returns null in case of an error.
|
* Returns null in case of an error.
|
||||||
* @return String
|
* @return String
|
||||||
|
@ -241,13 +261,19 @@ public abstract class HBaseServer implements RpcServer {
|
||||||
protected long timestamp; // the time received when response is null
|
protected long timestamp; // the time received when response is null
|
||||||
// the time served when response is not null
|
// the time served when response is not null
|
||||||
protected ByteBuffer response; // the response for this call
|
protected ByteBuffer response; // the response for this call
|
||||||
|
private boolean delayResponse; // shall the RPC layer delay the response?
|
||||||
|
private boolean doneResponse; // have we already responded to the client?
|
||||||
|
private Responder responder; // the thread that sends out responses
|
||||||
|
|
||||||
public Call(int id, Writable param, Connection connection) {
|
public Call(int id, Writable param, Connection connection, Responder responder) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.param = param;
|
this.param = param;
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
this.timestamp = System.currentTimeMillis();
|
this.timestamp = System.currentTimeMillis();
|
||||||
this.response = null;
|
this.response = null;
|
||||||
|
this.delayResponse = false;
|
||||||
|
this.doneResponse = false;
|
||||||
|
this.responder = responder;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -255,9 +281,40 @@ public abstract class HBaseServer implements RpcServer {
|
||||||
return param.toString() + " from " + connection.toString();
|
return param.toString() + " from " + connection.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setResponse(ByteBuffer response) {
|
public synchronized void setResponse(ByteBuffer response) {
|
||||||
this.response = response;
|
this.response = response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delay this call, do not send its response back to the client
|
||||||
|
*/
|
||||||
|
public synchronized void startDelay() {
|
||||||
|
this.delayResponse = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* It is ok to send responses back to the client now. This is
|
||||||
|
* typically called by a non-server-handler thread.
|
||||||
|
*/
|
||||||
|
public synchronized void endDelay() throws IOException {
|
||||||
|
this.delayResponse = false;
|
||||||
|
if (response != null && !doneResponse) {
|
||||||
|
doneResponse = true;
|
||||||
|
responder.doRespond(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we have a reponse, and delay is not set, then respond
|
||||||
|
* immediately. Otherwise, do not respond to client. This is
|
||||||
|
* called the by the RPC code in the context of the Handler thread.
|
||||||
|
*/
|
||||||
|
public synchronized void sendResponseIfReady() throws IOException {
|
||||||
|
if (response != null && !doneResponse && !delayResponse) {
|
||||||
|
doneResponse = true; // going to respond as soon as this call returns.
|
||||||
|
responder.doRespond(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Listens on the socket. Creates jobs for the handler threads*/
|
/** Listens on the socket. Creates jobs for the handler threads*/
|
||||||
|
@ -799,16 +856,41 @@ public abstract class HBaseServer implements RpcServer {
|
||||||
return done;
|
return done;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Enqueue for background thread to send responses out later.
|
||||||
|
//
|
||||||
|
private void enqueueInSelector(Call call) throws IOException {
|
||||||
|
incPending();
|
||||||
|
try {
|
||||||
|
// Wakeup the thread blocked on select, only then can the call
|
||||||
|
// to channel.register() complete.
|
||||||
|
SocketChannel channel = call.connection.channel;
|
||||||
|
writeSelector.wakeup();
|
||||||
|
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
|
||||||
|
} catch (ClosedChannelException e) {
|
||||||
|
//Its ok. channel might be closed else where.
|
||||||
|
} finally {
|
||||||
|
decPending();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// Enqueue a response from the application.
|
// Enqueue a response from the application.
|
||||||
//
|
//
|
||||||
void doRespond(Call call) throws IOException {
|
void doRespond(Call call) throws IOException {
|
||||||
|
// set the serve time when the response has to be sent later
|
||||||
|
call.timestamp = System.currentTimeMillis();
|
||||||
|
|
||||||
|
boolean doRegister = false;
|
||||||
synchronized (call.connection.responseQueue) {
|
synchronized (call.connection.responseQueue) {
|
||||||
call.connection.responseQueue.addLast(call);
|
call.connection.responseQueue.addLast(call);
|
||||||
if (call.connection.responseQueue.size() == 1) {
|
if (call.connection.responseQueue.size() == 1) {
|
||||||
processResponse(call.connection.responseQueue, true);
|
doRegister = !processResponse(call.connection.responseQueue, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (doRegister) {
|
||||||
|
enqueueInSelector(call); // tell background thread to send it out later
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void incPending() { // call waiting to be enqueued.
|
private synchronized void incPending() { // call waiting to be enqueued.
|
||||||
|
@ -1003,7 +1085,7 @@ public abstract class HBaseServer implements RpcServer {
|
||||||
Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param
|
Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param
|
||||||
param.readFields(dis);
|
param.readFields(dis);
|
||||||
|
|
||||||
Call call = new Call(id, param, this);
|
Call call = new Call(id, param, this, responder);
|
||||||
|
|
||||||
if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
|
if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
|
||||||
priorityCallQueue.put(call);
|
priorityCallQueue.put(call);
|
||||||
|
@ -1070,6 +1152,10 @@ public abstract class HBaseServer implements RpcServer {
|
||||||
}
|
}
|
||||||
CurCall.set(null);
|
CurCall.set(null);
|
||||||
|
|
||||||
|
if (!(value instanceof WritableDelayed)) {
|
||||||
|
processRpcResponse(call, value, error, errorClass);
|
||||||
|
}
|
||||||
|
|
||||||
int size = BUFFER_INITIAL_SIZE;
|
int size = BUFFER_INITIAL_SIZE;
|
||||||
if (value instanceof WritableWithSize) {
|
if (value instanceof WritableWithSize) {
|
||||||
// get the size hint.
|
// get the size hint.
|
||||||
|
@ -1106,7 +1192,7 @@ public abstract class HBaseServer implements RpcServer {
|
||||||
|
|
||||||
|
|
||||||
call.setResponse(buf.getByteBuffer());
|
call.setResponse(buf.getByteBuffer());
|
||||||
responder.doRespond(call);
|
call.sendResponseIfReady(); // maybe delay sending out response to the client
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
if (running) { // unexpected -- log it
|
if (running) { // unexpected -- log it
|
||||||
LOG.info(getName() + " caught: " +
|
LOG.info(getName() + " caught: " +
|
||||||
|
@ -1132,6 +1218,51 @@ public abstract class HBaseServer implements RpcServer {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Packages the rpc invocation response back into the call.
|
||||||
|
static public void processRpcResponse(Object callobj, Writable value,
|
||||||
|
String error, String errorClass) throws IOException {
|
||||||
|
Call call = (Call)callobj;
|
||||||
|
|
||||||
|
int size = Handler.BUFFER_INITIAL_SIZE;
|
||||||
|
if (value instanceof WritableWithSize) {
|
||||||
|
// get the size hint.
|
||||||
|
WritableWithSize ohint = (WritableWithSize)value;
|
||||||
|
long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
|
||||||
|
if (hint > 0) {
|
||||||
|
if ((hint) > Integer.MAX_VALUE) {
|
||||||
|
// oops, new problem.
|
||||||
|
IOException ioe =
|
||||||
|
new IOException("Result buffer size too large: " + hint);
|
||||||
|
errorClass = ioe.getClass().getName();
|
||||||
|
error = StringUtils.stringifyException(ioe);
|
||||||
|
} else {
|
||||||
|
size = (int)hint;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
|
||||||
|
DataOutputStream out = new DataOutputStream(buf);
|
||||||
|
out.writeInt(call.id); // write call id
|
||||||
|
out.writeBoolean(error != null); // write error flag
|
||||||
|
|
||||||
|
if (error == null) {
|
||||||
|
value.write(out);
|
||||||
|
} else {
|
||||||
|
WritableUtils.writeString(out, errorClass);
|
||||||
|
WritableUtils.writeString(out, error);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (buf.size() > warnResponseSize) {
|
||||||
|
LOG.warn("responseTooLarge for: "+call+": Size: "
|
||||||
|
+ StringUtils.humanReadableInt(buf.size()));
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Served: " + ((Invocation)call.param).getMethodName());
|
||||||
|
}
|
||||||
|
call.setResponse(buf.getByteBuffer());
|
||||||
|
call.sendResponseIfReady(); // maybe delay sending out response to the client
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the QOS level for this call. If it is higher than the highPriorityLevel and there
|
* Gets the QOS level for this call. If it is higher than the highPriorityLevel and there
|
||||||
* are priorityHandlers available it will be processed in it's own thread set.
|
* are priorityHandlers available it will be processed in it's own thread set.
|
||||||
|
|
Loading…
Reference in New Issue