HBASE-8729: distributedLogReplay may hang during chained region server failure

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1499925 13f79535-47bb-0310-9956-ffa450edef68
jeffreyz 2013-07-05 07:44:00 +00:00
parent 2f6599ef01
commit 8e78536a3b
11 changed files with 183 additions and 32 deletions


@ -223,6 +223,13 @@ public enum EventType {
* Master is processing recovery of regions found in ZK RIT
*/
M_MASTER_RECOVERY (73, ExecutorType.MASTER_SERVER_OPERATIONS),
/**
* Master controlled events to be executed on the master.<br>
*
* M_LOG_REPLAY<br>
* Master is processing log replay of a failed region server
*/
M_LOG_REPLAY (74, ExecutorType.M_LOG_REPLAY_OPS),
/**
* RS controlled events to be executed on the RS.<br>


@ -35,6 +35,7 @@ public enum ExecutorType {
MASTER_TABLE_OPERATIONS (4),
MASTER_RS_SHUTDOWN (5),
MASTER_META_SERVER_OPERATIONS (6),
M_LOG_REPLAY_OPS (7),
// RegionServer executor services
RS_OPEN_REGION (20),


@ -18,6 +18,7 @@
package org.apache.hadoop.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ChaosMonkey;
import org.junit.After;
import org.junit.Before;
@ -43,8 +44,18 @@ public class IntegrationTestDataIngestWithChaosMonkey extends IngestIntegrationT
@Before
public void setUp() throws Exception {
util= getTestingUtil(null);
Configuration conf = util.getConfiguration();
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 25);
if (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG)) {
// when distributedLogReplay is enabled, we need to make sure the rpc timeout & retries are
// small enough that the replay can complete before ChaosMonkey kills another region
// server
conf.setInt("hbase.regionserver.handler.count", 20);
conf.setInt("hbase.log.replay.retries.number", 2);
conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true);
}
if(!util.isDistributedCluster()) {
// In MiniCluster mode, we increase number of RS a little bit to speed the test
NUM_SLAVES_BASE = 5;
}
super.setUp(NUM_SLAVES_BASE);


@ -1096,9 +1096,11 @@ MasterServices, Server {
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
conf.getInt("hbase.master.executor.closeregion.threads", 5));
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
conf.getInt("hbase.master.executor.serverops.threads", 3));
conf.getInt("hbase.master.executor.serverops.threads", 5));
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
conf.getInt("hbase.master.executor.serverops.threads", 5));
this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
conf.getInt("hbase.master.executor.logreplayops.threads", 10));
// We depend on there being only one instance of this executor running
// at a time. To do concurrency, would need fencing of enable/disable of


@ -319,21 +319,33 @@ public class MasterFileSystem {
private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
List<Path> logDirs = new ArrayList<Path>();
for (ServerName serverName: serverNames) {
Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
// Rename the directory so a rogue RS doesn't create more HLogs
if (fs.exists(logDir)) {
if (!this.fs.rename(logDir, splitDir)) {
throw new IOException("Failed fs.rename for log split: " + logDir);
boolean needReleaseLock = false;
if (!this.services.isInitialized()) {
// during master initialization, multiple places could be splitting the same WAL
this.splitLogLock.lock();
needReleaseLock = true;
}
try {
for (ServerName serverName : serverNames) {
Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
// Rename the directory so a rogue RS doesn't create more HLogs
if (fs.exists(logDir)) {
if (!this.fs.rename(logDir, splitDir)) {
throw new IOException("Failed fs.rename for log split: " + logDir);
}
logDir = splitDir;
LOG.debug("Renamed region directory: " + splitDir);
} else if (!fs.exists(splitDir)) {
LOG.info("Log dir for server " + serverName + " does not exist");
continue;
}
logDir = splitDir;
LOG.debug("Renamed region directory: " + splitDir);
} else if (!fs.exists(splitDir)) {
LOG.info("Log dir for server " + serverName + " does not exist");
continue;
logDirs.add(splitDir);
}
} finally {
if (needReleaseLock) {
this.splitLogLock.unlock();
}
logDirs.add(splitDir);
}
return logDirs;
}
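
The conditional locking added above follows a common guard pattern: take the lock only when callers may race (here, while the master is still initializing), and release it in a finally block only if it was actually taken. A minimal standalone sketch of that pattern, with hypothetical names and no HBase dependencies:

import java.util.concurrent.locks.ReentrantLock;

// Generic illustration of the guard used in getLogDirs(): lock only when a race
// is possible, remember whether we took the lock, and release it in finally.
public class ConditionalLockSketch {
  private final ReentrantLock splitLogLock = new ReentrantLock();
  private volatile boolean initialized = false;   // stands in for services.isInitialized()

  public void doGuardedWork(Runnable work) {
    boolean needReleaseLock = false;
    if (!initialized) {
      splitLogLock.lock();
      needReleaseLock = true;
    }
    try {
      work.run();
    } finally {
      if (needReleaseLock) {
        splitLogLock.unlock();
      }
    }
  }
}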


@ -0,0 +1,88 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.master.handler;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.DeadServer;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
/**
* Handle logReplay work from SSH. A separate handler is used so that SSH is not blocked while
* re-assigning regions from dead servers; otherwise, available SSH handlers could be tied up by
* logReplay work (from {@link MasterFileSystem#splitLog(ServerName)}). During logReplay, if a
* receiving RS (say A) fails again, regions on A cannot be assigned to another live RS, which
* leaves the log replay unable to complete because replaying WAL edits depends on the receiving
* RS being live.
*/
@InterfaceAudience.Private
public class LogReplayHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(LogReplayHandler.class);
private final ServerName serverName;
protected final Server master;
protected final MasterServices services;
protected final DeadServer deadServers;
public LogReplayHandler(final Server server, final MasterServices services,
final DeadServer deadServers, final ServerName serverName) {
super(server, EventType.M_LOG_REPLAY);
this.master = server;
this.services = services;
this.deadServers = deadServers;
this.serverName = serverName;
this.deadServers.add(serverName);
}
@Override
public String toString() {
String name = serverName.toString();
return getClass().getSimpleName() + "-" + name + "-" + getSeqid();
}
@Override
public void process() throws IOException {
try {
if (this.master != null && this.master.isStopped()) {
// we're exiting ...
return;
}
this.services.getMasterFileSystem().splitLog(serverName);
} catch (Exception ex) {
if (ex instanceof IOException) {
// resubmit the log replay work if it failed
this.services.getExecutorService().submit((LogReplayHandler) this);
this.deadServers.add(serverName);
throw new IOException("failed log replay for " + serverName + ", will retry", ex);
} else {
throw new IOException(ex);
}
} finally {
this.deadServers.finish(serverName);
}
// logReplay is the last step of SSH so log a line to indicate that
LOG.info("Finished processing of shutdown of " + serverName);
}
}
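
For orientation, the other hunks in this commit wire the new handler into the master roughly as shown below; this is a condensed sketch stitched together from the HMaster and ServerShutdownHandler hunks in this same change, not the literal patch text.

// In HMaster: start a dedicated executor pool so log replay work never competes
// with the regular server-shutdown handlers for threads.
this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
  conf.getInt("hbase.master.executor.logreplayops.threads", 10));

// In ServerShutdownHandler.process(): once region re-assignment is done, hand the
// potentially slow splitLog(serverName) work off to that pool instead of running it inline.
this.services.getExecutorService().submit(
  new LogReplayHandler(this.server, this.services, this.deadServers, this.serverName));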


@ -115,6 +115,7 @@ public class ServerShutdownHandler extends EventHandler {
@Override
public void process() throws IOException {
boolean hasLogReplayWork = false;
final ServerName serverName = this.serverName;
try {
@ -280,7 +281,10 @@ public class ServerShutdownHandler extends EventHandler {
+ " didn't complete assignment in time");
}
}
this.services.getMasterFileSystem().splitLog(serverName);
// submit logReplay work
this.services.getExecutorService().submit(
new LogReplayHandler(this.server, this.services, this.deadServers, this.serverName));
hasLogReplayWork = true;
}
} catch (Exception ex) {
if (ex instanceof IOException) {
@ -293,7 +297,9 @@ public class ServerShutdownHandler extends EventHandler {
this.deadServers.finish(serverName);
}
LOG.info("Finished processing of shutdown of " + serverName);
if (!hasLogReplayWork) {
LOG.info("Finished processing of shutdown of " + serverName);
}
}
private void resubmit(final ServerName serverName, IOException ex) throws IOException {


@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HealthCheckChore;
@ -1577,8 +1578,10 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
// quite a while inside HConnection layer. The worker won't be available for other
// tasks even after current task is preempted after a split task times out.
Configuration sinkConf = HBaseConfiguration.create(conf);
sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2);
sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
splitLogWorker.start();
@ -3976,11 +3979,21 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
case SUCCESS:
break;
}
if (isReplay && codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
// in replay mode, we only need to capture the first error because we will retry the whole
// batch when an error happens
break;
}
}
} catch (IOException ie) {
ActionResult result = ResponseConverter.buildActionResult(ie);
for (int i = 0; i < mutations.size(); i++) {
builder.setResult(i, result);
if (isReplay) {
// in replay mode, we only need to capture the first error because we will retry the whole
// batch when an error happens
break;
}
}
}
long after = EnvironmentEdgeManager.currentTimeMillis();


@ -171,9 +171,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
this.watcher.registerListener(this);
// initialize a new connection for splitlogworker configuration
HConnectionManager.getConnection(conf);
int res;
// wait for master to create the splitLogZnode
res = -1;
int res = -1;
while (res == -1 && !exitWorker) {
try {
res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
@ -386,12 +385,9 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
case RESIGNED:
if (exitWorker) {
LOG.info("task execution interrupted because worker is exiting " + path);
endTask(new SplitLogTask.Resigned(this.serverName),
SplitLogCounters.tot_wkr_task_resigned);
} else {
SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
LOG.info("task execution interrupted via zk by manager " + path);
}
endTask(new SplitLogTask.Resigned(this.serverName),
SplitLogCounters.tot_wkr_task_resigned);
break;
}
} finally {


@ -546,7 +546,6 @@ public class HLogSplitter {
lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
if (lastFlushedSequenceId == null) {
if (this.distributedLogReplay) {
lastFlushedSequenceId = -1L;
RegionStoreSequenceIds ids =
SplitLogManager.getRegionFlushedSequenceId(this.watcher, failedServerName, key);
if (ids != null) {
@ -555,11 +554,10 @@ public class HLogSplitter {
} else if (sequenceIdChecker != null) {
lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
}
if (lastFlushedSequenceId != null && lastFlushedSequenceId >= 0) {
lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
} else {
if (lastFlushedSequenceId == null) {
lastFlushedSequenceId = -1L;
}
lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
}
if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
editsSkipped++;


@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@ -60,6 +62,7 @@ import com.google.protobuf.ServiceException;
public class WALEditsReplaySink {
private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
private static final int MAX_BATCH_SIZE = 3000;
private final Configuration conf;
private final HConnection conn;
@ -67,6 +70,7 @@ public class WALEditsReplaySink {
private final MetricsWALEditsReplay metrics;
private final AtomicLong totalReplayedEdits = new AtomicLong();
private final boolean skipErrors;
private final int replayTimeout;
/**
* Create a sink for WAL log entries replay
@ -83,6 +87,8 @@ public class WALEditsReplaySink {
this.tableName = tableName;
this.skipErrors = conf.getBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS);
// timeout for a single replay operation; the default is 60 seconds
this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
}
/**
@ -121,7 +127,18 @@ public class WALEditsReplaySink {
// replaying edits by region
for (HRegionInfo curRegion : actionsByRegion.keySet()) {
replayEdits(loc, curRegion, actionsByRegion.get(curRegion));
List<Action<Row>> allActions = actionsByRegion.get(curRegion);
// send edits in chunks
int totalActions = allActions.size();
int replayedActions = 0;
int curBatchSize = 0;
for (; replayedActions < totalActions;) {
curBatchSize = (totalActions > (MAX_BATCH_SIZE + replayedActions)) ? MAX_BATCH_SIZE
: (totalActions - replayedActions);
replayEdits(loc, curRegion, allActions.subList(replayedActions,
replayedActions + curBatchSize));
replayedActions += curBatchSize;
}
}
long endTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
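
As a concrete illustration of the chunking above: with MAX_BATCH_SIZE at 3000, a region holding 7,500 queued actions (a hypothetical count) is replayed in three calls of 3,000, 3,000 and 1,500 edits. A self-contained sketch of the same arithmetic:

// Standalone sketch of the batching arithmetic used in the loop above; the action
// count is made up, MAX_BATCH_SIZE mirrors the constant in WALEditsReplaySink.
public class ReplayBatchSketch {
  private static final int MAX_BATCH_SIZE = 3000;

  public static void main(String[] args) {
    int totalActions = 7500;
    int replayedActions = 0;
    while (replayedActions < totalActions) {
      int curBatchSize = Math.min(MAX_BATCH_SIZE, totalActions - replayedActions);
      System.out.println("replaying " + curBatchSize + " actions from offset " + replayedActions);
      replayedActions += curBatchSize;   // batches of 3000, 3000 and 1500
    }
  }
}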
@ -173,7 +190,7 @@ public class WALEditsReplaySink {
ReplayServerCallable(final HConnection connection, final byte [] tableName,
final HRegionLocation regionLoc, final HRegionInfo regionInfo,
final List<Action<Row>> actions) {
super(connection, tableName, null);
super(connection, tableName, null, replayTimeout);
this.actions = actions;
this.regionInfo = regionInfo;
this.location = regionLoc;