diff --git a/CHANGES.txt b/CHANGES.txt index 12c4681a633..7c2bbe46bf9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -332,6 +332,8 @@ Release 0.91.0 - Unreleased HBASE-4081 Issues with HRegion.compactStores methods (Ming Ma) HBASE-3465 Hbase should use a HADOOP_HOME environment variable if available (Alejandro Abdelnur) + HBASE-3899 enhance HBase RPC to support free-ing up server handler threads + even if response is not ready (Vlad Dogaru) TASKS HBASE-3559 Move report of split to master OFF the heartbeat channel diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java b/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java new file mode 100644 index 00000000000..f2ae31e0547 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java @@ -0,0 +1,48 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * A call whose response can be delayed by the server. + */ +public interface Delayable { + /** + * Signal that the call response should be delayed, thus freeing the RPC + * server to handle different requests. + */ + public void startDelay(); + + /** + * @return is the call delayed? + */ + public boolean isDelayed(); + + /** + * Signal that the response to the call is ready and the RPC server is now + * allowed to send the response. + * @param result The result to return to the caller. + * @throws IOException + */ + public void endDelay(Object result) throws IOException; +} diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 61d3915bfe8..2f7dfae53c2 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -47,6 +47,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -56,6 +57,7 @@ import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.hbase.io.WritableWithSize; import org.apache.hadoop.hbase.util.ByteBufferOutputStream; import org.apache.hadoop.hbase.util.Bytes; @@ -97,8 +99,19 @@ public abstract class HBaseServer implements RpcServer { /** Default value for above param */ private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; + static final int BUFFER_INITIAL_SIZE = 1024; + private final int warnResponseSize; + private static final String WARN_DELAYED_CALLS = + "hbase.ipc.warn.delayedrpc.number"; + + private static final int DEFAULT_WARN_DELAYED_CALLS = 1000; + + private final int warnDelayedCalls; + + private AtomicInteger delayedCalls; + public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer"); @@ -234,20 +247,25 @@ public abstract class HBaseServer implements RpcServer { } /** A call queued for handling. */ - private static class Call { + private class Call implements Delayable { protected int id; // the client's call id protected Writable param; // the parameter passed protected Connection connection; // connection to client protected long timestamp; // the time received when response is null // the time served when response is not null protected ByteBuffer response; // the response for this call + protected boolean delayResponse; + protected Responder responder; - public Call(int id, Writable param, Connection connection) { + public Call(int id, Writable param, Connection connection, + Responder responder) { this.id = id; this.param = param; this.connection = connection; this.timestamp = System.currentTimeMillis(); this.response = null; + this.delayResponse = false; + this.responder = responder; } @Override @@ -255,8 +273,98 @@ public abstract class HBaseServer implements RpcServer { return param.toString() + " from " + connection.toString(); } - public void setResponse(ByteBuffer response) { - this.response = response; + private synchronized void setResponse(Object value, String errorClass, + String error) { + Writable result = null; + if (value instanceof Writable) { + result = (Writable) value; + } else { + /* We might have a null value and errors. Avoid creating a + * HbaseObjectWritable, because the constructor fails on null. */ + if (value != null) { + result = new HbaseObjectWritable(value); + } + } + + int size = BUFFER_INITIAL_SIZE; + if (result instanceof WritableWithSize) { + // get the size hint. + WritableWithSize ohint = (WritableWithSize) result; + long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT; + if (hint > Integer.MAX_VALUE) { + // oops, new problem. + IOException ioe = + new IOException("Result buffer size too large: " + hint); + errorClass = ioe.getClass().getName(); + error = StringUtils.stringifyException(ioe); + } else { + size = (int)hint; + } + } + + ByteBufferOutputStream buf = new ByteBufferOutputStream(size); + DataOutputStream out = new DataOutputStream(buf); + try { + out.writeInt(this.id); // write call id + out.writeBoolean(error != null); // write error flag + } catch (IOException e) { + errorClass = e.getClass().getName(); + error = StringUtils.stringifyException(e); + } + + try { + if (error == null) { + result.write(out); + } else { + WritableUtils.writeString(out, errorClass); + WritableUtils.writeString(out, error); + } + } catch (IOException e) { + LOG.warn("Error sending response to call: ", e); + } + + if (buf.size() > warnResponseSize) { + LOG.warn("responseTooLarge for: "+this+": Size: " + + StringUtils.humanReadableInt(buf.size())); + } + + this.response = buf.getByteBuffer(); + } + + @Override + public synchronized void endDelay(Object result) throws IOException { + assert this.delayResponse; + this.delayResponse = false; + delayedCalls.decrementAndGet(); + this.setResponse(result, null, null); + this.responder.doRespond(this); + } + + @Override + public synchronized void startDelay() { + assert !this.delayResponse; + this.delayResponse = true; + int numDelayed = delayedCalls.incrementAndGet(); + if (numDelayed > warnDelayedCalls) { + LOG.warn("Too many delayed calls: limit " + warnDelayedCalls + + " current " + numDelayed); + } + } + + @Override + public synchronized boolean isDelayed() { + return this.delayResponse; + } + + /** + * If we have a response, and delay is not set, then respond + * immediately. Otherwise, do not respond to client. This is + * called the by the RPC code in the context of the Handler thread. + */ + public synchronized void sendResponseIfReady() throws IOException { + if (!this.delayResponse) { + this.responder.doRespond(this); + } } } @@ -767,19 +875,8 @@ public abstract class HBaseServer implements RpcServer { if (inHandler) { // set the serve time when the response has to be sent later call.timestamp = System.currentTimeMillis(); - - incPending(); - try { - // Wakeup the thread blocked on select, only then can the call - // to channel.register() complete. - writeSelector.wakeup(); - channel.register(writeSelector, SelectionKey.OP_WRITE, call); - } catch (ClosedChannelException e) { - //Its ok. channel might be closed else where. + if (enqueueInSelector(call)) done = true; - } finally { - decPending(); - } } if (LOG.isDebugEnabled()) { LOG.debug(getName() + ": responding to #" + call.id + " from " + @@ -799,16 +896,44 @@ public abstract class HBaseServer implements RpcServer { return done; } + // + // Enqueue for background thread to send responses out later. + // + private boolean enqueueInSelector(Call call) throws IOException { + boolean done = false; + incPending(); + try { + // Wake up the thread blocked on select, only then can the call + // to channel.register() complete. + SocketChannel channel = call.connection.channel; + writeSelector.wakeup(); + channel.register(writeSelector, SelectionKey.OP_WRITE, call); + } catch (ClosedChannelException e) { + //It's OK. Channel might be closed else where. + done = true; + } finally { + decPending(); + } + return done; + } + // // Enqueue a response from the application. // void doRespond(Call call) throws IOException { + // set the serve time when the response has to be sent later + call.timestamp = System.currentTimeMillis(); + + boolean doRegister = false; synchronized (call.connection.responseQueue) { call.connection.responseQueue.addLast(call); if (call.connection.responseQueue.size() == 1) { - processResponse(call.connection.responseQueue, true); + doRegister = !processResponse(call.connection.responseQueue, false); } } + if (doRegister) { + enqueueInSelector(call); + } } private synchronized void incPending() { // call waiting to be enqueued. @@ -1003,7 +1128,7 @@ public abstract class HBaseServer implements RpcServer { Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param param.readFields(dis); - Call call = new Call(id, param, this); + Call call = new Call(id, param, this, responder); if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) { priorityCallQueue.put(call); @@ -1028,7 +1153,6 @@ public abstract class HBaseServer implements RpcServer { /** Handles queued calls . */ private class Handler extends Thread { private final BlockingQueue myCallQueue; - static final int BUFFER_INITIAL_SIZE = 1024; public Handler(final BlockingQueue cq, int instanceNumber) { this.myCallQueue = cq; @@ -1070,43 +1194,10 @@ public abstract class HBaseServer implements RpcServer { } CurCall.set(null); - int size = BUFFER_INITIAL_SIZE; - if (value instanceof WritableWithSize) { - // get the size hint. - WritableWithSize ohint = (WritableWithSize)value; - long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT; - if (hint > 0) { - if ((hint) > Integer.MAX_VALUE) { - // oops, new problem. - IOException ioe = - new IOException("Result buffer size too large: " + hint); - errorClass = ioe.getClass().getName(); - error = StringUtils.stringifyException(ioe); - } else { - size = (int)hint; - } - } + if (!call.isDelayed()) { + call.setResponse(value, errorClass, error); } - ByteBufferOutputStream buf = new ByteBufferOutputStream(size); - DataOutputStream out = new DataOutputStream(buf); - out.writeInt(call.id); // write call id - out.writeBoolean(error != null); // write error flag - - if (error == null) { - value.write(out); - } else { - WritableUtils.writeString(out, errorClass); - WritableUtils.writeString(out, error); - } - - if (buf.size() > warnResponseSize) { - LOG.warn(getName()+", responseTooLarge for: "+call+": Size: " - + StringUtils.humanReadableInt(buf.size())); - } - - - call.setResponse(buf.getByteBuffer()); - responder.doRespond(call); + call.sendResponseIfReady(); } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(getName() + " caught: " + @@ -1201,6 +1292,9 @@ public abstract class HBaseServer implements RpcServer { this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE); + this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, + DEFAULT_WARN_DELAYED_CALLS); + this.delayedCalls = new AtomicInteger(0); // Create the responder here @@ -1428,4 +1522,8 @@ public abstract class HBaseServer implements RpcServer { int nBytes = initialRemaining - buf.remaining(); return (nBytes > 0) ? nBytes : ret; } + + public Delayable getCurrentCall() { + return CurCall.get(); + } } diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 0da7f9e5414..aa04fd56457 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -26,7 +26,6 @@ import org.apache.hadoop.ipc.VersionedProtocol; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; /** */ @@ -64,6 +63,13 @@ public interface RpcServer { void startThreads(); + /** + * Needed for delayed calls. We need to be able to store the current call + * so that we can complete it later. + * @return Call the server is currently handling. + */ + Delayable getCurrentCall(); + /** * Returns the metrics instance for reporting RPC call statistics */ diff --git a/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java new file mode 100644 index 00000000000..60777cf6d25 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java @@ -0,0 +1,202 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.Test; + +/** + * Test that delayed RPCs work. Fire up three calls, the first of which should + * be delayed. Check that the last two, which are undelayed, return before the + * first one. + */ +public class TestDelayedRpc { + public static RpcServer rpcServer; + + public static final int UNDELAYED = 0; + public static final int DELAYED = 1; + + @Test + public void testDelayedRpc() throws Exception { + Configuration conf = HBaseConfiguration.create(); + InetSocketAddress isa = new InetSocketAddress("localhost", 0); + + rpcServer = HBaseRPC.getServer(new TestRpcImpl(), + new Class[]{ TestRpcImpl.class }, + isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); + rpcServer.start(); + + TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0, + rpcServer.getListenerAddress(), conf, 400); + + List results = new ArrayList(); + + TestThread th1 = new TestThread(client, true, results); + TestThread th2 = new TestThread(client, false, results); + TestThread th3 = new TestThread(client, false, results); + th1.start(); + Thread.sleep(100); + th2.start(); + Thread.sleep(200); + th3.start(); + + th1.join(); + th2.join(); + th3.join(); + + assertEquals(results.get(0).intValue(), UNDELAYED); + assertEquals(results.get(1).intValue(), UNDELAYED); + assertEquals(results.get(2).intValue(), DELAYED); + } + + private static class ListAppender extends AppenderSkeleton { + private List messages = new ArrayList(); + + @Override + protected void append(LoggingEvent event) { + messages.add(event.getMessage().toString()); + } + + @Override + public void close() { + } + + @Override + public boolean requiresLayout() { + return false; + } + + public List getMessages() { + return messages; + } + } + + @Test + public void testTooManyDelayedRpcs() throws Exception { + Configuration conf = HBaseConfiguration.create(); + final int MAX_DELAYED_RPC = 10; + conf.setInt("hbase.ipc.warn.delayedrpc.number", MAX_DELAYED_RPC); + + ListAppender listAppender = new ListAppender(); + Logger log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer"); + log.addAppender(listAppender); + + InetSocketAddress isa = new InetSocketAddress("localhost", 0); + rpcServer = HBaseRPC.getServer(new TestRpcImpl(), + new Class[]{ TestRpcImpl.class }, + isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); + rpcServer.start(); + TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0, + rpcServer.getListenerAddress(), conf, 1000); + + Thread threads[] = new Thread[MAX_DELAYED_RPC + 1]; + + for (int i = 0; i < MAX_DELAYED_RPC; i++) { + threads[i] = new TestThread(client, true, null); + threads[i].start(); + } + + /* No warnings till here. */ + assertTrue(listAppender.getMessages().isEmpty()); + + /* This should give a warning. */ + threads[MAX_DELAYED_RPC] = new TestThread(client, true, null); + threads[MAX_DELAYED_RPC].start(); + + for (int i = 0; i < MAX_DELAYED_RPC; i++) { + threads[i].join(); + } + + assertFalse(listAppender.getMessages().isEmpty()); + assertTrue(listAppender.getMessages().get(0).startsWith( + "Too many delayed calls")); + + log.removeAppender(listAppender); + } + + public interface TestRpc extends VersionedProtocol { + int test(boolean delay); + } + + private static class TestRpcImpl implements TestRpc { + @Override + public int test(boolean delay) { + if (!delay) { + return UNDELAYED; + } + final Delayable call = rpcServer.getCurrentCall(); + call.startDelay(); + new Thread() { + public void run() { + try { + Thread.sleep(500); + call.endDelay(DELAYED); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }.start(); + return 0xDEADBEEF; // this return value should not go back to client + } + + @Override + public long getProtocolVersion(String arg0, long arg1) throws IOException { + return 0; + } + } + + private static class TestThread extends Thread { + private TestRpc server; + private boolean delay; + private List results; + + public TestThread(TestRpc server, boolean delay, List results) { + this.server = server; + this.delay = delay; + this.results = results; + } + + @Override + public void run() { + Integer result = new Integer(server.test(delay)); + if (results != null) { + synchronized (results) { + results.add(result); + } + } + } + } +}