HBASE-24578 [WAL] Add a parameter to config RingBufferEventHandler's SyncFuture count (#1923)

Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
WenFeiYi 2020-07-06 11:24:09 +08:00 committed by GitHub
parent fe2ae809d1
commit 44691ce23c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 20 additions and 11 deletions

View File

@ -105,6 +105,16 @@ public class FSHLog extends AbstractFSWAL<Writer> {
// We use ring buffer sequence as txid of FSWALEntry and SyncFuture.
private static final Logger LOG = LoggerFactory.getLogger(FSHLog.class);
private static final String TOLERABLE_LOW_REPLICATION = "hbase.regionserver.hlog.tolerable.lowreplication";
private static final String LOW_REPLICATION_ROLL_LIMIT = "hbase.regionserver.hlog.lowreplication.rolllimit";
private static final int DEFAULT_LOW_REPLICATION_ROLL_LIMIT = 5;
private static final String ROLL_ERRORS_TOLERATED = "hbase.regionserver.logroll.errors.tolerated";
private static final int DEFAULT_ROLL_ERRORS_TOLERATED = 2;
private static final String SYNCER_COUNT = "hbase.regionserver.hlog.syncer.count";
private static final int DEFAULT_SYNCER_COUNT = 5;
private static final String MAX_BATCH_COUNT = "hbase.regionserver.wal.sync.batch.count";
private static final int DEFAULT_MAX_BATCH_COUNT = 200;
/**
* The nexus at which all incoming handlers meet. Does appends and sync with an ordering. Appends
* and syncs are each put on the ring which means handlers need to smash up against the ring twice
@ -210,12 +220,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication",
this.minTolerableReplication = conf.getInt(TOLERABLE_LOW_REPLICATION,
CommonFSUtils.getDefaultReplication(fs, this.walDir));
this.lowReplicationRollLimit = conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit",
5);
this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2);
this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT);
this.closeErrorsTolerated = conf.getInt(ROLL_ERRORS_TOLERATED, DEFAULT_ROLL_ERRORS_TOLERATED);
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
// put on the ring buffer.
String hostingThreadName = Thread.currentThread().getName();
@ -228,9 +236,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
// Advance the ring buffer sequence so that it starts from 1 instead of 0,
// because SyncFuture.NOT_DONE = 0.
this.disruptor.getRingBuffer().next();
int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
this.ringBufferEventHandler = new RingBufferEventHandler(
conf.getInt("hbase.regionserver.hlog.syncer.count", 5), maxHandlersCount);
int syncerCount = conf.getInt(SYNCER_COUNT, DEFAULT_SYNCER_COUNT);
int maxBatchCount = conf.getInt(MAX_BATCH_COUNT,
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, DEFAULT_MAX_BATCH_COUNT));
this.ringBufferEventHandler = new RingBufferEventHandler(syncerCount, maxBatchCount);
this.disruptor.setDefaultExceptionHandler(new RingBufferExceptionHandler());
this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler });
// Starting up threads in constructor is a no no; Interface should have an init call.
@ -922,11 +931,11 @@ public class FSHLog extends AbstractFSWAL<Writer> {
*/
private int syncRunnerIndex;
RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount) {
this.syncFutures = new SyncFuture[maxHandlersCount];
RingBufferEventHandler(final int syncRunnerCount, final int maxBatchCount) {
this.syncFutures = new SyncFuture[maxBatchCount];
this.syncRunners = new SyncRunner[syncRunnerCount];
for (int i = 0; i < syncRunnerCount; i++) {
this.syncRunners[i] = new SyncRunner("sync." + i, maxHandlersCount);
this.syncRunners[i] = new SyncRunner("sync." + i, maxBatchCount);
}
}