diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index e3964dc453d..d5bf20d101e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -128,6 +128,7 @@ public class Replication implements WALActionsListener, public void join() { if (this.replication) { this.replicationManager.join(); + this.replicationSink.stopReplicationSinkServices(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index a359f782a8a..d2a932a8c70 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -23,11 +23,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -41,6 +46,10 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * This class is responsible for replicating the edits coming @@ -63,8 +72,8 @@ public class ReplicationSink { // Name of the HDFS directory that contains the temporary rep logs public static final String REPLICATION_LOG_DIR = ".replogs"; private final Configuration conf; - // Pool used to replicated - private final HTablePool pool; + private final ExecutorService sharedThreadPool; + private final HConnection sharedHtableCon; private final ReplicationSinkMetrics metrics; /** @@ -76,12 +85,28 @@ public class ReplicationSink { */ public ReplicationSink(Configuration conf, Stoppable stopper) throws IOException { - this.conf = conf; - this.pool = new HTablePool(this.conf, - conf.getInt("replication.sink.htablepool.capacity", 10)); + this.conf = HBaseConfiguration.create(conf); + decorateConf(); this.metrics = new ReplicationSinkMetrics(); + this.sharedHtableCon = HConnectionManager.createConnection(this.conf); + this.sharedThreadPool = new ThreadPoolExecutor(1, + conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE), + conf.getLong("hbase.htable.threads.keepalivetime", 60), TimeUnit.SECONDS, + new SynchronousQueue(), new DaemonThreadFactory("hbase-repl")); + ((ThreadPoolExecutor) this.sharedThreadPool).allowCoreThreadTimeOut(true); } + /** + * decorate the Configuration object to make replication more receptive to delays: + * lessen the timeout and numTries. + */ + private void decorateConf() { + this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + this.conf.getInt("replication.sink.client.retries.number", 1)); + this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + this.conf.getInt("replication.sink.client.ops.timeout", 20)); + } + /** * Replicate this array of entries directly into the local cluster * using the native client. @@ -160,6 +185,27 @@ public class ReplicationSink { return values; } + /** + * stop the thread pool executor. It is called when the regionserver is stopped. + */ + public void stopReplicationSinkServices() { + try { + this.sharedThreadPool.shutdown(); + if (!this.sharedThreadPool.awaitTermination(60000, TimeUnit.MILLISECONDS)) { + this.sharedThreadPool.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while closing the table pool", e); // ignoring it as we are closing. + Thread.currentThread().interrupt(); + } + try { + this.sharedHtableCon.close(); + } catch (IOException e) { + LOG.warn("IOException while closing the connection", e); // ignoring as we are closing. + } + } + + /** * Do the changes and handle the pool * @param tableName table to insert into @@ -172,7 +218,7 @@ public class ReplicationSink { } HTableInterface table = null; try { - table = this.pool.getTable(tableName); + table = new HTable(tableName, this.sharedHtableCon, this.sharedThreadPool); table.batch(rows); } catch (InterruptedException ix) { throw new IOException(ix);