diff --git a/CHANGES.txt b/CHANGES.txt index 6c7a9264f57..f428bbd1fbd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -101,6 +101,8 @@ Release 0.19.0 - Unreleased hard code memory sizes HBASE-1000 Sleeper.sleep does not go back to sleep when interrupted and no stop flag given. + HBASE-900 Regionserver memory leak causing OOME during relatively + modest bulk importing; part 1 IMPROVEMENTS HBASE-901 Add a limit to key length, check key and value length on client side diff --git a/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 34feb6d7028..396f7059666 100644 --- a/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface; -import org.apache.hadoop.hbase.ipc.HbaseRPC; +import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.SoftValueSortedMap; import org.apache.hadoop.hbase.util.Writables; @@ -192,7 +192,7 @@ public class HConnectionManager implements HConstants { masterLocation = new HServerAddress(this.conf.get(MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS)); try { - HMasterInterface tryMaster = (HMasterInterface)HbaseRPC.getProxy( + HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy( HMasterInterface.class, HBaseRPCProtocolVersion.versionID, masterLocation.getInetSocketAddress(), this.conf); @@ -732,7 +732,7 @@ public class HConnectionManager implements HConstants { server = this.servers.get(regionServer.toString()); if (server == null) { // Get a connection try { - server = (HRegionInterface)HbaseRPC.waitForProxy( + server = (HRegionInterface)HBaseRPC.waitForProxy( serverInterfaceClass, HBaseRPCProtocolVersion.versionID, regionServer.getInetSocketAddress(), this.conf, this.maxRPCAttempts); @@ -954,7 +954,7 @@ public class HConnectionManager implements HConstants { void close(boolean stopProxy) { if (master != null) { if (stopProxy) { - HbaseRPC.stopProxy(master); + HBaseRPC.stopProxy(master); } master = null; masterChecked = false; @@ -962,7 +962,7 @@ public class HConnectionManager implements HConstants { if (stopProxy) { synchronized (servers) { for (HRegionInterface i: servers.values()) { - HbaseRPC.stopProxy(i); + HBaseRPC.stopProxy(i); } } } diff --git a/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java new file mode 100644 index 00000000000..8fe8c7e253b --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -0,0 +1,844 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.net.Socket; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.net.ConnectException; + +import java.io.IOException; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.FilterInputStream; +import java.io.InputStream; + +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import javax.net.SocketFactory; + +import org.apache.commons.logging.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; + +/** A client for an IPC service. IPC calls take a single {@link Writable} as a + * parameter, and return a {@link Writable} as their value. A service runs on + * a port and is defined by a parameter class and a value class. + * + *

This is the org.apache.hadoop.ipc.Client renamed as HBaseClient and + * moved into this package so can access package-private methods. + * + * @see HBaseServer + */ +public class HBaseClient { + + public static final Log LOG = + LogFactory.getLog("org.apache.hadoop.ipc.HBaseClass"); + private Hashtable connections = + new Hashtable(); + + private Class valueClass; // class of call values + private int counter; // counter for call ids + private AtomicBoolean running = new AtomicBoolean(true); // if client runs + final private Configuration conf; + final private int maxIdleTime; //connections will be culled if it was idle for + //maxIdleTime msecs + final private int maxRetries; //the max. no. of retries for socket connections + private boolean tcpNoDelay; // if T then disable Nagle's Algorithm + private int pingInterval; // how often sends ping to the server in msecs + + private SocketFactory socketFactory; // how to create sockets + private int refCount = 1; + + final private static String PING_INTERVAL_NAME = "ipc.ping.interval"; + final static int DEFAULT_PING_INTERVAL = 60000; // 1 min + final static int PING_CALL_ID = -1; + + /** + * set the ping interval value in configuration + * + * @param conf Configuration + * @param pingInterval the ping interval + */ + final public static void setPingInterval(Configuration conf, int pingInterval) { + conf.setInt(PING_INTERVAL_NAME, pingInterval); + } + + /** + * Get the ping interval from configuration; + * If not set in the configuration, return the default value. + * + * @param conf Configuration + * @return the ping interval + */ + final static int getPingInterval(Configuration conf) { + return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL); + } + + /** + * Increment this client's reference count + * + */ + synchronized void incCount() { + refCount++; + } + + /** + * Decrement this client's reference count + * + */ + synchronized void decCount() { + refCount--; + } + + /** + * Return if this client has no reference + * + * @return true if this client has no reference; false otherwise + */ + synchronized boolean isZeroReference() { + return refCount==0; + } + + /** A call waiting for a value. */ + private class Call { + int id; // call id + Writable param; // parameter + Writable value; // value, null if error + IOException error; // exception, null if value + boolean done; // true when call is done + + protected Call(Writable param) { + this.param = param; + synchronized (HBaseClient.this) { + this.id = counter++; + } + } + + /** Indicate when the call is complete and the + * value or error are available. Notifies by default. */ + protected synchronized void callComplete() { + this.done = true; + notify(); // notify caller + } + + /** Set the exception when there is an error. + * Notify the caller the call is done. + * + * @param error exception thrown by the call; either local or remote + */ + public synchronized void setException(IOException error) { + this.error = error; + callComplete(); + } + + /** Set the return value when there is no error. + * Notify the caller the call is done. + * + * @param value return value of the call. + */ + public synchronized void setValue(Writable value) { + this.value = value; + callComplete(); + } + } + + /** Thread that reads responses and notifies callers. Each connection owns a + * socket connected to a remote address. Calls are multiplexed through this + * socket: responses may be delivered out of order. */ + private class Connection extends Thread { + private ConnectionId remoteId; + private Socket socket = null; // connected socket + private DataInputStream in; + private DataOutputStream out; + + // currently active calls + private Hashtable calls = new Hashtable(); + private AtomicLong lastActivity = new AtomicLong();// last I/O activity time + private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed + private IOException closeException; // close reason + + public Connection(InetSocketAddress address) throws IOException { + this(new ConnectionId(address, null)); + } + + public Connection(ConnectionId remoteId) throws IOException { + if (remoteId.getAddress().isUnresolved()) { + throw new UnknownHostException("unknown host: " + + remoteId.getAddress().getHostName()); + } + this.remoteId = remoteId; + UserGroupInformation ticket = remoteId.getTicket(); + this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " + + remoteId.getAddress().toString() + + " from " + ((ticket==null)?"an unknown user":ticket.getUserName())); + this.setDaemon(true); + } + + /** Update lastActivity with the current time. */ + private void touch() { + lastActivity.set(System.currentTimeMillis()); + } + + /** + * Add a call to this connection's call queue and notify + * a listener; synchronized. + * Returns false if called during shutdown. + * @param call to add + * @return true if the call was added. + */ + private synchronized boolean addCall(Call call) { + if (shouldCloseConnection.get()) + return false; + calls.put(call.id, call); + notify(); + return true; + } + + /** This class sends a ping to the remote side when timeout on + * reading. If no failure is detected, it retries until at least + * a byte is read. + */ + private class PingInputStream extends FilterInputStream { + /* constructor */ + protected PingInputStream(InputStream in) { + super(in); + } + + /* Process timeout exception + * if the connection is not going to be closed, send a ping. + * otherwise, throw the timeout exception. + */ + private void handleTimeout(SocketTimeoutException e) throws IOException { + if (shouldCloseConnection.get() || !running.get()) { + throw e; + } else { + sendPing(); + } + } + + /** Read a byte from the stream. + * Send a ping if timeout on read. Retries if no failure is detected + * until a byte is read. + * @throws IOException for any IO problem other than socket timeout + */ + public int read() throws IOException { + do { + try { + return super.read(); + } catch (SocketTimeoutException e) { + handleTimeout(e); + } + } while (true); + } + + /** Read bytes into a buffer starting from offset off + * Send a ping if timeout on read. Retries if no failure is detected + * until a byte is read. + * + * @return the total number of bytes read; -1 if the connection is closed. + */ + public int read(byte[] buf, int off, int len) throws IOException { + do { + try { + return super.read(buf, off, len); + } catch (SocketTimeoutException e) { + handleTimeout(e); + } + } while (true); + } + } + + /** Connect to the server and set up the I/O streams. It then sends + * a header to the server and starts + * the connection thread that waits for responses. + */ + private synchronized void setupIOstreams() { + if (socket != null || shouldCloseConnection.get()) { + return; + } + + short ioFailures = 0; + short timeoutFailures = 0; + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Connecting to "+remoteId.getAddress()); + } + while (true) { + try { + this.socket = socketFactory.createSocket(); + this.socket.setTcpNoDelay(tcpNoDelay); + // connection time out is 20s + this.socket.connect(remoteId.getAddress(), 20000); + this.socket.setSoTimeout(pingInterval); + break; + } catch (SocketTimeoutException toe) { + /* The max number of retries is 45, + * which amounts to 20s*45 = 15 minutes retries. + */ + handleConnectionFailure(timeoutFailures++, 45, toe); + } catch (IOException ie) { + handleConnectionFailure(ioFailures++, maxRetries, ie); + } + } + this.in = new DataInputStream(new BufferedInputStream + (new PingInputStream(NetUtils.getInputStream(socket)))); + this.out = new DataOutputStream + (new BufferedOutputStream(NetUtils.getOutputStream(socket))); + writeHeader(); + + // update last activity time + touch(); + + // start the receiver thread after the socket connection has been set up + start(); + } catch (IOException e) { + markClosed(e); + close(); + } + } + + /* Handle connection failures + * + * If the current number of retries is equal to the max number of retries, + * stop retrying and throw the exception; Otherwise backoff 1 second and + * try connecting again. + * + * This Method is only called from inside setupIOstreams(), which is + * synchronized. Hence the sleep is synchronized; the locks will be retained. + * + * @param curRetries current number of retries + * @param maxRetries max number of retries allowed + * @param ioe failure reason + * @throws IOException if max number of retries is reached + */ + private void handleConnectionFailure( + int curRetries, int maxRetries, IOException ioe) throws IOException { + // close the current connection + try { + socket.close(); + } catch (IOException e) { + LOG.warn("Not able to close a socket", e); + } + // set socket to null so that the next call to setupIOstreams + // can start the process of connect all over again. + socket = null; + + // throw the exception if the maximum number of retries is reached + if (curRetries >= maxRetries) { + throw ioe; + } + + // otherwise back off and retry + try { + Thread.sleep(1000); + } catch (InterruptedException ignored) {} + + LOG.info("Retrying connect to server: " + remoteId.getAddress() + + ". Already tried " + curRetries + " time(s)."); + } + + /* Write the header for each connection + * Out is not synchronized because only the first thread does this. + */ + private void writeHeader() throws IOException { + out.write(HBaseServer.HEADER.array()); + out.write(HBaseServer.CURRENT_VERSION); + //When there are more fields we can have ConnectionHeader Writable. + DataOutputBuffer buf = new DataOutputBuffer(); + ObjectWritable.writeObject(buf, remoteId.getTicket(), + UserGroupInformation.class, conf); + int bufLen = buf.getLength(); + out.writeInt(bufLen); + out.write(buf.getData(), 0, bufLen); + } + + /* wait till someone signals us to start reading RPC response or + * it is idle too long, it is marked as to be closed, + * or the client is marked as not running. + * + * Return true if it is time to read a response; false otherwise. + */ + private synchronized boolean waitForWork() { + if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { + long timeout = maxIdleTime- + (System.currentTimeMillis()-lastActivity.get()); + if (timeout>0) { + try { + wait(timeout); + } catch (InterruptedException e) {} + } + } + + if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { + return true; + } else if (shouldCloseConnection.get()) { + return false; + } else if (calls.isEmpty()) { // idle connection closed or stopped + markClosed(null); + return false; + } else { // get stopped but there are still pending requests + markClosed((IOException)new IOException().initCause( + new InterruptedException())); + return false; + } + } + + public InetSocketAddress getRemoteAddress() { + return remoteId.getAddress(); + } + + /* Send a ping to the server if the time elapsed + * since last I/O activity is equal to or greater than the ping interval + */ + private synchronized void sendPing() throws IOException { + long curTime = System.currentTimeMillis(); + if ( curTime - lastActivity.get() >= pingInterval) { + lastActivity.set(curTime); + synchronized (out) { + out.writeInt(PING_CALL_ID); + out.flush(); + } + } + } + + public void run() { + if (LOG.isDebugEnabled()) + LOG.debug(getName() + ": starting, having connections " + + connections.size()); + + while (waitForWork()) {//wait here for work - read or close connection + receiveResponse(); + } + + close(); + + if (LOG.isDebugEnabled()) + LOG.debug(getName() + ": stopped, remaining connections " + + connections.size()); + } + + /** Initiates a call by sending the parameter to the remote server. + * Note: this is not called from the Connection thread, but by other + * threads. + */ + public void sendParam(Call call) { + if (shouldCloseConnection.get()) { + return; + } + + DataOutputBuffer d=null; + try { + synchronized (this.out) { + if (LOG.isDebugEnabled()) + LOG.debug(getName() + " sending #" + call.id); + + //for serializing the + //data to be written + d = new DataOutputBuffer(); + d.writeInt(call.id); + call.param.write(d); + byte[] data = d.getData(); + int dataLength = d.getLength(); + out.writeInt(dataLength); //first put the data length + out.write(data, 0, dataLength);//write the data + out.flush(); + } + } catch(IOException e) { + markClosed(e); + } finally { + //the buffer is just an in-memory buffer, but it is still polite to + // close early + IOUtils.closeStream(d); + } + } + + /* Receive a response. + * Because only one receiver, so no synchronization on in. + */ + private void receiveResponse() { + if (shouldCloseConnection.get()) { + return; + } + touch(); + + try { + int id = in.readInt(); // try to read an id + + if (LOG.isDebugEnabled()) + LOG.debug(getName() + " got value #" + id); + + Call call = calls.remove(id); + + boolean isError = in.readBoolean(); // read if error + if (isError) { + call.setException(new RemoteException( WritableUtils.readString(in), + WritableUtils.readString(in))); + } else { + Writable value = ReflectionUtils.newInstance(valueClass, conf); + value.readFields(in); // read value + call.setValue(value); + } + } catch (IOException e) { + markClosed(e); + } + } + + private synchronized void markClosed(IOException e) { + if (shouldCloseConnection.compareAndSet(false, true)) { + closeException = e; + notifyAll(); + } + } + + /** Close the connection. */ + private synchronized void close() { + if (!shouldCloseConnection.get()) { + LOG.error("The connection is not in the closed state"); + return; + } + + // release the resources + // first thing to do;take the connection out of the connection list + synchronized (connections) { + if (connections.get(remoteId) == this) { + connections.remove(remoteId); + } + } + + // close the streams and therefore the socket + IOUtils.closeStream(out); + IOUtils.closeStream(in); + + // clean up all calls + if (closeException == null) { + if (!calls.isEmpty()) { + LOG.warn( + "A connection is closed for no cause and calls are not empty"); + + // clean up calls anyway + closeException = new IOException("Unexpected closed connection"); + cleanupCalls(); + } + } else { + // log the info + if (LOG.isDebugEnabled()) { + LOG.debug("closing ipc connection to " + remoteId.address + ": " + + closeException.getMessage(),closeException); + } + + // cleanup calls + cleanupCalls(); + } + if (LOG.isDebugEnabled()) + LOG.debug(getName() + ": closed"); + } + + /* Cleanup all calls and mark them as done */ + private void cleanupCalls() { + Iterator> itor = calls.entrySet().iterator() ; + while (itor.hasNext()) { + Call c = itor.next().getValue(); + c.setException(closeException); // local exception + itor.remove(); + } + } + } + + /** Call implementation used for parallel calls. */ + private class ParallelCall extends Call { + private ParallelResults results; + private int index; + + public ParallelCall(Writable param, ParallelResults results, int index) { + super(param); + this.results = results; + this.index = index; + } + + /** Deliver result to result collector. */ + protected void callComplete() { + results.callComplete(this); + } + } + + /** Result collector for parallel calls. */ + private static class ParallelResults { + private Writable[] values; + private int size; + private int count; + + public ParallelResults(int size) { + this.values = new Writable[size]; + this.size = size; + } + + /** Collect a result. */ + public synchronized void callComplete(ParallelCall call) { + values[call.index] = call.value; // store the value + count++; // count it + if (count == size) // if all values are in + notify(); // then notify waiting caller + } + } + + /** Construct an IPC client whose values are of the given {@link Writable} + * class. */ + public HBaseClient(Class valueClass, Configuration conf, + SocketFactory factory) { + this.valueClass = valueClass; + this.maxIdleTime = + conf.getInt("ipc.client.connection.maxidletime", 10000); //10s + this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10); + this.tcpNoDelay = conf.getBoolean("ipc.client.tcpnodelay", false); + this.pingInterval = getPingInterval(conf); + if (LOG.isDebugEnabled()) { + LOG.debug("The ping interval is" + this.pingInterval + "ms."); + } + this.conf = conf; + this.socketFactory = factory; + } + + /** + * Construct an IPC client with the default SocketFactory + * @param valueClass + * @param conf + */ + public HBaseClient(Class valueClass, Configuration conf) { + this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf)); + } + + /** Return the socket factory of this client + * + * @return this client's socket factory + */ + SocketFactory getSocketFactory() { + return socketFactory; + } + + /** Stop all threads related to this client. No further calls may be made + * using this client. */ + public void stop() { + if (LOG.isDebugEnabled()) { + LOG.debug("Stopping client"); + } + + if (!running.compareAndSet(true, false)) { + return; + } + + // wake up all connections + synchronized (connections) { + for (Connection conn : connections.values()) { + conn.interrupt(); + } + } + + // wait until all connections are closed + while (!connections.isEmpty()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + } + } + + /** Make a call, passing param, to the IPC server running at + * address, returning the value. Throws exceptions if there are + * network problems or if the remote code threw an exception. */ + public Writable call(Writable param, InetSocketAddress address) + throws InterruptedException, IOException { + return call(param, address, null); + } + + public Writable call(Writable param, InetSocketAddress addr, + UserGroupInformation ticket) + throws InterruptedException, IOException { + Call call = new Call(param); + Connection connection = getConnection(addr, ticket, call); + connection.sendParam(call); // send the parameter + synchronized (call) { + while (!call.done) { + try { + call.wait(); // wait for the result + } catch (InterruptedException ignored) {} + } + + if (call.error != null) { + if (call.error instanceof RemoteException) { + call.error.fillInStackTrace(); + throw call.error; + } else { // local exception + throw wrapException(addr, call.error); + } + } else { + return call.value; + } + } + } + + /** + * Take an IOException and the address we were trying to connect to + * and return an IOException with the input exception as the cause. + * The new exception provides the stack trace of the place where + * the exception is thrown and some extra diagnostics information. + * If the exception is ConnectException or SocketTimeoutException, + * return a new one of the same type; Otherwise return an IOException. + * + * @param addr target address + * @param exception the relevant exception + * @return an exception to throw + */ + private IOException wrapException(InetSocketAddress addr, + IOException exception) { + if (exception instanceof ConnectException) { + //connection refused; include the host:port in the error + return (ConnectException)new ConnectException( + "Call to " + addr + " failed on connection exception: " + exception) + .initCause(exception); + } else if (exception instanceof SocketTimeoutException) { + return (SocketTimeoutException)new SocketTimeoutException( + "Call to " + addr + " failed on socket timeout exception: " + + exception).initCause(exception); + } else { + return (IOException)new IOException( + "Call to " + addr + " failed on local exception: " + exception) + .initCause(exception); + + } + } + + /** Makes a set of calls in parallel. Each parameter is sent to the + * corresponding address. When all values are available, or have timed out + * or errored, the collected results are returned in an array. The array + * contains nulls for calls that timed out or errored. */ + public Writable[] call(Writable[] params, InetSocketAddress[] addresses) + throws IOException { + if (addresses.length == 0) return new Writable[0]; + + ParallelResults results = new ParallelResults(params.length); + synchronized (results) { + for (int i = 0; i < params.length; i++) { + ParallelCall call = new ParallelCall(params[i], results, i); + try { + Connection connection = getConnection(addresses[i], null, call); + connection.sendParam(call); // send each parameter + } catch (IOException e) { + // log errors + LOG.info("Calling "+addresses[i]+" caught: " + + e.getMessage(),e); + results.size--; // wait for one fewer result + } + } + while (results.count != results.size) { + try { + results.wait(); // wait for all results + } catch (InterruptedException e) {} + } + + return results.values; + } + } + + /** Get a connection from the pool, or create a new one and add it to the + * pool. Connections to a given host/port are reused. */ + private Connection getConnection(InetSocketAddress addr, + UserGroupInformation ticket, + Call call) + throws IOException { + if (!running.get()) { + // the client is stopped + throw new IOException("The client is stopped"); + } + Connection connection; + /* we could avoid this allocation for each RPC by having a + * connectionsId object and with set() method. We need to manage the + * refs for keys in HashMap properly. For now its ok. + */ + ConnectionId remoteId = new ConnectionId(addr, ticket); + do { + synchronized (connections) { + connection = connections.get(remoteId); + if (connection == null) { + connection = new Connection(remoteId); + connections.put(remoteId, connection); + } + } + } while (!connection.addCall(call)); + + //we don't invoke the method below inside "synchronized (connections)" + //block above. The reason for that is if the server happens to be slow, + //it will take longer to establish a connection and that will slow the + //entire system down. + connection.setupIOstreams(); + return connection; + } + + /** + * This class holds the address and the user ticket. The client connections + * to servers are uniquely identified by + */ + private static class ConnectionId { + InetSocketAddress address; + UserGroupInformation ticket; + + ConnectionId(InetSocketAddress address, UserGroupInformation ticket) { + this.address = address; + this.ticket = ticket; + } + + InetSocketAddress getAddress() { + return address; + } + UserGroupInformation getTicket() { + return ticket; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ConnectionId) { + ConnectionId id = (ConnectionId) obj; + return address.equals(id.address) && ticket == id.ticket; + //Note : ticket is a ref comparision. + } + return false; + } + + @Override + public int hashCode() { + return address.hashCode() ^ System.identityHashCode(ticket); + } + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java b/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java similarity index 97% rename from src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java rename to src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java index 793961f059a..39a8c325972 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java @@ -44,8 +44,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.Client; -import org.apache.hadoop.ipc.HBaseClient; import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; import org.apache.hadoop.net.NetUtils; @@ -76,14 +74,14 @@ import org.apache.hadoop.security.UserGroupInformation; * All methods in the protocol should throw only IOException. No field data of * the protocol instance is transmitted. */ -public class HbaseRPC { +public class HBaseRPC { // Leave this out in the hadoop ipc package but keep class name. Do this // so that we dont' get the logging of this class's invocations by doing our // blanket enabling DEBUG on the o.a.h.h. package. private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC"); - private HbaseRPC() { + private HBaseRPC() { super(); } // no public ctor @@ -235,8 +233,8 @@ public class HbaseRPC { /* Cache a client using its socket factory as the hash key */ static private class ClientCache { - private Map clients = - new HashMap(); + private Map clients = + new HashMap(); /** * Construct & cache an IPC client with the user-provided SocketFactory @@ -245,14 +243,14 @@ public class HbaseRPC { * @param conf Configuration * @return an IPC client */ - private synchronized Client getClient(Configuration conf, + private synchronized HBaseClient getClient(Configuration conf, SocketFactory factory) { // Construct & cache client. The configuration is only used for timeout, // and Clients have connection pools. So we can either (a) lose some // connection pooling and leak sockets, or (b) use the same timeout for all // configurations. Since the IPC is usually intended globally, not // per-job, we choose (a). - Client client = clients.get(factory); + HBaseClient client = clients.get(factory); if (client == null) { // Make an hbase client instead of hadoop Client. client = new HBaseClient(HbaseObjectWritable.class, conf, factory); @@ -270,7 +268,7 @@ public class HbaseRPC { * @param conf Configuration * @return an IPC client */ - private synchronized Client getClient(Configuration conf) { + private synchronized HBaseClient getClient(Configuration conf) { return getClient(conf, SocketFactory.getDefault()); } @@ -278,7 +276,7 @@ public class HbaseRPC { * Stop a RPC client connection * A RPC client is closed only when its reference count becomes zero. */ - private void stopClient(Client client) { + private void stopClient(HBaseClient client) { synchronized (this) { ((HBaseClient)client).decCount(); if (((HBaseClient)client).isZeroReference()) { @@ -296,7 +294,7 @@ public class HbaseRPC { private static class Invoker implements InvocationHandler { private InetSocketAddress address; private UserGroupInformation ticket; - private Client client; + private HBaseClient client; private boolean isClosed = false; /** @@ -521,7 +519,7 @@ public class HbaseRPC { Invocation[] invocations = new Invocation[params.length]; for (int i = 0; i < params.length; i++) invocations[i] = new Invocation(method, params[i]); - Client client = CLIENTS.getClient(conf); + HBaseClient client = CLIENTS.getClient(conf); try { Writable[] wrappedValues = client.call(invocations, addrs); @@ -578,7 +576,7 @@ public class HbaseRPC { } /** An RPC Server. */ - public static class Server extends org.apache.hadoop.ipc.Server { + public static class Server extends HBaseServer { private Object instance; private Class implementation; private boolean verbose; diff --git a/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java b/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java index 6b8456f9d39..2a4bf0ad680 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java @@ -63,7 +63,8 @@ public interface HBaseRPCProtocolVersion extends VersionedProtocol { *

  • Version 11: Changed getClosestRowBefore signature.
  • *
  • Version 12: HServerLoad extensions (HBASE-1018).
  • *
  • Version 13: HBASE-847
  • + *
  • Version 14: HBASE-900
  • * */ - public static final long versionID = 13L; + public static final long versionID = 14L; } diff --git a/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java b/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java new file mode 100644 index 00000000000..a53afd29c00 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.Updater; +import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; + +/** + * + * This class is for maintaining the various RPC statistics + * and publishing them through the metrics interfaces. + * This also registers the JMX MBean for RPC. + *

    + * This class has a number of metrics variables that are publicly accessible; + * these variables (objects) have methods to update their values; + * for example: + *

    {@link #rpcQueueTime}.inc(time) + * + */ +public class HBaseRpcMetrics implements Updater { + private MetricsRecord metricsRecord; + private static Log LOG = LogFactory.getLog(HBaseRpcMetrics.class); + + public HBaseRpcMetrics(String hostName, String port, HBaseServer server) { + MetricsContext context = MetricsUtil.getContext("rpc"); + metricsRecord = MetricsUtil.createRecord(context, "metrics"); + + metricsRecord.setTag("port", port); + + LOG.info("Initializing RPC Metrics with hostName=" + + hostName + ", port=" + port); + + context.registerUpdater(this); + } + + + /** + * The metrics variables are public: + * - they can be set directly by calling their set/inc methods + * -they can also be read directly - e.g. JMX does this. + */ + + public MetricsTimeVaryingRate rpcQueueTime = new MetricsTimeVaryingRate("RpcQueueTime"); + public MetricsTimeVaryingRate rpcProcessingTime = new MetricsTimeVaryingRate("RpcProcessingTime"); + + public Map metricsList = Collections.synchronizedMap(new HashMap()); + + + + /** + * Push the metrics to the monitoring subsystem on doUpdate() call. + */ + public void doUpdates(MetricsContext context) { + rpcQueueTime.pushMetric(metricsRecord); + rpcProcessingTime.pushMetric(metricsRecord); + + synchronized (metricsList) { + // Iterate through the rpcMetrics hashmap to propogate the different rpc metrics. + Set keys = metricsList.keySet(); + + Iterator keyIter = keys.iterator(); + + while (keyIter.hasNext()) { + Object key = keyIter.next(); + MetricsTimeVaryingRate value = metricsList.get(key); + + value.pushMetric(metricsRecord); + } + } + metricsRecord.update(); + } + + public void shutdown() { + // Nothing to do + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java new file mode 100644 index 00000000000..608fdef09a4 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -0,0 +1,1061 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; + +/** An abstract IPC service. IPC calls take a single {@link Writable} as a + * parameter, and return a {@link Writable} as their value. A service runs on + * a port and is defined by a parameter class and a value class. + * + * + *

    Copied local so can fix HBASE-900. + * + * @see HBaseClient + */ +public abstract class HBaseServer { + + /** + * The first four bytes of Hadoop RPC connections + */ + public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); + + // 1 : Introduce ping and server does not throw away RPCs + public static final byte CURRENT_VERSION = 2; + + /** + * How many calls/handler are allowed in the queue. + */ + private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100; + + public static final Log LOG = + LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer"); + + private static final ThreadLocal SERVER = new ThreadLocal(); + + /** Returns the server instance called under or null. May be called under + * {@link #call(Writable, long)} implementations, and under {@link Writable} + * methods of paramters and return values. Permits applications to access + * the server context.*/ + public static HBaseServer get() { + return SERVER.get(); + } + + /** This is set to Call object before Handler invokes an RPC and reset + * after the call returns. + */ + private static final ThreadLocal CurCall = new ThreadLocal(); + + /** Returns the remote side ip address when invoked inside an RPC + * Returns null incase of an error. + */ + public static InetAddress getRemoteIp() { + Call call = CurCall.get(); + if (call != null) { + return call.connection.socket.getInetAddress(); + } + return null; + } + /** Returns remote address as a string when invoked inside an RPC. + * Returns null in case of an error. + */ + public static String getRemoteAddress() { + InetAddress addr = getRemoteIp(); + return (addr == null) ? null : addr.getHostAddress(); + } + + private String bindAddress; + private int port; // port we listen on + private int handlerCount; // number of handler threads + private Class paramClass; // class of call parameters + private int maxIdleTime; // the maximum idle time after + // which a client may be disconnected + private int thresholdIdleConnections; // the number of idle connections + // after which we will start + // cleaning up idle + // connections + int maxConnectionsToNuke; // the max number of + // connections to nuke + //during a cleanup + + protected HBaseRpcMetrics rpcMetrics; + + private Configuration conf; + + private int maxQueueSize; + private int socketSendBufferSize; + private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm + + volatile private boolean running = true; // true while server runs + private BlockingQueue callQueue; // queued calls + + private List connectionList = + Collections.synchronizedList(new LinkedList()); + //maintain a list + //of client connections + private Listener listener = null; + private Responder responder = null; + private int numConnections = 0; + private Handler[] handlers = null; + + /** + * A convenience method to bind to a given address and report + * better exceptions if the address is not a valid host. + * @param socket the socket to bind + * @param address the address to bind to + * @param backlog the number of connections allowed in the queue + * @throws BindException if the address can't be bound + * @throws UnknownHostException if the address isn't a valid host name + * @throws IOException other random errors from bind + */ + public static void bind(ServerSocket socket, InetSocketAddress address, + int backlog) throws IOException { + try { + socket.bind(address, backlog); + } catch (BindException e) { + BindException bindException = new BindException("Problem binding to " + address + + " : " + e.getMessage()); + bindException.initCause(e); + throw bindException; + } catch (SocketException e) { + // If they try to bind to a different host's address, give a better + // error message. + if ("Unresolved address".equals(e.getMessage())) { + throw new UnknownHostException("Invalid hostname for server: " + + address.getHostName()); + } else { + throw e; + } + } + } + + /** A call queued for handling. */ + private static class Call { + private int id; // the client's call id + private Writable param; // the parameter passed + private Connection connection; // connection to client + private long timestamp; // the time received when response is null + // the time served when response is not null + private ByteBuffer response; // the response for this call + + public Call(int id, Writable param, Connection connection) { + this.id = id; + this.param = param; + this.connection = connection; + this.timestamp = System.currentTimeMillis(); + this.response = null; + } + + @Override + public String toString() { + return param.toString() + " from " + connection.toString(); + } + + public void setResponse(ByteBuffer response) { + this.response = response; + } + } + + /** Listens on the socket. Creates jobs for the handler threads*/ + private class Listener extends Thread { + + private ServerSocketChannel acceptChannel = null; //the accept channel + private Selector selector = null; //the selector that we use for the server + private InetSocketAddress address; //the address we bind at + private Random rand = new Random(); + private long lastCleanupRunTime = 0; //the last time when a cleanup connec- + //-tion (for idle connections) ran + private long cleanupInterval = 10000; //the minimum interval between + //two cleanup runs + private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128); + + public Listener() throws IOException { + address = new InetSocketAddress(bindAddress, port); + // Create a new server socket and set to non blocking mode + acceptChannel = ServerSocketChannel.open(); + acceptChannel.configureBlocking(false); + + // Bind the server socket to the local host and port + bind(acceptChannel.socket(), address, backlogLength); + port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port + // create a selector; + selector= Selector.open(); + + // Register accepts on the server socket with the selector. + acceptChannel.register(selector, SelectionKey.OP_ACCEPT); + this.setName("IPC Server listener on " + port); + this.setDaemon(true); + } + /** cleanup connections from connectionList. Choose a random range + * to scan and also have a limit on the number of the connections + * that will be cleanedup per run. The criteria for cleanup is the time + * for which the connection was idle. If 'force' is true then all + * connections will be looked at for the cleanup. + */ + private void cleanupConnections(boolean force) { + if (force || numConnections > thresholdIdleConnections) { + long currentTime = System.currentTimeMillis(); + if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) { + return; + } + int start = 0; + int end = numConnections - 1; + if (!force) { + start = rand.nextInt() % numConnections; + end = rand.nextInt() % numConnections; + int temp; + if (end < start) { + temp = start; + start = end; + end = temp; + } + } + int i = start; + int numNuked = 0; + while (i <= end) { + Connection c; + synchronized (connectionList) { + try { + c = connectionList.get(i); + } catch (Exception e) {return;} + } + if (c.timedOut(currentTime)) { + if (LOG.isDebugEnabled()) + LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); + closeConnection(c); + numNuked++; + end--; + c = null; + if (!force && numNuked == maxConnectionsToNuke) break; + } + else i++; + } + lastCleanupRunTime = System.currentTimeMillis(); + } + } + + @Override + public void run() { + LOG.info(getName() + ": starting"); + SERVER.set(HBaseServer.this); + while (running) { + SelectionKey key = null; + try { + selector.select(); + Iterator iter = selector.selectedKeys().iterator(); + while (iter.hasNext()) { + key = iter.next(); + iter.remove(); + try { + if (key.isValid()) { + if (key.isAcceptable()) + doAccept(key); + else if (key.isReadable()) + doRead(key); + } + } catch (IOException e) { + } + key = null; + } + } catch (OutOfMemoryError e) { + // we can run out of memory if we have too many threads + // log the event and sleep for a minute and give + // some thread(s) a chance to finish + LOG.warn("Out of Memory in server select", e); + closeCurrentConnection(key, e); + cleanupConnections(true); + try { Thread.sleep(60000); } catch (Exception ie) {} + } catch (InterruptedException e) { + if (running) { // unexpected -- log it + LOG.info(getName() + " caught: " + + StringUtils.stringifyException(e)); + } + } catch (Exception e) { + closeCurrentConnection(key, e); + } + cleanupConnections(false); + } + LOG.info("Stopping " + this.getName()); + + synchronized (this) { + try { + acceptChannel.close(); + selector.close(); + } catch (IOException e) { } + + selector= null; + acceptChannel= null; + + // clean up all connections + while (!connectionList.isEmpty()) { + closeConnection(connectionList.remove(0)); + } + } + } + + private void closeCurrentConnection(SelectionKey key, Throwable e) { + if (key != null) { + Connection c = (Connection)key.attachment(); + if (c != null) { + if (LOG.isDebugEnabled()) + LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); + closeConnection(c); + c = null; + } + } + } + + InetSocketAddress getAddress() { + return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress(); + } + + void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { + Connection c = null; + ServerSocketChannel server = (ServerSocketChannel) key.channel(); + // accept up to 10 connections + for (int i=0; i<10; i++) { + SocketChannel channel = server.accept(); + if (channel==null) return; + + channel.configureBlocking(false); + channel.socket().setTcpNoDelay(tcpNoDelay); + SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ); + c = new Connection(readKey, channel, System.currentTimeMillis()); + readKey.attach(c); + synchronized (connectionList) { + connectionList.add(numConnections, c); + numConnections++; + } + if (LOG.isDebugEnabled()) + LOG.debug("Server connection from " + c.toString() + + "; # active connections: " + numConnections + + "; # queued calls: " + callQueue.size()); + } + } + + void doRead(SelectionKey key) throws InterruptedException { + int count = 0; + Connection c = (Connection)key.attachment(); + if (c == null) { + return; + } + c.setLastContact(System.currentTimeMillis()); + + try { + count = c.readAndProcess(); + } catch (InterruptedException ieo) { + throw ieo; + } catch (Exception e) { + LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e); + count = -1; //so that the (count < 0) block is executed + } + if (count < 0) { + if (LOG.isDebugEnabled()) + LOG.debug(getName() + ": disconnecting client " + + c.getHostAddress() + ". Number of active connections: "+ + numConnections); + closeConnection(c); + c = null; + } + else { + c.setLastContact(System.currentTimeMillis()); + } + } + + synchronized void doStop() { + if (selector != null) { + selector.wakeup(); + Thread.yield(); + } + if (acceptChannel != null) { + try { + acceptChannel.socket().close(); + } catch (IOException e) { + LOG.info(getName() + ":Exception in closing listener socket. " + e); + } + } + } + } + + // Sends responses of RPC back to clients. + private class Responder extends Thread { + private Selector writeSelector; + private int pending; // connections waiting to register + + final static int PURGE_INTERVAL = 900000; // 15mins + + Responder() throws IOException { + this.setName("IPC Server Responder"); + this.setDaemon(true); + writeSelector = Selector.open(); // create a selector + pending = 0; + } + + @Override + public void run() { + LOG.info(getName() + ": starting"); + SERVER.set(HBaseServer.this); + long lastPurgeTime = 0; // last check for old calls. + + while (running) { + try { + waitPending(); // If a channel is being registered, wait. + writeSelector.select(PURGE_INTERVAL); + Iterator iter = writeSelector.selectedKeys().iterator(); + while (iter.hasNext()) { + SelectionKey key = iter.next(); + iter.remove(); + try { + if (key.isValid() && key.isWritable()) { + doAsyncWrite(key); + } + } catch (IOException e) { + LOG.info(getName() + ": doAsyncWrite threw exception " + e); + } + } + long now = System.currentTimeMillis(); + if (now < lastPurgeTime + PURGE_INTERVAL) { + continue; + } + lastPurgeTime = now; + // + // If there were some calls that have not been sent out for a + // long time, discard them. + // + LOG.debug("Checking for old call responses."); + ArrayList calls; + + // get the list of channels from list of keys. + synchronized (writeSelector.keys()) { + calls = new ArrayList(writeSelector.keys().size()); + iter = writeSelector.keys().iterator(); + while (iter.hasNext()) { + SelectionKey key = iter.next(); + Call call = (Call)key.attachment(); + if (call != null && key.channel() == call.connection.channel) { + calls.add(call); + } + } + } + + for(Call call : calls) { + try { + doPurge(call, now); + } catch (IOException e) { + LOG.warn("Error in purging old calls " + e); + } + } + } catch (OutOfMemoryError e) { + // + // we can run out of memory if we have too many threads + // log the event and sleep for a minute and give + // some thread(s) a chance to finish + // + LOG.warn("Out of Memory in server select", e); + try { Thread.sleep(60000); } catch (Exception ie) {} + } catch (Exception e) { + LOG.warn("Exception in Responder " + + StringUtils.stringifyException(e)); + } + } + LOG.info("Stopping " + this.getName()); + } + + private void doAsyncWrite(SelectionKey key) throws IOException { + Call call = (Call)key.attachment(); + if (call == null) { + return; + } + if (key.channel() != call.connection.channel) { + throw new IOException("doAsyncWrite: bad channel"); + } + + synchronized(call.connection.responseQueue) { + if (processResponse(call.connection.responseQueue, false)) { + try { + key.interestOps(0); + } catch (CancelledKeyException e) { + /* The Listener/reader might have closed the socket. + * We don't explicitly cancel the key, so not sure if this will + * ever fire. + * This warning could be removed. + */ + LOG.warn("Exception while changing ops : " + e); + } + } + } + } + + // + // Remove calls that have been pending in the responseQueue + // for a long time. + // + private void doPurge(Call call, long now) throws IOException { + LinkedList responseQueue = call.connection.responseQueue; + synchronized (responseQueue) { + Iterator iter = responseQueue.listIterator(0); + while (iter.hasNext()) { + call = iter.next(); + if (now > call.timestamp + PURGE_INTERVAL) { + closeConnection(call.connection); + break; + } + } + } + } + + // Processes one response. Returns true if there are no more pending + // data for this channel. + // + private boolean processResponse(LinkedList responseQueue, + boolean inHandler) throws IOException { + boolean error = true; + boolean done = false; // there is more data for this channel. + int numElements = 0; + Call call = null; + try { + synchronized (responseQueue) { + // + // If there are no items for this channel, then we are done + // + numElements = responseQueue.size(); + if (numElements == 0) { + error = false; + return true; // no more data for this channel. + } + // + // Extract the first call + // + call = responseQueue.removeFirst(); + SocketChannel channel = call.connection.channel; + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + ": responding to #" + call.id + " from " + + call.connection); + } + // + // Send as much data as we can in the non-blocking fashion + // + int numBytes = channel.write(call.response); + if (numBytes < 0) { + return true; + } + if (!call.response.hasRemaining()) { + call.connection.decRpcCount(); + if (numElements == 1) { // last call fully processes. + done = true; // no more data for this channel. + } else { + done = false; // more calls pending to be sent. + } + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + ": responding to #" + call.id + " from " + + call.connection + " Wrote " + numBytes + " bytes."); + } + } else { + // + // If we were unable to write the entire response out, then + // insert in Selector queue. + // + call.connection.responseQueue.addFirst(call); + + if (inHandler) { + // set the serve time when the response has to be sent later + call.timestamp = System.currentTimeMillis(); + + incPending(); + try { + // Wakeup the thread blocked on select, only then can the call + // to channel.register() complete. + writeSelector.wakeup(); + channel.register(writeSelector, SelectionKey.OP_WRITE, call); + } catch (ClosedChannelException e) { + //Its ok. channel might be closed else where. + done = true; + } finally { + decPending(); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + ": responding to #" + call.id + " from " + + call.connection + " Wrote partial " + numBytes + + " bytes."); + } + } + error = false; // everything went off well + } + } finally { + if (error && call != null) { + LOG.warn(getName()+", call " + call + ": output error"); + done = true; // error. no more data for this channel. + closeConnection(call.connection); + } + } + return done; + } + + // + // Enqueue a response from the application. + // + void doRespond(Call call) throws IOException { + synchronized (call.connection.responseQueue) { + call.connection.responseQueue.addLast(call); + if (call.connection.responseQueue.size() == 1) { + processResponse(call.connection.responseQueue, true); + } + } + } + + private synchronized void incPending() { // call waiting to be enqueued. + pending++; + } + + private synchronized void decPending() { // call done enqueueing. + pending--; + notify(); + } + + private synchronized void waitPending() throws InterruptedException { + while (pending > 0) { + wait(); + } + } + } + + /** Reads calls from a connection and queues them for handling. */ + private class Connection { + private boolean versionRead = false; //if initial signature and + //version are read + private boolean headerRead = false; //if the connection header that + //follows version is read. + private SocketChannel channel; + private ByteBuffer data; + private ByteBuffer dataLengthBuffer; + private LinkedList responseQueue; + private volatile int rpcCount = 0; // number of outstanding rpcs + private long lastContact; + private int dataLength; + private Socket socket; + // Cache the remote host & port info so that even if the socket is + // disconnected, we can say where it used to connect to. + private String hostAddress; + private int remotePort; + private UserGroupInformation ticket = null; + + public Connection(SelectionKey key, SocketChannel channel, + long lastContact) { + this.channel = channel; + this.lastContact = lastContact; + this.data = null; + this.dataLengthBuffer = ByteBuffer.allocate(4); + this.socket = channel.socket(); + InetAddress addr = socket.getInetAddress(); + if (addr == null) { + this.hostAddress = "*Unknown*"; + } else { + this.hostAddress = addr.getHostAddress(); + } + this.remotePort = socket.getPort(); + this.responseQueue = new LinkedList(); + if (socketSendBufferSize != 0) { + try { + socket.setSendBufferSize(socketSendBufferSize); + } catch (IOException e) { + LOG.warn("Connection: unable to set socket send buffer size to " + + socketSendBufferSize); + } + } + } + + @Override + public String toString() { + return getHostAddress() + ":" + remotePort; + } + + public String getHostAddress() { + return hostAddress; + } + + public void setLastContact(long lastContact) { + this.lastContact = lastContact; + } + + public long getLastContact() { + return lastContact; + } + + /* Return true if the connection has no outstanding rpc */ + private boolean isIdle() { + return rpcCount == 0; + } + + /* Decrement the outstanding RPC count */ + private void decRpcCount() { + rpcCount--; + } + + /* Increment the outstanding RPC count */ + private void incRpcCount() { + rpcCount++; + } + + private boolean timedOut(long currentTime) { + if (isIdle() && currentTime - lastContact > maxIdleTime) + return true; + return false; + } + + public int readAndProcess() throws IOException, InterruptedException { + while (true) { + /* Read at most one RPC. If the header is not read completely yet + * then iterate until we read first RPC or until there is no data left. + */ + int count = -1; + if (dataLengthBuffer.remaining() > 0) { + count = channel.read(dataLengthBuffer); + if (count < 0 || dataLengthBuffer.remaining() > 0) + return count; + } + + if (!versionRead) { + //Every connection is expected to send the header. + ByteBuffer versionBuffer = ByteBuffer.allocate(1); + count = channel.read(versionBuffer); + if (count <= 0) { + return count; + } + int version = versionBuffer.get(0); + + dataLengthBuffer.flip(); + if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) { + //Warning is ok since this is not supposed to happen. + LOG.warn("Incorrect header or version mismatch from " + + hostAddress + ":" + remotePort + + " got version " + version + + " expected version " + CURRENT_VERSION); + return -1; + } + dataLengthBuffer.clear(); + versionRead = true; + continue; + } + + if (data == null) { + dataLengthBuffer.flip(); + dataLength = dataLengthBuffer.getInt(); + + if (dataLength == HBaseClient.PING_CALL_ID) { + dataLengthBuffer.clear(); + return 0; //ping message + } + data = ByteBuffer.allocate(dataLength); + incRpcCount(); // Increment the rpc count + } + + count = channel.read(data); + + if (data.remaining() == 0) { + dataLengthBuffer.clear(); + data.flip(); + if (headerRead) { + processData(); + data = null; + return count; + } else { + processHeader(); + headerRead = true; + data = null; + continue; + } + } + return count; + } + } + + /// Reads the header following version + private void processHeader() throws IOException { + /* In the current version, it is just a ticket. + * Later we could introduce a "ConnectionHeader" class. + */ + DataInputStream in = + new DataInputStream(new ByteArrayInputStream(data.array())); + ticket = (UserGroupInformation) ObjectWritable.readObject(in, conf); + } + + private void processData() throws IOException, InterruptedException { + DataInputStream dis = + new DataInputStream(new ByteArrayInputStream(data.array())); + int id = dis.readInt(); // try to read an id + + if (LOG.isDebugEnabled()) + LOG.debug(" got #" + id); + + Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param + param.readFields(dis); + + Call call = new Call(id, param, this); + callQueue.put(call); // queue the call; maybe blocked here + } + + private synchronized void close() throws IOException { + data = null; + dataLengthBuffer = null; + if (!channel.isOpen()) + return; + try {socket.shutdownOutput();} catch(Exception e) {} + if (channel.isOpen()) { + try {channel.close();} catch(Exception e) {} + } + try {socket.close();} catch(Exception e) {} + } + } + + /** Handles queued calls . */ + private class Handler extends Thread { + public Handler(int instanceNumber) { + this.setDaemon(true); + this.setName("IPC Server handler "+ instanceNumber + " on " + port); + } + + @Override + public void run() { + LOG.info(getName() + ": starting"); + SERVER.set(HBaseServer.this); + final int buffersize = 16 * 1024; + ByteArrayOutputStream buf = new ByteArrayOutputStream(buffersize); + while (running) { + try { + Call call = callQueue.take(); // pop the queue; maybe blocked here + + if (LOG.isDebugEnabled()) + LOG.debug(getName() + ": has #" + call.id + " from " + + call.connection); + + String errorClass = null; + String error = null; + Writable value = null; + + CurCall.set(call); + UserGroupInformation previous = UserGroupInformation.getCurrentUGI(); + UserGroupInformation.setCurrentUGI(call.connection.ticket); + try { + value = call(call.param, call.timestamp); // make the call + } catch (Throwable e) { + LOG.info(getName()+", call "+call+": error: " + e, e); + errorClass = e.getClass().getName(); + error = StringUtils.stringifyException(e); + } + UserGroupInformation.setCurrentUGI(previous); + CurCall.set(null); + + if (buf.size() > buffersize) { + // Allocate a new BAOS as reset only moves size back to zero but + // keeps the buffer of whatever the largest write was -- see + // hbase-900. + buf = new ByteArrayOutputStream(buffersize); + } else { + buf.reset(); + } + DataOutputStream out = new DataOutputStream(buf); + out.writeInt(call.id); // write call id + out.writeBoolean(error != null); // write error flag + + if (error == null) { + value.write(out); + } else { + WritableUtils.writeString(out, errorClass); + WritableUtils.writeString(out, error); + } + call.setResponse(ByteBuffer.wrap(buf.toByteArray())); + responder.doRespond(call); + } catch (InterruptedException e) { + if (running) { // unexpected -- log it + LOG.info(getName() + " caught: " + + StringUtils.stringifyException(e)); + } + } catch (Exception e) { + LOG.info(getName() + " caught: " + + StringUtils.stringifyException(e)); + } + } + LOG.info(getName() + ": exiting"); + } + + } + + protected HBaseServer(String bindAddress, int port, + Class paramClass, int handlerCount, + Configuration conf) + throws IOException + { + this(bindAddress, port, paramClass, handlerCount, conf, Integer.toString(port)); + } + /** Constructs a server listening on the named port and address. Parameters passed must + * be of the named class. The handlerCount determines + * the number of handler threads that will be used to process calls. + * + */ + protected HBaseServer(String bindAddress, int port, + Class paramClass, int handlerCount, + Configuration conf, String serverName) + throws IOException { + this.bindAddress = bindAddress; + this.conf = conf; + this.port = port; + this.paramClass = paramClass; + this.handlerCount = handlerCount; + this.socketSendBufferSize = 0; + this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER; + this.callQueue = new LinkedBlockingQueue(maxQueueSize); + this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000); + this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); + this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000); + + // Start the listener here and let it bind to the port + listener = new Listener(); + this.port = listener.getAddress().getPort(); + this.rpcMetrics = new HBaseRpcMetrics(serverName, + Integer.toString(this.port), this); + this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false); + + + // Create the responder here + responder = new Responder(); + } + + private void closeConnection(Connection connection) { + synchronized (connectionList) { + if (connectionList.remove(connection)) + numConnections--; + } + try { + connection.close(); + } catch (IOException e) { + } + } + + /** Sets the socket buffer size used for responding to RPCs */ + public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; } + + /** Starts the service. Must be called before any calls will be handled. */ + public synchronized void start() throws IOException { + responder.start(); + listener.start(); + handlers = new Handler[handlerCount]; + + for (int i = 0; i < handlerCount; i++) { + handlers[i] = new Handler(i); + handlers[i].start(); + } + } + + /** Stops the service. No new calls will be handled after this is called. */ + public synchronized void stop() { + LOG.info("Stopping server on " + port); + running = false; + if (handlers != null) { + for (int i = 0; i < handlerCount; i++) { + if (handlers[i] != null) { + handlers[i].interrupt(); + } + } + } + listener.interrupt(); + listener.doStop(); + responder.interrupt(); + notifyAll(); + if (this.rpcMetrics != null) { + this.rpcMetrics.shutdown(); + } + } + + /** Wait for the server to be stopped. + * Does not wait for all subthreads to finish. + * See {@link #stop()}. + */ + public synchronized void join() throws InterruptedException { + while (running) { + wait(); + } + } + + /** + * Return the socket (ip+port) on which the RPC server is listening to. + * @return the socket (ip+port) on which the RPC server is listening to. + */ + public synchronized InetSocketAddress getListenerAddress() { + return listener.getAddress(); + } + + /** Called for each call. */ + public abstract Writable call(Writable param, long receiveTime) + throws IOException; + + + /** + * The number of open RPC conections + * @return the number of open rpc connections + */ + public int getNumOpenConnections() { + return numConnections; + } + + /** + * The number of rpc calls in the queue. + * @return The number of rpc calls in the queue. + */ + public int getCallQueueLen() { + return callQueue.size(); + } + +} diff --git a/src/java/org/apache/hadoop/hbase/ipc/package.html b/src/java/org/apache/hadoop/hbase/ipc/package.html new file mode 100644 index 00000000000..0e01bdcb861 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/ipc/package.html @@ -0,0 +1,24 @@ + + + + + +Tools to help define network clients and servers. +This is the hadoop copied local so can fix bugs and make hbase-specific optimizations. + + diff --git a/src/java/org/apache/hadoop/hbase/master/HMaster.java b/src/java/org/apache/hadoop/hbase/master/HMaster.java index f855b2f2348..ffcddac88a1 100644 --- a/src/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/java/org/apache/hadoop/hbase/master/HMaster.java @@ -60,10 +60,11 @@ import org.apache.hadoop.hbase.io.Cell; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; +import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface; -import org.apache.hadoop.hbase.ipc.HbaseRPC; +import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.master.metrics.MasterMetrics; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; @@ -76,7 +77,6 @@ import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.ipc.Server; /** * HMaster is the "master server" for a HBase. @@ -91,8 +91,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, static final Log LOG = LogFactory.getLog(HMaster.class.getName()); - public long getProtocolVersion(@SuppressWarnings("unused") String protocol, - @SuppressWarnings("unused") long clientVersion) { + public long getProtocolVersion(String protocol, long clientVersion) { return HBaseRPCProtocolVersion.versionID; } @@ -117,7 +116,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, volatile BlockingQueue toDoQueue = new LinkedBlockingQueue(); - private final Server server; + private final HBaseServer server; private final HServerAddress address; final ServerConnection connection; @@ -222,7 +221,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, conf.getLong("hbase.hbasemaster.maxregionopen", 120 * 1000); this.leaseTimeout = conf.getInt("hbase.master.lease.period", 120 * 1000); - this.server = HbaseRPC.getServer(this, address.getBindAddress(), + this.server = HBaseRPC.getServer(this, address.getBindAddress(), address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), false, conf); @@ -530,13 +529,11 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, /* * HMasterRegionInterface */ - - @SuppressWarnings("unused") public MapWritable regionServerStartup(HServerInfo serverInfo) throws IOException { // Set the address for now even tho it will not be persisted on // the HRS side. - String rsAddress = Server.getRemoteAddress(); + String rsAddress = HBaseServer.getRemoteAddress(); serverInfo.setServerAddress(new HServerAddress (rsAddress, serverInfo.getServerAddress().getPort())); // register with server manager @@ -552,7 +549,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, protected MapWritable createConfigurationSubset() { MapWritable mw = addConfig(new MapWritable(), HConstants.HBASE_DIR); // Get the real address of the HRS. - String rsAddress = Server.getRemoteAddress(); + String rsAddress = HBaseServer.getRemoteAddress(); if (rsAddress != null) { mw.put(new Text("hbase.regionserver.address"), new Text(rsAddress)); } diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index a141e04aaa5..173fd291074 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -72,8 +72,8 @@ import org.apache.hadoop.hbase.LocalHBaseCluster; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionHistorian; import org.apache.hadoop.hbase.RemoteExceptionHandler; -import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.UnknownRowLockException; +import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.ValueOverMaxLengthException; import org.apache.hadoop.hbase.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.client.ServerConnection; @@ -84,10 +84,11 @@ import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.Cell; import org.apache.hadoop.hbase.io.HbaseMapWritable; import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; +import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface; -import org.apache.hadoop.hbase.ipc.HbaseRPC; import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -96,7 +97,6 @@ import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.Server; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.StringUtils; @@ -154,7 +154,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { // Server to handle client requests. Default access so can be accessed by // unit tests. - final Server server; + final HBaseServer server; // Leases private final Leases leases; @@ -258,7 +258,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { this.workerThread = new Thread(worker); // Server to handle client requests - this.server = HbaseRPC.getServer(this, address.getBindAddress(), + this.server = HBaseRPC.getServer(this, address.getBindAddress(), address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), false, conf); // Address is givin a default IP for the moment. Will be changed after @@ -518,7 +518,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { serverInfo.getServerAddress().toString()); } if (this.hbaseMaster != null) { - HbaseRPC.stopProxy(this.hbaseMaster); + HBaseRPC.stopProxy(this.hbaseMaster); this.hbaseMaster = null; } join(); @@ -959,7 +959,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { try { // Do initial RPC setup. The final argument indicates that the RPC // should retry indefinitely. - master = (HMasterRegionInterface)HbaseRPC.waitForProxy( + master = (HMasterRegionInterface)HBaseRPC.waitForProxy( HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID, new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(), this.conf, -1); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HStore.java b/src/java/org/apache/hadoop/hbase/regionserver/HStore.java index 37f54fcb948..ed3d37c9c0d 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -823,7 +823,7 @@ public class HStore implements HConstants { List filesToCompact = null; synchronized (storefiles) { if (this.storefiles.size() <= 0) { - LOG.debug("no store files to compact"); + LOG.debug(this.storeNameStr + ": no store files to compact"); return null; } // filesToCompact are sorted oldest to newest. diff --git a/src/java/org/apache/hadoop/ipc/HBaseClient.java b/src/java/org/apache/hadoop/ipc/HBaseClient.java deleted file mode 100644 index bad47fbb6a3..00000000000 --- a/src/java/org/apache/hadoop/ipc/HBaseClient.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ipc; - -import javax.net.SocketFactory; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Writable; - -/** - * Subclass of hadoop's Client just so we can make some methods accessible - * in {@link org.apache.hadoop.hbase.ipc.HbaseRPC} - */ -public class HBaseClient extends Client { - /** - * @param valueClass - * @param conf - * @param factory - */ - public HBaseClient(Class valueClass, Configuration conf, - SocketFactory factory) { - super(valueClass, conf, factory); - } - - @Override - public void incCount() { - super.incCount(); - } - - @Override - public void decCount() { - super.decCount(); - } - - @Override - public boolean isZeroReference() { - return super.isZeroReference(); - } - - @Override - public SocketFactory getSocketFactory() { - return super.getSocketFactory(); - } -} \ No newline at end of file