From 44691ce23c735310538616c19b052d3793404fbc Mon Sep 17 00:00:00 2001 From: WenFeiYi Date: Mon, 6 Jul 2020 11:24:09 +0800 Subject: [PATCH] HBASE-24578 [WAL] Add a parameter to config RingBufferEventHandler's SyncFuture count (#1923) Signed-off-by: Reid Chan --- .../hadoop/hbase/regionserver/wal/FSHLog.java | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 5040a226b9e..9a4887da4ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -105,6 +105,16 @@ public class FSHLog extends AbstractFSWAL { // We use ring buffer sequence as txid of FSWALEntry and SyncFuture. private static final Logger LOG = LoggerFactory.getLogger(FSHLog.class); + private static final String TOLERABLE_LOW_REPLICATION = "hbase.regionserver.hlog.tolerable.lowreplication"; + private static final String LOW_REPLICATION_ROLL_LIMIT = "hbase.regionserver.hlog.lowreplication.rolllimit"; + private static final int DEFAULT_LOW_REPLICATION_ROLL_LIMIT = 5; + private static final String ROLL_ERRORS_TOLERATED = "hbase.regionserver.logroll.errors.tolerated"; + private static final int DEFAULT_ROLL_ERRORS_TOLERATED = 2; + private static final String SYNCER_COUNT = "hbase.regionserver.hlog.syncer.count"; + private static final int DEFAULT_SYNCER_COUNT = 5; + private static final String MAX_BATCH_COUNT = "hbase.regionserver.wal.sync.batch.count"; + private static final int DEFAULT_MAX_BATCH_COUNT = 200; + /** * The nexus at which all incoming handlers meet. Does appends and sync with an ordering. Appends * and syncs are each put on the ring which means handlers need to smash up against the ring twice @@ -210,12 +220,10 @@ public class FSHLog extends AbstractFSWAL { final String archiveDir, final Configuration conf, final List listeners, final boolean failIfWALExists, final String prefix, final String suffix) throws IOException { super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); - this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication", + this.minTolerableReplication = conf.getInt(TOLERABLE_LOW_REPLICATION, CommonFSUtils.getDefaultReplication(fs, this.walDir)); - this.lowReplicationRollLimit = conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", - 5); - this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2); - + this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT); + this.closeErrorsTolerated = conf.getInt(ROLL_ERRORS_TOLERATED, DEFAULT_ROLL_ERRORS_TOLERATED); // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is // put on the ring buffer. String hostingThreadName = Thread.currentThread().getName(); @@ -228,9 +236,10 @@ public class FSHLog extends AbstractFSWAL { // Advance the ring buffer sequence so that it starts from 1 instead of 0, // because SyncFuture.NOT_DONE = 0. this.disruptor.getRingBuffer().next(); - int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); - this.ringBufferEventHandler = new RingBufferEventHandler( - conf.getInt("hbase.regionserver.hlog.syncer.count", 5), maxHandlersCount); + int syncerCount = conf.getInt(SYNCER_COUNT, DEFAULT_SYNCER_COUNT); + int maxBatchCount = conf.getInt(MAX_BATCH_COUNT, + conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, DEFAULT_MAX_BATCH_COUNT)); + this.ringBufferEventHandler = new RingBufferEventHandler(syncerCount, maxBatchCount); this.disruptor.setDefaultExceptionHandler(new RingBufferExceptionHandler()); this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler }); // Starting up threads in constructor is a no no; Interface should have an init call. @@ -922,11 +931,11 @@ public class FSHLog extends AbstractFSWAL { */ private int syncRunnerIndex; - RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount) { - this.syncFutures = new SyncFuture[maxHandlersCount]; + RingBufferEventHandler(final int syncRunnerCount, final int maxBatchCount) { + this.syncFutures = new SyncFuture[maxBatchCount]; this.syncRunners = new SyncRunner[syncRunnerCount]; for (int i = 0; i < syncRunnerCount; i++) { - this.syncRunners[i] = new SyncRunner("sync." + i, maxHandlersCount); + this.syncRunners[i] = new SyncRunner("sync." + i, maxBatchCount); } }