HBASE-900 Regionserver memory leak causing OOME during relatively modest bulk importing

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@724231 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2008-12-08 01:43:45 +00:00
parent 9f5fa6ac6f
commit db520ca53c
12 changed files with 2067 additions and 96 deletions

View File

@ -101,6 +101,8 @@ Release 0.19.0 - Unreleased
hard code memory sizes
HBASE-1000 Sleeper.sleep does not go back to sleep when interrupted
and no stop flag given.
HBASE-900 Regionserver memory leak causing OOME during relatively
modest bulk importing; part 1
IMPROVEMENTS
HBASE-901 Add a limit to key length, check key and value length on client side

View File

@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.ipc.HbaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
import org.apache.hadoop.hbase.util.Writables;
@ -192,7 +192,7 @@ public class HConnectionManager implements HConstants {
masterLocation = new HServerAddress(this.conf.get(MASTER_ADDRESS,
DEFAULT_MASTER_ADDRESS));
try {
HMasterInterface tryMaster = (HMasterInterface)HbaseRPC.getProxy(
HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
masterLocation.getInetSocketAddress(), this.conf);
@ -732,7 +732,7 @@ public class HConnectionManager implements HConstants {
server = this.servers.get(regionServer.toString());
if (server == null) { // Get a connection
try {
server = (HRegionInterface)HbaseRPC.waitForProxy(
server = (HRegionInterface)HBaseRPC.waitForProxy(
serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
regionServer.getInetSocketAddress(), this.conf,
this.maxRPCAttempts);
@ -954,7 +954,7 @@ public class HConnectionManager implements HConstants {
void close(boolean stopProxy) {
if (master != null) {
if (stopProxy) {
HbaseRPC.stopProxy(master);
HBaseRPC.stopProxy(master);
}
master = null;
masterChecked = false;
@ -962,7 +962,7 @@ public class HConnectionManager implements HConstants {
if (stopProxy) {
synchronized (servers) {
for (HRegionInterface i: servers.values()) {
HbaseRPC.stopProxy(i);
HBaseRPC.stopProxy(i);
}
}
}

View File

@ -0,0 +1,844 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.net.Socket;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.net.ConnectException;
import java.io.IOException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FilterInputStream;
import java.io.InputStream;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class.
*
* <p>This is the org.apache.hadoop.ipc.Client renamed as HBaseClient and
* moved into this package so it can access package-private methods.
*
* @see HBaseServer
*/
public class HBaseClient {
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
private Hashtable<ConnectionId, Connection> connections =
new Hashtable<ConnectionId, Connection>();
private Class<? extends Writable> valueClass; // class of call values
private int counter; // counter for call ids
private AtomicBoolean running = new AtomicBoolean(true); // if client runs
final private Configuration conf;
final private int maxIdleTime; //connections will be culled if they have been
//idle for maxIdleTime msecs
final private int maxRetries; //the max. no. of retries for socket connections
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private int pingInterval; // how often to ping the server, in msecs
private SocketFactory socketFactory; // how to create sockets
private int refCount = 1;
final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
final static int DEFAULT_PING_INTERVAL = 60000; // 1 min
final static int PING_CALL_ID = -1;
/**
* set the ping interval value in configuration
*
* @param conf Configuration
* @param pingInterval the ping interval
*/
final public static void setPingInterval(Configuration conf, int pingInterval) {
conf.setInt(PING_INTERVAL_NAME, pingInterval);
}
/**
* Get the ping interval from configuration;
* If not set in the configuration, return the default value.
*
* @param conf Configuration
* @return the ping interval
*/
final static int getPingInterval(Configuration conf) {
return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
}
/**
* Increment this client's reference count
*
*/
synchronized void incCount() {
refCount++;
}
/**
* Decrement this client's reference count
*
*/
synchronized void decCount() {
refCount--;
}
/**
* Return if this client has no reference
*
* @return true if this client has no reference; false otherwise
*/
synchronized boolean isZeroReference() {
return refCount==0;
}
/** A call waiting for a value. */
private class Call {
int id; // call id
Writable param; // parameter
Writable value; // value, null if error
IOException error; // exception, null if value
boolean done; // true when call is done
protected Call(Writable param) {
this.param = param;
synchronized (HBaseClient.this) {
this.id = counter++;
}
}
/** Indicate when the call is complete and the
* value or error are available. Notifies by default. */
protected synchronized void callComplete() {
this.done = true;
notify(); // notify caller
}
/** Set the exception when there is an error.
* Notify the caller the call is done.
*
* @param error exception thrown by the call; either local or remote
*/
public synchronized void setException(IOException error) {
this.error = error;
callComplete();
}
/** Set the return value when there is no error.
* Notify the caller the call is done.
*
* @param value return value of the call.
*/
public synchronized void setValue(Writable value) {
this.value = value;
callComplete();
}
}
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
private class Connection extends Thread {
private ConnectionId remoteId;
private Socket socket = null; // connected socket
private DataInputStream in;
private DataOutputStream out;
// currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // set when the connection should be closed
private IOException closeException; // close reason
public Connection(InetSocketAddress address) throws IOException {
this(new ConnectionId(address, null));
}
public Connection(ConnectionId remoteId) throws IOException {
if (remoteId.getAddress().isUnresolved()) {
throw new UnknownHostException("unknown host: " +
remoteId.getAddress().getHostName());
}
this.remoteId = remoteId;
UserGroupInformation ticket = remoteId.getTicket();
this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
remoteId.getAddress().toString() +
" from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
this.setDaemon(true);
}
/** Update lastActivity with the current time. */
private void touch() {
lastActivity.set(System.currentTimeMillis());
}
/**
* Add a call to this connection's call queue and notify
* a listener; synchronized.
* Returns false if called during shutdown.
* @param call to add
* @return true if the call was added.
*/
private synchronized boolean addCall(Call call) {
if (shouldCloseConnection.get())
return false;
calls.put(call.id, call);
notify();
return true;
}
/** This class sends a ping to the remote side when a read times out.
* If no failure is detected, it retries until at least
* one byte is read.
*/
private class PingInputStream extends FilterInputStream {
/* constructor */
protected PingInputStream(InputStream in) {
super(in);
}
/* Process a timeout exception:
* if the connection is not going to be closed, send a ping;
* otherwise, rethrow the timeout exception.
*/
private void handleTimeout(SocketTimeoutException e) throws IOException {
if (shouldCloseConnection.get() || !running.get()) {
throw e;
} else {
sendPing();
}
}
/** Read a byte from the stream.
* Sends a ping on read timeout; retries until a byte is read
* or a non-timeout failure occurs.
* @throws IOException for any IO problem other than socket timeout
*/
public int read() throws IOException {
do {
try {
return super.read();
} catch (SocketTimeoutException e) {
handleTimeout(e);
}
} while (true);
}
/** Read bytes into a buffer starting at offset <code>off</code>.
* Sends a ping on read timeout; retries until at least one byte is read
* or a non-timeout failure occurs.
*
* @return the total number of bytes read; -1 if the connection is closed.
*/
public int read(byte[] buf, int off, int len) throws IOException {
do {
try {
return super.read(buf, off, len);
} catch (SocketTimeoutException e) {
handleTimeout(e);
}
} while (true);
}
}
/** Connect to the server and set up the I/O streams. It then sends
* a header to the server and starts
* the connection thread that waits for responses.
*/
private synchronized void setupIOstreams() {
if (socket != null || shouldCloseConnection.get()) {
return;
}
short ioFailures = 0;
short timeoutFailures = 0;
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+remoteId.getAddress());
}
while (true) {
try {
this.socket = socketFactory.createSocket();
this.socket.setTcpNoDelay(tcpNoDelay);
// connection time out is 20s
this.socket.connect(remoteId.getAddress(), 20000);
this.socket.setSoTimeout(pingInterval);
break;
} catch (SocketTimeoutException toe) {
/* The max number of retries is 45,
* which amounts to 20s*45 = 15 minutes of retries.
*/
handleConnectionFailure(timeoutFailures++, 45, toe);
} catch (IOException ie) {
handleConnectionFailure(ioFailures++, maxRetries, ie);
}
}
this.in = new DataInputStream(new BufferedInputStream
(new PingInputStream(NetUtils.getInputStream(socket))));
this.out = new DataOutputStream
(new BufferedOutputStream(NetUtils.getOutputStream(socket)));
writeHeader();
// update last activity time
touch();
// start the receiver thread after the socket connection has been set up
start();
} catch (IOException e) {
markClosed(e);
close();
}
}
/* Handle connection failures
*
* If the current number of retries equals the max number of retries,
* stop retrying and throw the exception; otherwise back off 1 second and
* try connecting again.
*
* This method is only called from inside setupIOstreams(), which is
* synchronized. Hence the sleep is synchronized; the locks will be retained.
*
* @param curRetries current number of retries
* @param maxRetries max number of retries allowed
* @param ioe failure reason
* @throws IOException if max number of retries is reached
*/
private void handleConnectionFailure(
int curRetries, int maxRetries, IOException ioe) throws IOException {
// close the current connection
try {
socket.close();
} catch (IOException e) {
LOG.warn("Not able to close a socket", e);
}
// set socket to null so that the next call to setupIOstreams
// can start the connect process all over again.
socket = null;
// throw the exception if the maximum number of retries is reached
if (curRetries >= maxRetries) {
throw ioe;
}
// otherwise back off and retry
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
LOG.info("Retrying connect to server: " + remoteId.getAddress() +
". Already tried " + curRetries + " time(s).");
}
/* Write the header for each connection.
* Out is not synchronized because only the first thread does this.
*/
private void writeHeader() throws IOException {
out.write(HBaseServer.HEADER.array());
out.write(HBaseServer.CURRENT_VERSION);
//When there are more fields we can introduce a ConnectionHeader Writable.
DataOutputBuffer buf = new DataOutputBuffer();
ObjectWritable.writeObject(buf, remoteId.getTicket(),
UserGroupInformation.class, conf);
int bufLen = buf.getLength();
out.writeInt(bufLen);
out.write(buf.getData(), 0, bufLen);
}
/* Wait until someone signals us to start reading an RPC response, or
* the connection has been idle too long, or it is marked to be closed,
* or the client is marked as not running.
*
* Return true if it is time to read a response; false otherwise.
*/
private synchronized boolean waitForWork() {
if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
long timeout = maxIdleTime-
(System.currentTimeMillis()-lastActivity.get());
if (timeout>0) {
try {
wait(timeout);
} catch (InterruptedException e) {}
}
}
if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
return true;
} else if (shouldCloseConnection.get()) {
return false;
} else if (calls.isEmpty()) { // idle connection closed or stopped
markClosed(null);
return false;
} else { // client got stopped but there are still pending requests
markClosed((IOException)new IOException().initCause(
new InterruptedException()));
return false;
}
}
public InetSocketAddress getRemoteAddress() {
return remoteId.getAddress();
}
/* Send a ping to the server if the time elapsed
* since last I/O activity is equal to or greater than the ping interval
*/
private synchronized void sendPing() throws IOException {
long curTime = System.currentTimeMillis();
if ( curTime - lastActivity.get() >= pingInterval) {
lastActivity.set(curTime);
synchronized (out) {
out.writeInt(PING_CALL_ID);
out.flush();
}
}
}
public void run() {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": starting, having connections "
+ connections.size());
while (waitForWork()) {//wait here for work - read or close connection
receiveResponse();
}
close();
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": stopped, remaining connections "
+ connections.size());
}
/** Initiates a call by sending the parameter to the remote server.
* Note: this is not called from the Connection thread, but by other
* threads.
*/
public void sendParam(Call call) {
if (shouldCloseConnection.get()) {
return;
}
DataOutputBuffer d=null;
try {
synchronized (this.out) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
//for serializing the
//data to be written
d = new DataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte[] data = d.getData();
int dataLength = d.getLength();
out.writeInt(dataLength); //first put the data length
out.write(data, 0, dataLength);//write the data
out.flush();
}
} catch(IOException e) {
markClosed(e);
} finally {
//the buffer is just an in-memory buffer, but it is still polite to
// close early
IOUtils.closeStream(d);
}
}
/* Receive a response.
* Because there is only one receiver, no synchronization on in is needed.
*/
private void receiveResponse() {
if (shouldCloseConnection.get()) {
return;
}
touch();
try {
int id = in.readInt(); // try to read an id
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + id);
Call call = calls.remove(id);
boolean isError = in.readBoolean(); // read if error
if (isError) {
call.setException(new RemoteException( WritableUtils.readString(in),
WritableUtils.readString(in)));
} else {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
call.setValue(value);
}
} catch (IOException e) {
markClosed(e);
}
}
private synchronized void markClosed(IOException e) {
if (shouldCloseConnection.compareAndSet(false, true)) {
closeException = e;
notifyAll();
}
}
/** Close the connection. */
private synchronized void close() {
if (!shouldCloseConnection.get()) {
LOG.error("The connection is not in the closed state");
return;
}
// release the resources
// first thing to do: take the connection out of the connection list
synchronized (connections) {
if (connections.get(remoteId) == this) {
connections.remove(remoteId);
}
}
// close the streams and therefore the socket
IOUtils.closeStream(out);
IOUtils.closeStream(in);
// clean up all calls
if (closeException == null) {
if (!calls.isEmpty()) {
LOG.warn(
"A connection is closed for no cause and calls are not empty");
// clean up calls anyway
closeException = new IOException("Unexpected closed connection");
cleanupCalls();
}
} else {
// log the info
if (LOG.isDebugEnabled()) {
LOG.debug("closing ipc connection to " + remoteId.address + ": " +
closeException.getMessage(),closeException);
}
// cleanup calls
cleanupCalls();
}
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": closed");
}
/* Cleanup all calls and mark them as done */
private void cleanupCalls() {
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
while (itor.hasNext()) {
Call c = itor.next().getValue();
c.setException(closeException); // local exception
itor.remove();
}
}
}
/** Call implementation used for parallel calls. */
private class ParallelCall extends Call {
private ParallelResults results;
private int index;
public ParallelCall(Writable param, ParallelResults results, int index) {
super(param);
this.results = results;
this.index = index;
}
/** Deliver result to result collector. */
protected void callComplete() {
results.callComplete(this);
}
}
/** Result collector for parallel calls. */
private static class ParallelResults {
private Writable[] values;
private int size;
private int count;
public ParallelResults(int size) {
this.values = new Writable[size];
this.size = size;
}
/** Collect a result. */
public synchronized void callComplete(ParallelCall call) {
values[call.index] = call.value; // store the value
count++; // count it
if (count == size) // if all values are in
notify(); // then notify waiting caller
}
}
/** Construct an IPC client whose values are of the given {@link Writable}
* class. */
public HBaseClient(Class<? extends Writable> valueClass, Configuration conf,
SocketFactory factory) {
this.valueClass = valueClass;
this.maxIdleTime =
conf.getInt("ipc.client.connection.maxidletime", 10000); //10s
this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
this.tcpNoDelay = conf.getBoolean("ipc.client.tcpnodelay", false);
this.pingInterval = getPingInterval(conf);
if (LOG.isDebugEnabled()) {
LOG.debug("The ping interval is" + this.pingInterval + "ms.");
}
this.conf = conf;
this.socketFactory = factory;
}
/**
* Construct an IPC client with the default SocketFactory
* @param valueClass
* @param conf
*/
public HBaseClient(Class<? extends Writable> valueClass, Configuration conf) {
this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
}
/** Return the socket factory of this client
*
* @return this client's socket factory
*/
SocketFactory getSocketFactory() {
return socketFactory;
}
/** Stop all threads related to this client. No further calls may be made
* using this client. */
public void stop() {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping client");
}
if (!running.compareAndSet(true, false)) {
return;
}
// wake up all connections
synchronized (connections) {
for (Connection conn : connections.values()) {
conn.interrupt();
}
}
// wait until all connections are closed
while (!connections.isEmpty()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
/** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code>, returning the value. Throws exceptions if there are
* network problems or if the remote code threw an exception. */
public Writable call(Writable param, InetSocketAddress address)
throws InterruptedException, IOException {
return call(param, address, null);
}
public Writable call(Writable param, InetSocketAddress addr,
UserGroupInformation ticket)
throws InterruptedException, IOException {
Call call = new Call(param);
Connection connection = getConnection(addr, ticket, call);
connection.sendParam(call); // send the parameter
synchronized (call) {
while (!call.done) {
try {
call.wait(); // wait for the result
} catch (InterruptedException ignored) {}
}
if (call.error != null) {
if (call.error instanceof RemoteException) {
call.error.fillInStackTrace();
throw call.error;
} else { // local exception
throw wrapException(addr, call.error);
}
} else {
return call.value;
}
}
}
/**
* Take an IOException and the address we were trying to connect to
* and return an IOException with the input exception as the cause.
* The new exception provides the stack trace of the place where
* the exception is thrown and some extra diagnostics information.
* If the exception is ConnectException or SocketTimeoutException,
* return a new one of the same type; otherwise return an IOException.
*
* @param addr target address
* @param exception the relevant exception
* @return an exception to throw
*/
private IOException wrapException(InetSocketAddress addr,
IOException exception) {
if (exception instanceof ConnectException) {
//connection refused; include the host:port in the error
return (ConnectException)new ConnectException(
"Call to " + addr + " failed on connection exception: " + exception)
.initCause(exception);
} else if (exception instanceof SocketTimeoutException) {
return (SocketTimeoutException)new SocketTimeoutException(
"Call to " + addr + " failed on socket timeout exception: "
+ exception).initCause(exception);
} else {
return (IOException)new IOException(
"Call to " + addr + " failed on local exception: " + exception)
.initCause(exception);
}
}
/** Makes a set of calls in parallel. Each parameter is sent to the
* corresponding address. When all values are available, or have timed out
* or errored, the collected results are returned in an array. The array
* contains nulls for calls that timed out or errored. */
public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
throws IOException {
if (addresses.length == 0) return new Writable[0];
ParallelResults results = new ParallelResults(params.length);
synchronized (results) {
for (int i = 0; i < params.length; i++) {
ParallelCall call = new ParallelCall(params[i], results, i);
try {
Connection connection = getConnection(addresses[i], null, call);
connection.sendParam(call); // send each parameter
} catch (IOException e) {
// log errors
LOG.info("Calling "+addresses[i]+" caught: " +
e.getMessage(),e);
results.size--; // wait for one fewer result
}
}
while (results.count != results.size) {
try {
results.wait(); // wait for all results
} catch (InterruptedException e) {}
}
return results.values;
}
}
/** Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given host/port are reused. */
private Connection getConnection(InetSocketAddress addr,
UserGroupInformation ticket,
Call call)
throws IOException {
if (!running.get()) {
// the client is stopped
throw new IOException("The client is stopped");
}
Connection connection;
/* We could avoid this allocation for each RPC by having a
* ConnectionId object with a set() method. We would need to manage the
* refs for keys in the HashMap properly. For now it's OK.
*/
ConnectionId remoteId = new ConnectionId(addr, ticket);
do {
synchronized (connections) {
connection = connections.get(remoteId);
if (connection == null) {
connection = new Connection(remoteId);
connections.put(remoteId, connection);
}
}
} while (!connection.addCall(call));
//we don't invoke the method below inside the "synchronized (connections)"
//block above because, if the server happens to be slow, establishing the
//connection would take longer while holding the lock, slowing the
//entire system down.
connection.setupIOstreams();
return connection;
}
/**
* This class holds the address and the user ticket. The client connections
* to servers are uniquely identified by <remoteAddress, ticket>
*/
private static class ConnectionId {
InetSocketAddress address;
UserGroupInformation ticket;
ConnectionId(InetSocketAddress address, UserGroupInformation ticket) {
this.address = address;
this.ticket = ticket;
}
InetSocketAddress getAddress() {
return address;
}
UserGroupInformation getTicket() {
return ticket;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof ConnectionId) {
ConnectionId id = (ConnectionId) obj;
return address.equals(id.address) && ticket == id.ticket;
//Note: ticket is a ref comparison.
}
return false;
}
@Override
public int hashCode() {
return address.hashCode() ^ System.identityHashCode(ticket);
}
}
}
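
For orientation, a minimal, hedged usage sketch of the client above; it is not part of this commit. The host and port are hypothetical, and a real caller would pass an HBaseRPC Invocation where a plain Text stands in here just to exercise call().

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.ipc.HBaseClient;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class HBaseClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HBaseConfiguration();
    // Match the value class HBaseRPC uses for responses.
    HBaseClient client = new HBaseClient(HbaseObjectWritable.class, conf);
    // Hypothetical regionserver address.
    InetSocketAddress addr =
        new InetSocketAddress("regionserver.example.com", 60020);
    try {
      // call() blocks until the server responds or the connection fails;
      // a real parameter would be an HBaseRPC Invocation, not a Text.
      Writable value = client.call(new Text("ping"), addr);
      System.out.println("got: " + value);
    } finally {
      client.stop(); // interrupt connection threads and wait for them to close
    }
  }
}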

View File

@ -44,8 +44,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.HBaseClient;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.net.NetUtils;
@ -76,14 +74,14 @@ import org.apache.hadoop.security.UserGroupInformation;
* All methods in the protocol should throw only IOException. No field data of
* the protocol instance is transmitted.
*/
public class HbaseRPC {
public class HBaseRPC {
// Leave the Log in the hadoop ipc package but keep this class's name. Do this
// so that we don't get this class's invocations logged when we blanket-enable
// DEBUG on the o.a.h.h. package.
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC");
private HbaseRPC() {
private HBaseRPC() {
super();
} // no public ctor
@ -235,8 +233,8 @@ public class HbaseRPC {
/* Cache a client using its socket factory as the hash key */
static private class ClientCache {
private Map<SocketFactory, Client> clients =
new HashMap<SocketFactory, Client>();
private Map<SocketFactory, HBaseClient> clients =
new HashMap<SocketFactory, HBaseClient>();
/**
* Construct & cache an IPC client with the user-provided SocketFactory
@ -245,14 +243,14 @@ public class HbaseRPC {
* @param conf Configuration
* @return an IPC client
*/
private synchronized Client getClient(Configuration conf,
private synchronized HBaseClient getClient(Configuration conf,
SocketFactory factory) {
// Construct & cache client. The configuration is only used for timeout,
// and Clients have connection pools. So we can either (a) lose some
// connection pooling and leak sockets, or (b) use the same timeout for all
// configurations. Since the IPC is usually intended globally, not
// per-job, we choose (b).
Client client = clients.get(factory);
HBaseClient client = clients.get(factory);
if (client == null) {
// Make an hbase client instead of hadoop Client.
client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
@ -270,7 +268,7 @@ public class HbaseRPC {
* @param conf Configuration
* @return an IPC client
*/
private synchronized Client getClient(Configuration conf) {
private synchronized HBaseClient getClient(Configuration conf) {
return getClient(conf, SocketFactory.getDefault());
}
@ -278,7 +276,7 @@ public class HbaseRPC {
* Stop an RPC client connection.
* An RPC client is closed only when its reference count becomes zero.
*/
private void stopClient(Client client) {
private void stopClient(HBaseClient client) {
synchronized (this) {
((HBaseClient)client).decCount();
if (((HBaseClient)client).isZeroReference()) {
@ -296,7 +294,7 @@ public class HbaseRPC {
private static class Invoker implements InvocationHandler {
private InetSocketAddress address;
private UserGroupInformation ticket;
private Client client;
private HBaseClient client;
private boolean isClosed = false;
/**
@ -521,7 +519,7 @@ public class HbaseRPC {
Invocation[] invocations = new Invocation[params.length];
for (int i = 0; i < params.length; i++)
invocations[i] = new Invocation(method, params[i]);
Client client = CLIENTS.getClient(conf);
HBaseClient client = CLIENTS.getClient(conf);
try {
Writable[] wrappedValues = client.call(invocations, addrs);
@ -578,7 +576,7 @@ public class HbaseRPC {
}
/** An RPC Server. */
public static class Server extends org.apache.hadoop.ipc.Server {
public static class Server extends HBaseServer {
private Object instance;
private Class<?> implementation;
private boolean verbose;
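
The rename does not change the calling convention inherited from Hadoop RPC. A hedged sketch of the proxy lifecycle follows; the master address is hypothetical, and isMasterRunning() is assumed as a representative HMasterInterface method.

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
import org.apache.hadoop.hbase.ipc.HMasterInterface;

public class ProxySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HBaseConfiguration();
    // getProxy() checks the protocol version against the server's.
    HMasterInterface master = (HMasterInterface) HBaseRPC.getProxy(
        HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
        new InetSocketAddress("master.example.com", 60000), conf); // hypothetical
    try {
      System.out.println("master running: " + master.isMasterRunning());
    } finally {
      // Decrements the cached HBaseClient's refcount; the client and its
      // pooled connections are stopped once the count reaches zero.
      HBaseRPC.stopProxy(master);
    }
  }
}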

View File

@ -63,7 +63,8 @@ public interface HBaseRPCProtocolVersion extends VersionedProtocol {
* <li>Version 11: Changed getClosestRowBefore signature.</li>
* <li>Version 12: HServerLoad extensions (HBASE-1018).</li>
* <li>Version 13: HBASE-847</li>
* <li>Version 14: HBASE-900</li>
* </ul>
*/
public static final long versionID = 13L;
public static final long versionID = 14L;
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
/**
*
* This class is for maintaining the various RPC statistics
* and publishing them through the metrics interfaces.
* This also registers the JMX MBean for RPC.
* <p>
* This class has a number of metrics variables that are publicly accessible;
* these variables (objects) have methods to update their values;
* for example:
* <p> {@link #rpcQueueTime}.inc(time)
*
*/
public class HBaseRpcMetrics implements Updater {
private MetricsRecord metricsRecord;
private static Log LOG = LogFactory.getLog(HBaseRpcMetrics.class);
public HBaseRpcMetrics(String hostName, String port, HBaseServer server) {
MetricsContext context = MetricsUtil.getContext("rpc");
metricsRecord = MetricsUtil.createRecord(context, "metrics");
metricsRecord.setTag("port", port);
LOG.info("Initializing RPC Metrics with hostName="
+ hostName + ", port=" + port);
context.registerUpdater(this);
}
/**
* The metrics variables are public:
* - they can be set directly by calling their set/inc methods
* - they can also be read directly - e.g. JMX does this.
*/
public MetricsTimeVaryingRate rpcQueueTime = new MetricsTimeVaryingRate("RpcQueueTime");
public MetricsTimeVaryingRate rpcProcessingTime = new MetricsTimeVaryingRate("RpcProcessingTime");
public Map <String, MetricsTimeVaryingRate> metricsList = Collections.synchronizedMap(new HashMap<String, MetricsTimeVaryingRate>());
/**
* Push the metrics to the monitoring subsystem on doUpdates() call.
*/
public void doUpdates(MetricsContext context) {
rpcQueueTime.pushMetric(metricsRecord);
rpcProcessingTime.pushMetric(metricsRecord);
synchronized (metricsList) {
// Iterate through the rpcMetrics hashmap to propagate the different rpc metrics.
Set keys = metricsList.keySet();
Iterator keyIter = keys.iterator();
while (keyIter.hasNext()) {
Object key = keyIter.next();
MetricsTimeVaryingRate value = metricsList.get(key);
value.pushMetric(metricsRecord);
}
}
metricsRecord.update();
}
public void shutdown() {
// Nothing to do
}
}
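
A hedged sketch of how a server-side handler might feed these metrics; the host, port, and method name are hypothetical, and null is passed for the HBaseServer argument since the constructor above does not use it.

import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;

public class RpcMetricsSketch {
  public static void main(String[] args) {
    HBaseRpcMetrics metrics =
        new HBaseRpcMetrics("regionserver.example.com", "60020", null);
    metrics.rpcQueueTime.inc(12);      // ms the call waited in the handler queue
    metrics.rpcProcessingTime.inc(34); // ms spent executing the call
    // Per-method rates go through the synchronized map; doUpdates() pushes
    // them along with the queue/processing rates on the next metrics period.
    MetricsTimeVaryingRate rate = metrics.metricsList.get("getRow");
    if (rate == null) {
      rate = new MetricsTimeVaryingRate("getRow");
      metrics.metricsList.put("getRow", rate);
    }
    rate.inc(7);
  }
}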

File diff suppressed because it is too large

View File

@ -0,0 +1,24 @@
<html>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<body>
Tools to help define network clients and servers.
This is the hadoop IPC code copied local so we can fix bugs and make hbase-specific optimizations.
</body>
</html>

View File

@ -60,10 +60,11 @@ import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.ipc.HbaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
@ -76,7 +77,6 @@ import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
/**
* HMaster is the "master server" for HBase.
@ -91,8 +91,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
static final Log LOG = LogFactory.getLog(HMaster.class.getName());
public long getProtocolVersion(@SuppressWarnings("unused") String protocol,
@SuppressWarnings("unused") long clientVersion) {
public long getProtocolVersion(String protocol, long clientVersion) {
return HBaseRPCProtocolVersion.versionID;
}
@ -117,7 +116,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
volatile BlockingQueue<RegionServerOperation> toDoQueue =
new LinkedBlockingQueue<RegionServerOperation>();
private final Server server;
private final HBaseServer server;
private final HServerAddress address;
final ServerConnection connection;
@ -222,7 +221,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
conf.getLong("hbase.hbasemaster.maxregionopen", 120 * 1000);
this.leaseTimeout = conf.getInt("hbase.master.lease.period", 120 * 1000);
this.server = HbaseRPC.getServer(this, address.getBindAddress(),
this.server = HBaseRPC.getServer(this, address.getBindAddress(),
address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
false, conf);
@ -530,13 +529,11 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
/*
* HMasterRegionInterface
*/
@SuppressWarnings("unused")
public MapWritable regionServerStartup(HServerInfo serverInfo)
throws IOException {
// Set the address for now even tho it will not be persisted on
// the HRS side.
String rsAddress = Server.getRemoteAddress();
String rsAddress = HBaseServer.getRemoteAddress();
serverInfo.setServerAddress(new HServerAddress
(rsAddress, serverInfo.getServerAddress().getPort()));
// register with server manager
@ -552,7 +549,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
protected MapWritable createConfigurationSubset() {
MapWritable mw = addConfig(new MapWritable(), HConstants.HBASE_DIR);
// Get the real address of the HRS.
String rsAddress = Server.getRemoteAddress();
String rsAddress = HBaseServer.getRemoteAddress();
if (rsAddress != null) {
mw.put(new Text("hbase.regionserver.address"), new Text(rsAddress));
}

View File

@ -72,8 +72,8 @@ import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionHistorian;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.ValueOverMaxLengthException;
import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.client.ServerConnection;
@ -84,10 +84,11 @@ import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.ipc.HbaseRPC;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@ -96,7 +97,6 @@ import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
@ -154,7 +154,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// Server to handle client requests. Default access so it can be accessed by
// unit tests.
final Server server;
final HBaseServer server;
// Leases
private final Leases leases;
@ -258,7 +258,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.workerThread = new Thread(worker);
// Server to handle client requests
this.server = HbaseRPC.getServer(this, address.getBindAddress(),
this.server = HBaseRPC.getServer(this, address.getBindAddress(),
address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
false, conf);
// Address is given a default IP for the moment. Will be changed after
@ -518,7 +518,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
serverInfo.getServerAddress().toString());
}
if (this.hbaseMaster != null) {
HbaseRPC.stopProxy(this.hbaseMaster);
HBaseRPC.stopProxy(this.hbaseMaster);
this.hbaseMaster = null;
}
join();
@ -959,7 +959,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
try {
// Do initial RPC setup. The final argument indicates that the RPC
// should retry indefinitely.
master = (HMasterRegionInterface)HbaseRPC.waitForProxy(
master = (HMasterRegionInterface)HBaseRPC.waitForProxy(
HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
this.conf, -1);

View File

@ -823,7 +823,7 @@ public class HStore implements HConstants {
List<HStoreFile> filesToCompact = null;
synchronized (storefiles) {
if (this.storefiles.size() <= 0) {
LOG.debug("no store files to compact");
LOG.debug(this.storeNameStr + ": no store files to compact");
return null;
}
// filesToCompact are sorted oldest to newest.

View File

@ -1,59 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
/**
* Subclass of hadoop's Client just so we can make some methods accessible
* in {@link org.apache.hadoop.hbase.ipc.HbaseRPC}
*/
public class HBaseClient extends Client {
/**
* @param valueClass
* @param conf
* @param factory
*/
public HBaseClient(Class<? extends Writable> valueClass, Configuration conf,
SocketFactory factory) {
super(valueClass, conf, factory);
}
@Override
public void incCount() {
super.incCount();
}
@Override
public void decCount() {
super.decCount();
}
@Override
public boolean isZeroReference() {
return super.isZeroReference();
}
@Override
public SocketFactory getSocketFactory() {
return super.getSocketFactory();
}
}