HBASE-8680: distributedLogReplay performance regression
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1489679 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a4d58c4061
commit
f5ecf2c83c
|
@ -261,7 +261,9 @@ public class RegionStates {
|
||||||
RegionState regionState = new RegionState(
|
RegionState regionState = new RegionState(
|
||||||
hri, state, System.currentTimeMillis(), newServerName);
|
hri, state, System.currentTimeMillis(), newServerName);
|
||||||
RegionState oldState = regionStates.put(regionName, regionState);
|
RegionState oldState = regionStates.put(regionName, regionState);
|
||||||
LOG.info("Region " + hri + " transitioned from " + oldState + " to " + regionState);
|
if (oldState == null || oldState.getState() != regionState.getState()) {
|
||||||
|
LOG.info("Region " + hri + " transitioned from " + oldState + " to " + regionState);
|
||||||
|
}
|
||||||
if (state != State.SPLITTING && (newServerName != null
|
if (state != State.SPLITTING && (newServerName != null
|
||||||
|| (state != State.PENDING_CLOSE && state != State.CLOSING))) {
|
|| (state != State.PENDING_CLOSE && state != State.CLOSING))) {
|
||||||
regionsInTransition.put(regionName, regionState);
|
regionsInTransition.put(regionName, regionState);
|
||||||
|
|
|
@ -1566,7 +1566,14 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
||||||
this.rpcServer.start();
|
this.rpcServer.start();
|
||||||
|
|
||||||
// Create the log splitting worker and start it
|
// Create the log splitting worker and start it
|
||||||
this.splitLogWorker = new SplitLogWorker(this.zooKeeper, this.getConfiguration(), this, this);
|
// set a smaller retries to fast fail otherwise splitlogworker could be blocked for
|
||||||
|
// quite a while inside HConnection layer. The worker won't be available for other
|
||||||
|
// tasks even after current task is preempted after a split task times out.
|
||||||
|
Configuration sinkConf = HBaseConfiguration.create(conf);
|
||||||
|
sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
||||||
|
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2);
|
||||||
|
sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
|
||||||
|
this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
|
||||||
splitLogWorker.start();
|
splitLogWorker.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.SplitLogCounters;
|
import org.apache.hadoop.hbase.SplitLogCounters;
|
||||||
import org.apache.hadoop.hbase.SplitLogTask;
|
import org.apache.hadoop.hbase.SplitLogTask;
|
||||||
|
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
|
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
|
||||||
|
@ -92,6 +93,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
||||||
private boolean workerInGrabTask = false;
|
private boolean workerInGrabTask = false;
|
||||||
private final int report_period;
|
private final int report_period;
|
||||||
private RegionServerServices server = null;
|
private RegionServerServices server = null;
|
||||||
|
private Configuration conf = null;
|
||||||
|
|
||||||
public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
|
public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
|
||||||
RegionServerServices server, TaskExecutor splitTaskExecutor) {
|
RegionServerServices server, TaskExecutor splitTaskExecutor) {
|
||||||
|
@ -101,6 +103,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
||||||
this.splitTaskExecutor = splitTaskExecutor;
|
this.splitTaskExecutor = splitTaskExecutor;
|
||||||
report_period = conf.getInt("hbase.splitlog.report.period",
|
report_period = conf.getInt("hbase.splitlog.report.period",
|
||||||
conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
|
conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
|
||||||
|
this.conf = conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName,
|
public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName,
|
||||||
|
@ -110,6 +113,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
||||||
this.splitTaskExecutor = splitTaskExecutor;
|
this.splitTaskExecutor = splitTaskExecutor;
|
||||||
report_period = conf.getInt("hbase.splitlog.report.period",
|
report_period = conf.getInt("hbase.splitlog.report.period",
|
||||||
conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
|
conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
|
||||||
|
this.conf = conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
|
public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
|
||||||
|
@ -165,6 +169,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
||||||
try {
|
try {
|
||||||
LOG.info("SplitLogWorker " + this.serverName + " starting");
|
LOG.info("SplitLogWorker " + this.serverName + " starting");
|
||||||
this.watcher.registerListener(this);
|
this.watcher.registerListener(this);
|
||||||
|
// initialize a new connection for splitlogworker configuration
|
||||||
|
HConnectionManager.getConnection(conf);
|
||||||
int res;
|
int res;
|
||||||
// wait for master to create the splitLogZnode
|
// wait for master to create the splitLogZnode
|
||||||
res = -1;
|
res = -1;
|
||||||
|
|
|
@ -1650,18 +1650,8 @@ public class HLogSplitter {
|
||||||
private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
|
private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
|
||||||
private boolean hasEditsInDisablingOrDisabledTables = false;
|
private boolean hasEditsInDisablingOrDisabledTables = false;
|
||||||
|
|
||||||
private Configuration sinkConf;
|
|
||||||
public LogReplayOutputSink(int numWriters) {
|
public LogReplayOutputSink(int numWriters) {
|
||||||
super(numWriters);
|
super(numWriters);
|
||||||
// set a smaller retries to fast fail otherwise splitlogworker could be blocked for
|
|
||||||
// quite a while inside HConnection layer. The worker won't available for other
|
|
||||||
// tasks even after current task is preempted after a split task times out.
|
|
||||||
sinkConf = HBaseConfiguration.create(conf);
|
|
||||||
sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
|
||||||
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2);
|
|
||||||
sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT / 2);
|
|
||||||
sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
|
|
||||||
|
|
||||||
this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout",
|
this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout",
|
||||||
SplitLogManager.DEFAULT_TIMEOUT);
|
SplitLogManager.DEFAULT_TIMEOUT);
|
||||||
this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
|
this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
|
||||||
|
@ -1809,9 +1799,6 @@ public class HLogSplitter {
|
||||||
// skip current kv if column family doesn't exist anymore or already flushed
|
// skip current kv if column family doesn't exist anymore or already flushed
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
LOG.warn("Can't find store max sequence ids map for region:"
|
|
||||||
+ loc.getRegionInfo().getEncodedName());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1861,21 +1848,29 @@ public class HLogSplitter {
|
||||||
*/
|
*/
|
||||||
private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
|
private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
|
||||||
byte[] table, byte[] row, String originalEncodedRegionName) throws IOException {
|
byte[] table, byte[] row, String originalEncodedRegionName) throws IOException {
|
||||||
|
|
||||||
|
// fetch location from cache
|
||||||
HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
|
HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
|
||||||
if(loc != null) return loc;
|
if(loc != null) return loc;
|
||||||
|
// fetch location from .META.
|
||||||
loc = hconn.getRegionLocation(table, row, false);
|
loc = hconn.getRegionLocation(table, row, false);
|
||||||
if (loc == null) {
|
if (loc == null) {
|
||||||
throw new IOException("Can't locate location for row:" + Bytes.toString(row)
|
throw new IOException("Can't locate location for row:" + Bytes.toString(row)
|
||||||
+ " of table:" + Bytes.toString(table));
|
+ " of table:" + Bytes.toString(table));
|
||||||
}
|
}
|
||||||
|
// check if current row moves to a different region due to region merge/split
|
||||||
|
if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
|
||||||
|
// originalEncodedRegionName should have already flushed
|
||||||
|
lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
|
||||||
|
HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
|
||||||
|
if (tmpLoc != null) return tmpLoc;
|
||||||
|
}
|
||||||
|
|
||||||
Long lastFlushedSequenceId = -1l;
|
Long lastFlushedSequenceId = -1l;
|
||||||
loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut);
|
loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut);
|
||||||
Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo()
|
Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo()
|
||||||
.getEncodedName());
|
.getEncodedName());
|
||||||
|
|
||||||
onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
|
|
||||||
// retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
|
// retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
|
||||||
// update the value for the region
|
// update the value for the region
|
||||||
RegionStoreSequenceIds ids =
|
RegionStoreSequenceIds ids =
|
||||||
|
@ -1902,6 +1897,7 @@ public class HLogSplitter {
|
||||||
+ " because it's not in recovering.");
|
+ " because it's not in recovering.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
|
||||||
return loc;
|
return loc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2068,6 +2064,7 @@ public class HLogSplitter {
|
||||||
for (byte[] tableName : this.tableNameToHConnectionMap.keySet()) {
|
for (byte[] tableName : this.tableNameToHConnectionMap.keySet()) {
|
||||||
HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
|
HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
|
||||||
try {
|
try {
|
||||||
|
hconn.clearRegionCache();
|
||||||
hconn.close();
|
hconn.close();
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
result.add(ioe);
|
result.add(ioe);
|
||||||
|
@ -2128,7 +2125,7 @@ public class HLogSplitter {
|
||||||
synchronized (this.tableNameToHConnectionMap) {
|
synchronized (this.tableNameToHConnectionMap) {
|
||||||
hconn = this.tableNameToHConnectionMap.get(tableName);
|
hconn = this.tableNameToHConnectionMap.get(tableName);
|
||||||
if (hconn == null) {
|
if (hconn == null) {
|
||||||
hconn = HConnectionManager.createConnection(sinkConf);
|
hconn = HConnectionManager.getConnection(conf);
|
||||||
this.tableNameToHConnectionMap.put(tableName, hconn);
|
this.tableNameToHConnectionMap.put(tableName, hconn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -200,14 +200,14 @@ public class TestSplitLogWorker {
|
||||||
slw.start();
|
slw.start();
|
||||||
try {
|
try {
|
||||||
Thread.yield(); // let the worker start
|
Thread.yield(); // let the worker start
|
||||||
Thread.sleep(100);
|
Thread.sleep(1000);
|
||||||
|
|
||||||
// this time create a task node after starting the splitLogWorker
|
// this time create a task node after starting the splitLogWorker
|
||||||
zkw.getRecoverableZooKeeper().create(PATH,
|
zkw.getRecoverableZooKeeper().create(PATH,
|
||||||
new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 8000);
|
||||||
assertEquals(1, slw.taskReadySeq);
|
assertEquals(1, slw.taskReadySeq);
|
||||||
byte [] bytes = ZKUtil.getData(zkw, PATH);
|
byte [] bytes = ZKUtil.getData(zkw, PATH);
|
||||||
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
||||||
|
|
Loading…
Reference in New Issue