diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java index a65ff881101..1cb026a1524 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java @@ -261,7 +261,9 @@ public class RegionStates { RegionState regionState = new RegionState( hri, state, System.currentTimeMillis(), newServerName); RegionState oldState = regionStates.put(regionName, regionState); - LOG.info("Region " + hri + " transitioned from " + oldState + " to " + regionState); + if (oldState == null || oldState.getState() != regionState.getState()) { + LOG.info("Region " + hri + " transitioned from " + oldState + " to " + regionState); + } if (state != State.SPLITTING && (newServerName != null || (state != State.PENDING_CLOSE && state != State.CLOSING))) { regionsInTransition.put(regionName, regionState); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 2ac2d491051..b85aff11d6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1566,7 +1566,14 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa this.rpcServer.start(); // Create the log splitting worker and start it - this.splitLogWorker = new SplitLogWorker(this.zooKeeper, this.getConfiguration(), this, this); + // set a smaller retries to fast fail otherwise splitlogworker could be blocked for + // quite a while inside HConnection layer. The worker won't be available for other + // tasks even after current task is preempted after a split task times out. 
+ Configuration sinkConf = HBaseConfiguration.create(conf); + sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2); + sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); + this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this); splitLogWorker.start(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 1e2e554e5b0..a2676fbf89e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; +import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.NotServingRegionException; @@ -92,6 +93,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { private boolean workerInGrabTask = false; private final int report_period; private RegionServerServices server = null; + private Configuration conf = null; public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server, TaskExecutor splitTaskExecutor) { @@ -101,6 +103,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { this.splitTaskExecutor = splitTaskExecutor; report_period = conf.getInt("hbase.splitlog.report.period", conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3); + this.conf = conf; } public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName, @@ -110,6 +113,7 @@ 
public class SplitLogWorker extends ZooKeeperListener implements Runnable { this.splitTaskExecutor = splitTaskExecutor; report_period = conf.getInt("hbase.splitlog.report.period", conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3); + this.conf = conf; } public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf, @@ -165,6 +169,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { try { LOG.info("SplitLogWorker " + this.serverName + " starting"); this.watcher.registerListener(this); + // initialize a new connection for splitlogworker configuration + HConnectionManager.getConnection(conf); int res; // wait for master to create the splitLogZnode res = -1; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 2ebb041124d..bbac87f3204 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -1650,18 +1650,8 @@ public class HLogSplitter { private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink; private boolean hasEditsInDisablingOrDisabledTables = false; - private Configuration sinkConf; public LogReplayOutputSink(int numWriters) { super(numWriters); - // set a smaller retries to fast fail otherwise splitlogworker could be blocked for - // quite a while inside HConnection layer. The worker won't available for other - // tasks even after current task is preempted after a split task times out. 
- sinkConf = HBaseConfiguration.create(conf); - sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2); - sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT / 2); - sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); - this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT); this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters); @@ -1809,9 +1799,6 @@ public class HLogSplitter { // skip current kv if column family doesn't exist anymore or already flushed continue; } - } else { - LOG.warn("Can't find store max sequence ids map for region:" - + loc.getRegionInfo().getEncodedName()); } } @@ -1861,21 +1848,29 @@ public class HLogSplitter { */ private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn, byte[] table, byte[] row, String originalEncodedRegionName) throws IOException { + + // fetch location from cache HRegionLocation loc = onlineRegions.get(originalEncodedRegionName); if(loc != null) return loc; - + // fetch location from .META. 
loc = hconn.getRegionLocation(table, row, false); if (loc == null) { throw new IOException("Can't locate location for row:" + Bytes.toString(row) + " of table:" + Bytes.toString(table)); } + // check if current row moves to a different region due to region merge/split + if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) { + // originalEncodedRegionName should have already been flushed + lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE); + HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName()); + if (tmpLoc != null) return tmpLoc; + } Long lastFlushedSequenceId = -1l; loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut); Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo() .getEncodedName()); - onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc); // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will // update the value for the region RegionStoreSequenceIds ids = @@ -1901,7 +1896,8 @@ public class HLogSplitter { LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName() + " because it's not in recovering."); } - + + onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc); return loc; } @@ -2068,6 +2064,7 @@ public class HLogSplitter { for (byte[] tableName : this.tableNameToHConnectionMap.keySet()) { HConnection hconn = this.tableNameToHConnectionMap.get(tableName); try { + hconn.clearRegionCache(); hconn.close(); } catch (IOException ioe) { result.add(ioe); @@ -2128,7 +2125,7 @@ public class HLogSplitter { synchronized (this.tableNameToHConnectionMap) { hconn = this.tableNameToHConnectionMap.get(tableName); if (hconn == null) { - hconn = HConnectionManager.createConnection(sinkConf); + hconn = HConnectionManager.getConnection(conf); this.tableNameToHConnectionMap.put(tableName, hconn); } } diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index c1bee3eaa5c..127829f308d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -200,14 +200,14 @@ public class TestSplitLogWorker { slw.start(); try { Thread.yield(); // let the worker start - Thread.sleep(100); + Thread.sleep(1000); // this time create a task node after starting the splitLogWorker zkw.getRecoverableZooKeeper().create(PATH, new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 8000); assertEquals(1, slw.taskReadySeq); byte [] bytes = ZKUtil.getData(zkw, PATH); SplitLogTask slt = SplitLogTask.parseFrom(bytes);