diff --git a/CHANGES.txt b/CHANGES.txt index 9515165daa3..923df0a264e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -264,6 +264,9 @@ Release 0.91.0 - Unreleased RS web UIs HBASE-3691 Add compressor support for 'snappy', google's compressor (Nichole Treadway and Nicholas Telford) + HBASE-3899 enhance HBase RPC to support free-ing up server handler threads + even if response is not ready (Dhruba Borthakur) + Release 0.90.4 - Unreleased diff --git a/src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java b/src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java new file mode 100644 index 00000000000..aa54a47af26 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java @@ -0,0 +1,66 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +/** + * An optional interface to indicate the the values are not immediately + * readable. + */ +public interface WritableDelayed { + /** + * Provide a hint to the caller to indicate that + * data is not ready yet. + */ + public boolean isDelayed(); +} +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +/** + * An optional interface to indicate the the values are not immediately + * readable. + */ +public interface WritableDelayed { + /** + * Provide a hint to the caller to indicate that + * data is not ready yet. + */ + public boolean isDelayed(); +} diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 243c68b944d..0ca8102839a 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -57,6 +57,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.WritableWithSize; +import org.apache.hadoop.hbase.io.WritableDelayed; import org.apache.hadoop.hbase.util.ByteBufferOutputStream; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; @@ -97,7 +98,7 @@ public abstract class HBaseServer implements RpcServer { /** Default value for above param */ private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; - private final int warnResponseSize; + private static int warnResponseSize; public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer"); @@ -150,6 +151,25 @@ public abstract class HBaseServer implements RpcServer { } return null; } + /** + * If invoked from inside an RPC, startDelay tells the current call to not + * send out * responses back to the client until endDelay() is invoked. + */ + public static void startDelay(Object o) { + Call call = (Call) o; + if (call != null) { + call.startDelay(); + } + } + public static void endDelay(Object o) throws IOException { + Call call = (Call) o; + if (call != null) { + call.endDelay(); + } + } + public static Object getCall() { + return CurCall.get(); + } /** Returns remote address as a string when invoked inside an RPC. * Returns null in case of an error. * @return String @@ -241,13 +261,19 @@ public abstract class HBaseServer implements RpcServer { protected long timestamp; // the time received when response is null // the time served when response is not null protected ByteBuffer response; // the response for this call + private boolean delayResponse; // shall the RPC layer delay the response? + private boolean doneResponse; // have we already responded to the client? + private Responder responder; // the thread that sends out responses - public Call(int id, Writable param, Connection connection) { + public Call(int id, Writable param, Connection connection, Responder responder) { this.id = id; this.param = param; this.connection = connection; this.timestamp = System.currentTimeMillis(); this.response = null; + this.delayResponse = false; + this.doneResponse = false; + this.responder = responder; } @Override @@ -255,9 +281,40 @@ public abstract class HBaseServer implements RpcServer { return param.toString() + " from " + connection.toString(); } - public void setResponse(ByteBuffer response) { + public synchronized void setResponse(ByteBuffer response) { this.response = response; } + + /** + * Delay this call, do not send its response back to the client + */ + public synchronized void startDelay() { + this.delayResponse = true; + } + + /** + * It is ok to send responses back to the client now. This is + * typically called by a non-server-handler thread. + */ + public synchronized void endDelay() throws IOException { + this.delayResponse = false; + if (response != null && !doneResponse) { + doneResponse = true; + responder.doRespond(this); + } + } + + /* + * If we have a reponse, and delay is not set, then respond + * immediately. Otherwise, do not respond to client. This is + * called the by the RPC code in the context of the Handler thread. + */ + public synchronized void sendResponseIfReady() throws IOException { + if (response != null && !doneResponse && !delayResponse) { + doneResponse = true; // going to respond as soon as this call returns. + responder.doRespond(this); + } + } } /** Listens on the socket. Creates jobs for the handler threads*/ @@ -799,16 +856,41 @@ public abstract class HBaseServer implements RpcServer { return done; } + // + // Enqueue for background thread to send responses out later. + // + private void enqueueInSelector(Call call) throws IOException { + incPending(); + try { + // Wakeup the thread blocked on select, only then can the call + // to channel.register() complete. + SocketChannel channel = call.connection.channel; + writeSelector.wakeup(); + channel.register(writeSelector, SelectionKey.OP_WRITE, call); + } catch (ClosedChannelException e) { + //Its ok. channel might be closed else where. + } finally { + decPending(); + } + } + // // Enqueue a response from the application. // void doRespond(Call call) throws IOException { + // set the serve time when the response has to be sent later + call.timestamp = System.currentTimeMillis(); + + boolean doRegister = false; synchronized (call.connection.responseQueue) { call.connection.responseQueue.addLast(call); if (call.connection.responseQueue.size() == 1) { - processResponse(call.connection.responseQueue, true); + doRegister = !processResponse(call.connection.responseQueue, false); } } + if (doRegister) { + enqueueInSelector(call); // tell background thread to send it out later + } } private synchronized void incPending() { // call waiting to be enqueued. @@ -1003,7 +1085,7 @@ public abstract class HBaseServer implements RpcServer { Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param param.readFields(dis); - Call call = new Call(id, param, this); + Call call = new Call(id, param, this, responder); if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) { priorityCallQueue.put(call); @@ -1070,6 +1152,10 @@ public abstract class HBaseServer implements RpcServer { } CurCall.set(null); + if (!(value instanceof WritableDelayed)) { + processRpcResponse(call, value, error, errorClass); + } + int size = BUFFER_INITIAL_SIZE; if (value instanceof WritableWithSize) { // get the size hint. @@ -1106,7 +1192,7 @@ public abstract class HBaseServer implements RpcServer { call.setResponse(buf.getByteBuffer()); - responder.doRespond(call); + call.sendResponseIfReady(); // maybe delay sending out response to the client } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(getName() + " caught: " + @@ -1132,6 +1218,51 @@ public abstract class HBaseServer implements RpcServer { } + // Packages the rpc invocation response back into the call. + static public void processRpcResponse(Object callobj, Writable value, + String error, String errorClass) throws IOException { + Call call = (Call)callobj; + + int size = Handler.BUFFER_INITIAL_SIZE; + if (value instanceof WritableWithSize) { + // get the size hint. + WritableWithSize ohint = (WritableWithSize)value; + long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT; + if (hint > 0) { + if ((hint) > Integer.MAX_VALUE) { + // oops, new problem. + IOException ioe = + new IOException("Result buffer size too large: " + hint); + errorClass = ioe.getClass().getName(); + error = StringUtils.stringifyException(ioe); + } else { + size = (int)hint; + } + } + } + ByteBufferOutputStream buf = new ByteBufferOutputStream(size); + DataOutputStream out = new DataOutputStream(buf); + out.writeInt(call.id); // write call id + out.writeBoolean(error != null); // write error flag + + if (error == null) { + value.write(out); + } else { + WritableUtils.writeString(out, errorClass); + WritableUtils.writeString(out, error); + } + + if (buf.size() > warnResponseSize) { + LOG.warn("responseTooLarge for: "+call+": Size: " + + StringUtils.humanReadableInt(buf.size())); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Served: " + ((Invocation)call.param).getMethodName()); + } + call.setResponse(buf.getByteBuffer()); + call.sendResponseIfReady(); // maybe delay sending out response to the client + } + /** * Gets the QOS level for this call. If it is higher than the highPriorityLevel and there * are priorityHandlers available it will be processed in it's own thread set.