HBASE-24578 [WAL] Add a parameter to config RingBufferEventHandler's SyncFuture count
Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
parent
a3567037c6
commit
e1d1304cb1
|
@ -105,6 +105,16 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
// We use ring buffer sequence as txid of FSWALEntry and SyncFuture.
|
||||
private static final Logger LOG = LoggerFactory.getLogger(FSHLog.class);
|
||||
|
||||
private static final String TOLERABLE_LOW_REPLICATION = "hbase.regionserver.hlog.tolerable.lowreplication";
|
||||
private static final String LOW_REPLICATION_ROLL_LIMIT = "hbase.regionserver.hlog.lowreplication.rolllimit";
|
||||
private static final int DEFAULT_LOW_REPLICATION_ROLL_LIMIT = 5;
|
||||
private static final String ROLL_ERRORS_TOLERATED = "hbase.regionserver.logroll.errors.tolerated";
|
||||
private static final int DEFAULT_ROLL_ERRORS_TOLERATED = 2;
|
||||
private static final String SYNCER_COUNT = "hbase.regionserver.hlog.syncer.count";
|
||||
private static final int DEFAULT_SYNCER_COUNT = 5;
|
||||
private static final String MAX_BATCH_COUNT = "hbase.regionserver.wal.sync.batch.count";
|
||||
private static final int DEFAULT_MAX_BATCH_COUNT = 200;
|
||||
|
||||
/**
|
||||
* The nexus at which all incoming handlers meet. Does appends and sync with an ordering. Appends
|
||||
* and syncs are each put on the ring which means handlers need to smash up against the ring twice
|
||||
|
@ -210,11 +220,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
|
||||
final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
|
||||
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
|
||||
this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication",
|
||||
this.minTolerableReplication = conf.getInt(TOLERABLE_LOW_REPLICATION,
|
||||
CommonFSUtils.getDefaultReplication(fs, this.walDir));
|
||||
this.lowReplicationRollLimit = conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit",
|
||||
5);
|
||||
this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2);
|
||||
this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT);
|
||||
this.closeErrorsTolerated = conf.getInt(ROLL_ERRORS_TOLERATED, DEFAULT_ROLL_ERRORS_TOLERATED);
|
||||
|
||||
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
|
||||
// put on the ring buffer.
|
||||
|
@ -228,9 +237,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
// Advance the ring buffer sequence so that it starts from 1 instead of 0,
|
||||
// because SyncFuture.NOT_DONE = 0.
|
||||
this.disruptor.getRingBuffer().next();
|
||||
int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
|
||||
this.ringBufferEventHandler = new RingBufferEventHandler(
|
||||
conf.getInt("hbase.regionserver.hlog.syncer.count", 5), maxHandlersCount);
|
||||
int syncerCount = conf.getInt(SYNCER_COUNT, DEFAULT_SYNCER_COUNT);
|
||||
int maxBatchCount = conf.getInt(MAX_BATCH_COUNT,
|
||||
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, DEFAULT_MAX_BATCH_COUNT));
|
||||
this.ringBufferEventHandler = new RingBufferEventHandler(syncerCount, maxBatchCount);
|
||||
this.disruptor.setDefaultExceptionHandler(new RingBufferExceptionHandler());
|
||||
this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler });
|
||||
// Starting up threads in constructor is a no no; Interface should have an init call.
|
||||
|
@ -918,11 +928,11 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
*/
|
||||
private int syncRunnerIndex;
|
||||
|
||||
RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount) {
|
||||
this.syncFutures = new SyncFuture[maxHandlersCount];
|
||||
RingBufferEventHandler(final int syncRunnerCount, final int maxBatchCount) {
|
||||
this.syncFutures = new SyncFuture[maxBatchCount];
|
||||
this.syncRunners = new SyncRunner[syncRunnerCount];
|
||||
for (int i = 0; i < syncRunnerCount; i++) {
|
||||
this.syncRunners[i] = new SyncRunner("sync." + i, maxHandlersCount);
|
||||
this.syncRunners[i] = new SyncRunner("sync." + i, maxBatchCount);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue