HBASE-6550 Refactoring ReplicationSink to make it more responsive of cluster health (Himanshu Vashishtha)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1379227 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2012-08-30 23:37:23 +00:00
parent 6dce5b8059
commit 849e17fc1a
2 changed files with 54 additions and 7 deletions

View File

@ -128,6 +128,7 @@ public class Replication implements WALActionsListener,
public void join() {
if (this.replication) {
this.replicationManager.join();
this.replicationSink.stopReplicationSinkServices();
}
}

View File

@ -23,11 +23,16 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@ -41,6 +46,10 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* This class is responsible for replicating the edits coming
@ -63,8 +72,8 @@ public class ReplicationSink {
// Name of the HDFS directory that contains the temporary rep logs
public static final String REPLICATION_LOG_DIR = ".replogs";
private final Configuration conf;
// Pool used to replicated
private final HTablePool pool;
private final ExecutorService sharedThreadPool;
private final HConnection sharedHtableCon;
private final ReplicationSinkMetrics metrics;
/**
@ -76,12 +85,28 @@ public class ReplicationSink {
*/
public ReplicationSink(Configuration conf, Stoppable stopper)
throws IOException {
this.conf = conf;
this.pool = new HTablePool(this.conf,
conf.getInt("replication.sink.htablepool.capacity", 10));
this.conf = HBaseConfiguration.create(conf);
decorateConf();
this.metrics = new ReplicationSinkMetrics();
this.sharedHtableCon = HConnectionManager.createConnection(this.conf);
this.sharedThreadPool = new ThreadPoolExecutor(1,
conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE),
conf.getLong("hbase.htable.threads.keepalivetime", 60), TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new DaemonThreadFactory("hbase-repl"));
((ThreadPoolExecutor) this.sharedThreadPool).allowCoreThreadTimeOut(true);
}
/**
* decorate the Configuration object to make replication more receptive to delays:
* lessen the timeout and numTries.
*/
private void decorateConf() {
this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
this.conf.getInt("replication.sink.client.retries.number", 1));
this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
this.conf.getInt("replication.sink.client.ops.timeout", 20));
}
/**
* Replicate this array of entries directly into the local cluster
* using the native client.
@ -160,6 +185,27 @@ public class ReplicationSink {
return values;
}
/**
* stop the thread pool executor. It is called when the regionserver is stopped.
*/
public void stopReplicationSinkServices() {
try {
this.sharedThreadPool.shutdown();
if (!this.sharedThreadPool.awaitTermination(60000, TimeUnit.MILLISECONDS)) {
this.sharedThreadPool.shutdownNow();
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while closing the table pool", e); // ignoring it as we are closing.
Thread.currentThread().interrupt();
}
try {
this.sharedHtableCon.close();
} catch (IOException e) {
LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
}
}
/**
* Do the changes and handle the pool
* @param tableName table to insert into
@ -172,7 +218,7 @@ public class ReplicationSink {
}
HTableInterface table = null;
try {
table = this.pool.getTable(tableName);
table = new HTable(tableName, this.sharedHtableCon, this.sharedThreadPool);
table.batch(rows);
} catch (InterruptedException ix) {
throw new IOException(ix);