HBASE-11072 Abstract WAL splitting from ZK (Sergey Soldatov)

This commit is contained in:
stack 2014-08-29 16:46:32 -07:00
parent b7f7514762
commit 1abaacffb5
17 changed files with 665 additions and 1857 deletions

View File

@ -1017,6 +1017,9 @@ public final class HConstants {
public static final String HBASE_COORDINATED_STATE_MANAGER_CLASS =
"hbase.coordinated.state.manager.class";
/** Configuration key for SplitLog manager timeout */
public static final String HBASE_SPLITLOG_MANAGER_TIMEOUT = "hbase.splitlog.manager.timeout";
/**
* Configuration keys for Bucket cache
*/

View File

@ -52,4 +52,13 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
@Override
public abstract TableStateManager getTableStateManager() throws InterruptedException,
CoordinatedStateException;
/**
* Method to retrieve coordination for split log worker
*/
public abstract SplitLogWorkerCoordination getSplitLogWorkerCoordination();
/**
* Method to retrieve coordination for split log manager
*/
public abstract SplitLogManagerCoordination getSplitLogManagerCoordination();
}

View File

@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.zookeeper.ZKTableStateManager;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -32,11 +33,16 @@ import org.apache.zookeeper.KeeperException;
public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
protected Server server;
protected ZooKeeperWatcher watcher;
protected SplitLogWorkerCoordination splitLogWorkerCoordination;
protected SplitLogManagerCoordination splitLogManagerCoordination;
@Override
public void initialize(Server server) {
this.server = server;
this.watcher = server.getZooKeeper();
splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(this, watcher);
splitLogManagerCoordination = new ZKSplitLogManagerCoordination(this, watcher);
}
@Override
@ -53,4 +59,13 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
throw new CoordinatedStateException(e);
}
}
@Override
public SplitLogWorkerCoordination getSplitLogWorkerCoordination() {
return splitLogWorkerCoordination;
}
@Override
public SplitLogManagerCoordination getSplitLogManagerCoordination() {
return splitLogManagerCoordination;
}
}

View File

@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
/**
* This class abstracts a bunch of operations the HMaster needs to interact with
@ -91,12 +90,14 @@ public class MasterFileSystem {
private final MasterServices services;
final static PathFilter META_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
return HLogUtil.isMetaFile(p);
}
};
final static PathFilter NON_META_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
return !HLogUtil.isMetaFile(p);
}
@ -123,14 +124,10 @@ public class MasterFileSystem {
// set up the archived logs path
this.oldLogDir = createInitialFileSystemLayout();
HFileSystem.addLocationsOrderInterceptor(conf);
try {
this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
master.getConfiguration(), master, services,
this.splitLogManager =
new SplitLogManager(master, master.getConfiguration(), master, services,
master.getServerName());
} catch (KeeperException e) {
throw new IOException(e);
}
this.distributedLogReplay = (this.splitLogManager.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
this.distributedLogReplay = this.splitLogManager.isLogReplaying();
}
/**
@ -350,11 +347,7 @@ public class MasterFileSystem {
if (regions == null || regions.isEmpty()) {
return;
}
try {
this.splitLogManager.markRegionsRecoveringInZK(serverName, regions);
} catch (KeeperException e) {
throw new IOException(e);
}
this.splitLogManager.markRegionsRecovering(serverName, regions);
}
public void splitLog(final Set<ServerName> serverNames) throws IOException {
@ -362,13 +355,13 @@ public class MasterFileSystem {
}
/**
* Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegionsFromZK(Set)}
* Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegions(Set)}
* @param failedServers
* @throws KeeperException
* @throws IOException
*/
void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
throws KeeperException, InterruptedIOException {
this.splitLogManager.removeStaleRecoveringRegionsFromZK(failedServers);
throws IOException, InterruptedIOException {
this.splitLogManager.removeStaleRecoveringRegions(failedServers);
}
/**
@ -651,15 +644,10 @@ public class MasterFileSystem {
/**
* The function is used in SSH to set recovery mode based on configuration after all outstanding
* log split tasks drained.
* @throws KeeperException
* @throws InterruptedIOException
* @throws IOException
*/
public void setLogRecoveryMode() throws IOException {
try {
this.splitLogManager.setRecoveryMode(false);
} catch (KeeperException e) {
throw new IOException(e);
}
}
public RecoveryMode getLogRecoveryMode() {

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@ -89,7 +90,6 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -136,6 +136,7 @@ import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -1523,8 +1524,8 @@ public class HRegionServer extends HasThread implements
this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
}
this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorker.DEFAULT_MAX_SPLITTERS));
this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
"hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), getName() + ".logRoller",
uncaughtExceptionHandler);
@ -1577,7 +1578,7 @@ public class HRegionServer extends HasThread implements
sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this);
splitLogWorker.start();
}
@ -2880,7 +2881,7 @@ public class HRegionServer extends HasThread implements
throw new InterruptedIOException();
}
if (data != null) {
lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
}
if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));

View File

@ -82,7 +82,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@ -159,6 +158,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
@ -1294,7 +1294,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (previous == null) {
// check if the region to be opened is marked in recovering state in ZK
if (SplitLogManager.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
region.getEncodedName())) {
// check if current region open is for distributedLogReplay. This check is to support
// rolling restart/upgrade where we want to Master/RS see same configuration
@ -1306,7 +1306,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// could happen when turn distributedLogReplay off from on.
List<String> tmpRegions = new ArrayList<String>();
tmpRegions.add(region.getEncodedName());
SplitLogManager.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(), tmpRegions);
ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
tmpRegions);
}
}
// If there is no action in progress, we can submit a specific handler.

View File

@ -22,111 +22,69 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import com.google.common.annotations.VisibleForTesting;
/**
* This worker is spawned in every regionserver (should we also spawn one in
* the master?). The Worker waits for log splitting tasks to be put up by the
* {@link SplitLogManager} running in the master and races with other workers
* in other serves to acquire those tasks. The coordination is done via
* zookeeper. All the action takes place at /hbase/splitlog znode.
* This worker is spawned in every regionserver, including master. The Worker waits for log
* splitting tasks to be put up by the {@link SplitLogManager} running in the master and races with
* other workers in other serves to acquire those tasks. The coordination is done via coordination
* engine.
* <p>
* If a worker has successfully moved the task from state UNASSIGNED to
* OWNED then it owns the task. It keeps heart beating the manager by
* periodically moving the task from UNASSIGNED to OWNED state. On success it
* moves the task to TASK_DONE. On unrecoverable error it moves task state to
* ERR. If it cannot continue but wants the master to retry the task then it
* moves the task state to RESIGNED.
* If a worker has successfully moved the task from state UNASSIGNED to OWNED then it owns the task.
* It keeps heart beating the manager by periodically moving the task from UNASSIGNED to OWNED
* state. On success it moves the task to TASK_DONE. On unrecoverable error it moves task state to
* ERR. If it cannot continue but wants the master to retry the task then it moves the task state to
* RESIGNED.
* <p>
* The manager can take a task away from a worker by moving the task from
* OWNED to UNASSIGNED. In the absence of a global lock there is a
* unavoidable race here - a worker might have just finished its task when it
* is stripped of its ownership. Here we rely on the idempotency of the log
* The manager can take a task away from a worker by moving the task from OWNED to UNASSIGNED. In
* the absence of a global lock there is a unavoidable race here - a worker might have just finished
* its task when it is stripped of its ownership. Here we rely on the idempotency of the log
* splitting task for correctness
*/
@InterfaceAudience.Private
public class SplitLogWorker extends ZooKeeperListener implements Runnable {
public static final int DEFAULT_MAX_SPLITTERS = 2;
public class SplitLogWorker implements Runnable {
private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
private static final int checkInterval = 5000; // 5 seconds
private static final int FAILED_TO_OWN_TASK = -1;
Thread worker;
private final ServerName serverName;
private final TaskExecutor splitTaskExecutor;
// thread pool which executes recovery work
private final ExecutorService executorService;
private final Object taskReadyLock = new Object();
volatile int taskReadySeq = 0;
private volatile String currentTask = null;
private int currentVersion;
private volatile boolean exitWorker;
private final Object grabTaskLock = new Object();
private boolean workerInGrabTask = false;
private final int report_period;
private RegionServerServices server = null;
private Configuration conf = null;
protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
private int maxConcurrentTasks = 0;
public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server,
private SplitLogWorkerCoordination coordination;
private Configuration conf;
private RegionServerServices server;
public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
TaskExecutor splitTaskExecutor) {
super(watcher);
this.server = server;
this.serverName = server.getServerName();
this.splitTaskExecutor = splitTaskExecutor;
report_period = conf.getInt("hbase.splitlog.report.period",
conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
this.conf = conf;
this.executorService = this.server.getExecutorService();
this.maxConcurrentTasks =
conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
this.coordination =
((BaseCoordinatedStateManager) hserver.getCoordinatedStateManager())
.getSplitLogWorkerCoordination();
this.server = server;
coordination.init(server, conf, splitTaskExecutor, this);
}
public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
public SplitLogWorker(final Server hserver, final Configuration conf,
final RegionServerServices server, final LastSequenceId sequenceIdChecker) {
this(watcher, conf, server, new TaskExecutor() {
this(server, conf, server, new TaskExecutor() {
@Override
public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
Path rootdir;
@ -143,7 +101,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
// encountered a bad non-retry-able persistent error.
try {
if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
fs, conf, p, sequenceIdChecker, watcher, server.getCoordinatedStateManager(), mode)) {
fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode)) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
@ -160,9 +118,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
} else if (cause instanceof InterruptedException) {
LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
return Status.RESIGNED;
} else if(cause instanceof KeeperException) {
LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e);
return Status.RESIGNED;
}
LOG.warn("log splitting of " + filename + " failed, returning error", e);
return Status.ERR;
@ -175,32 +130,22 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
@Override
public void run() {
try {
LOG.info("SplitLogWorker " + this.serverName + " starting");
this.watcher.registerListener(this);
LOG.info("SplitLogWorker " + server.getServerName() + " starting");
coordination.registerListener();
// pre-initialize a new connection for splitlogworker configuration
HConnectionManager.getConnection(conf);
// wait for master to create the splitLogZnode
int res = -1;
while (res == -1 && !exitWorker) {
try {
res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
} catch (KeeperException e) {
// ignore
LOG.warn("Exception when checking for " + watcher.splitLogZNode + " ... retrying", e);
// wait for Coordination Engine is ready
boolean res = false;
while (!res && !coordination.isStop()) {
res = coordination.isReady();
}
if (res == -1) {
LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
Thread.sleep(1000);
}
}
if (!exitWorker) {
taskLoop();
if (!coordination.isStop()) {
coordination.taskLoop();
}
} catch (Throwable t) {
if (ExceptionUtil.isInterrupt(t)) {
LOG.info("SplitLogWorker interrupted. Exiting. " + (exitWorker ? "" :
LOG.info("SplitLogWorker interrupted. Exiting. " + (coordination.isStop() ? "" :
" (ERROR: exitWorker is not set, exiting anyway)"));
} else {
// only a logical error can cause here. Printing it out
@ -208,394 +153,24 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
LOG.error("unexpected error ", t);
}
} finally {
LOG.info("SplitLogWorker " + this.serverName + " exiting");
coordination.removeListener();
LOG.info("SplitLogWorker " + server.getServerName() + " exiting");
}
}
/**
* Wait for tasks to become available at /hbase/splitlog zknode. Grab a task
* one at a time. This policy puts an upper-limit on the number of
* simultaneous log splitting that could be happening in a cluster.
* <p>
* Synchronization using {@link #taskReadyLock} ensures that it will
* try to grab every task that has been put up
*/
private void taskLoop() throws InterruptedException {
while (!exitWorker) {
int seq_start = taskReadySeq;
List<String> paths = getTaskList();
if (paths == null) {
LOG.warn("Could not get tasks, did someone remove " +
this.watcher.splitLogZNode + " ... worker thread exiting.");
return;
}
// pick meta wal firstly
int offset = (int) (Math.random() * paths.size());
for(int i = 0; i < paths.size(); i ++){
if(HLogUtil.isMetaFile(paths.get(i))) {
offset = i;
break;
}
}
int numTasks = paths.size();
for (int i = 0; i < numTasks; i++) {
int idx = (i + offset) % paths.size();
// don't call ZKSplitLog.getNodeName() because that will lead to
// double encoding of the path name
if (this.calculateAvailableSplitters(numTasks) > 0) {
grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
} else {
LOG.debug("Current region server " + this.serverName + " has "
+ this.tasksInProgress.get() + " tasks in progress and can't take more.");
break;
}
if (exitWorker) {
return;
}
}
SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
synchronized (taskReadyLock) {
while (seq_start == taskReadySeq) {
taskReadyLock.wait(checkInterval);
if (this.server != null) {
// check to see if we have stale recovering regions in our internal memory state
Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
if (!recoveringRegions.isEmpty()) {
// Make a local copy to prevent ConcurrentModificationException when other threads
// modify recoveringRegions
List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
for (String region : tmpCopy) {
String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
try {
if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
HRegion r = recoveringRegions.remove(region);
if (r != null) {
r.setRecovering(false);
}
LOG.debug("Mark recovering region:" + region + " up.");
} else {
// current check is a defensive(or redundant) mechanism to prevent us from
// having stale recovering regions in our internal RS memory state while
// zookeeper(source of truth) says differently. We stop at the first good one
// because we should not have a single instance such as this in normal case so
// check the first one is good enough.
break;
}
} catch (KeeperException e) {
// ignore zookeeper error
LOG.debug("Got a zookeeper when trying to open a recovering region", e);
break;
}
}
}
}
}
}
}
}
/**
* try to grab a 'lock' on the task zk node to own and execute the task.
* <p>
* @param path zk node for the task
*/
private void grabTask(String path) {
Stat stat = new Stat();
byte[] data;
synchronized (grabTaskLock) {
currentTask = path;
workerInGrabTask = true;
if (Thread.interrupted()) {
return;
}
}
try {
try {
if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) {
SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
return;
}
} catch (KeeperException e) {
LOG.warn("Failed to get data for znode " + path, e);
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
return;
}
SplitLogTask slt;
try {
slt = SplitLogTask.parseFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed parse data for znode " + path, e);
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
return;
}
if (!slt.isUnassigned()) {
SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
return;
}
currentVersion = attemptToOwnTask(true, watcher, serverName, path, slt.getMode(),
stat.getVersion());
if (currentVersion < 0) {
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
return;
}
if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName, slt.getMode()),
SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion);
return;
}
LOG.info("worker " + serverName + " acquired task " + path);
SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
getDataSetWatchAsync();
submitTask(path, slt.getMode(), currentVersion, this.report_period);
// after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks
try {
int sleepTime = RandomUtils.nextInt(500) + 500;
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
LOG.warn("Interrupted while yielding for other region servers", e);
Thread.currentThread().interrupt();
}
} finally {
synchronized (grabTaskLock) {
workerInGrabTask = false;
// clear the interrupt from stopTask() otherwise the next task will
// suffer
Thread.interrupted();
}
}
}
/**
* Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
* <p>
* This method is also used to periodically heartbeat the task progress by transitioning the node
* from OWNED to OWNED.
* <p>
* @param isFirstTime
* @param zkw
* @param server
* @param task
* @param taskZKVersion
* @return non-negative integer value when task can be owned by current region server otherwise -1
*/
protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
int latestZKVersion = FAILED_TO_OWN_TASK;
try {
SplitLogTask slt = new SplitLogTask.Owned(server, mode);
Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
if (stat == null) {
LOG.warn("zk.setData() returned null for path " + task);
SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
return FAILED_TO_OWN_TASK;
}
latestZKVersion = stat.getVersion();
SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
return latestZKVersion;
} catch (KeeperException e) {
if (!isFirstTime) {
if (e.code().equals(KeeperException.Code.NONODE)) {
LOG.warn("NONODE failed to assert ownership for " + task, e);
} else if (e.code().equals(KeeperException.Code.BADVERSION)) {
LOG.warn("BADVERSION failed to assert ownership for " + task, e);
} else {
LOG.warn("failed to assert ownership for " + task, e);
}
}
} catch (InterruptedException e1) {
LOG.warn("Interrupted while trying to assert ownership of " +
task + " " + StringUtils.stringifyException(e1));
Thread.currentThread().interrupt();
}
SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
return FAILED_TO_OWN_TASK;
}
/**
* This function calculates how many splitters it could create based on expected average tasks per
* RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
* At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
* @param numTasks current total number of available tasks
*/
private int calculateAvailableSplitters(int numTasks) {
// at lease one RS(itself) available
int availableRSs = 1;
try {
List<String> regionServers = ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
} catch (KeeperException e) {
// do nothing
LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
}
int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
// calculate how many more splitters we could spawn
return Math.min(expectedTasksPerRS, this.maxConcurrentTasks) - this.tasksInProgress.get();
}
/**
* Submit a log split task to executor service
* @param curTask
* @param curTaskZKVersion
*/
void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion,
final int reportPeriod) {
final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
CancelableProgressable reporter = new CancelableProgressable() {
private long last_report_at = 0;
@Override
public boolean progress() {
long t = EnvironmentEdgeManager.currentTimeMillis();
if ((t - last_report_at) > reportPeriod) {
last_report_at = t;
int latestZKVersion = attemptToOwnTask(false, watcher, serverName, curTask, mode,
zkVersion.intValue());
if (latestZKVersion < 0) {
LOG.warn("Failed to heartbeat the task" + curTask);
return false;
}
zkVersion.setValue(latestZKVersion);
}
return true;
}
};
HLogSplitterHandler hsh = new HLogSplitterHandler(this.server, curTask, zkVersion, reporter,
this.tasksInProgress, this.splitTaskExecutor, mode);
this.executorService.submit(hsh);
}
void getDataSetWatchAsync() {
this.watcher.getRecoverableZooKeeper().getZooKeeper().
getData(currentTask, this.watcher,
new GetDataAsyncCallback(), null);
SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
}
void getDataSetWatchSuccess(String path, byte[] data) {
SplitLogTask slt;
try {
slt = SplitLogTask.parseFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed parse", e);
return;
}
synchronized (grabTaskLock) {
if (workerInGrabTask) {
// currentTask can change but that's ok
String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) {
// have to compare data. cannot compare version because then there
// will be race with attemptToOwnTask()
// cannot just check whether the node has been transitioned to
// UNASSIGNED because by the time this worker sets the data watch
// the node might have made two transitions - from owned by this
// worker to unassigned to owned by another worker
if (! slt.isOwned(this.serverName) &&
! slt.isDone(this.serverName) &&
! slt.isErr(this.serverName) &&
! slt.isResigned(this.serverName)) {
LOG.info("task " + taskpath + " preempted from " +
serverName + ", current task state and owner=" + slt.toString());
stopTask();
}
}
}
}
}
void getDataSetWatchFailure(String path) {
synchronized (grabTaskLock) {
if (workerInGrabTask) {
// currentTask can change but that's ok
String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) {
LOG.info("retrying data watch on " + path);
SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
getDataSetWatchAsync();
} else {
// no point setting a watch on the task which this worker is not
// working upon anymore
}
}
}
}
@Override
public void nodeDataChanged(String path) {
// there will be a self generated dataChanged event every time attemptToOwnTask()
// heartbeats the task znode by upping its version
synchronized (grabTaskLock) {
if (workerInGrabTask) {
// currentTask can change
String taskpath = currentTask;
if (taskpath!= null && taskpath.equals(path)) {
getDataSetWatchAsync();
}
}
}
}
private List<String> getTaskList() throws InterruptedException {
List<String> childrenPaths = null;
long sleepTime = 1000;
// It will be in loop till it gets the list of children or
// it will come out if worker thread exited.
while (!exitWorker) {
try {
childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
this.watcher.splitLogZNode);
if (childrenPaths != null) {
return childrenPaths;
}
} catch (KeeperException e) {
LOG.warn("Could not get children of znode "
+ this.watcher.splitLogZNode, e);
}
LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode
+ " after sleep for " + sleepTime + "ms!");
Thread.sleep(sleepTime);
}
return childrenPaths;
}
@Override
public void nodeChildrenChanged(String path) {
if(path.equals(watcher.splitLogZNode)) {
LOG.debug("tasks arrived or departed");
synchronized (taskReadyLock) {
taskReadySeq++;
taskReadyLock.notify();
}
}
}
/**
* If the worker is doing a task i.e. splitting a log file then stop the task.
* It doesn't exit the worker thread.
*/
void stopTask() {
public void stopTask() {
LOG.info("Sending interrupt to stop the worker thread");
worker.interrupt(); // TODO interrupt often gets swallowed, do what else?
}
/**
* start the SplitLogWorker thread
*/
public void start() {
worker = new Thread(null, this, "SplitLogWorker-" + serverName);
exitWorker = false;
worker = new Thread(null, this, "SplitLogWorker-" + server.getServerName());
worker.start();
}
@ -603,29 +178,10 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
* stop the SplitLogWorker thread
*/
public void stop() {
exitWorker = true;
coordination.stopProcessingTasks();
stopTask();
}
/**
* Asynchronous handler for zk get-data-set-watch on node results.
*/
class GetDataAsyncCallback implements AsyncCallback.DataCallback {
private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
if (rc != 0) {
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
getDataSetWatchFailure(path);
return;
}
data = watcher.getRecoverableZooKeeper().removeMetaData(data);
getDataSetWatchSuccess(path, data);
}
}
/**
* Objects implementing this interface actually do the task that has been
* acquired by a {@link SplitLogWorker}. Since there isn't a water-tight
@ -642,4 +198,13 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
}
Status exec(String name, RecoveryMode mode, CancelableProgressable p);
}
/**
* Returns the number of tasks processed by coordination.
* This method is used by tests only
*/
@VisibleForTesting
public int getTaskReadySeq() {
return coordination.getTaskReadySeq();
}
}

View File

@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.regionserver.handler;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -30,17 +28,13 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* Handles log splitting a wal
@ -49,28 +43,24 @@ import org.apache.zookeeper.KeeperException;
public class HLogSplitterHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class);
private final ServerName serverName;
private final String curTask;
private final String wal;
private final ZooKeeperWatcher zkw;
private final CancelableProgressable reporter;
private final AtomicInteger inProgressTasks;
private final MutableInt curTaskZKVersion;
private final TaskExecutor splitTaskExecutor;
private final RecoveryMode mode;
private final SplitLogWorkerCoordination.SplitTaskDetails splitTaskDetails;
private final SplitLogWorkerCoordination coordination;
public HLogSplitterHandler(final Server server, String curTask,
final MutableInt curTaskZKVersion,
CancelableProgressable reporter,
public HLogSplitterHandler(final Server server, SplitLogWorkerCoordination coordination,
SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter,
AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
super(server, EventType.RS_LOG_REPLAY);
this.curTask = curTask;
this.wal = ZKSplitLog.getFileName(curTask);
this.splitTaskDetails = splitDetails;
this.coordination = coordination;
this.reporter = reporter;
this.inProgressTasks = inProgressTasks;
this.inProgressTasks.incrementAndGet();
this.serverName = server.getServerName();
this.zkw = server.getZooKeeper();
this.curTaskZKVersion = curTaskZKVersion;
this.splitTaskExecutor = splitTaskExecutor;
this.mode = mode;
}
@ -79,20 +69,20 @@ public class HLogSplitterHandler extends EventHandler {
public void process() throws IOException {
long startTime = System.currentTimeMillis();
try {
Status status = this.splitTaskExecutor.exec(wal, mode, reporter);
Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter);
switch (status) {
case DONE:
endTask(zkw, new SplitLogTask.Done(this.serverName, this.mode),
SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue());
coordination.endTask(new SplitLogTask.Done(this.serverName,this.mode),
SplitLogCounters.tot_wkr_task_done, splitTaskDetails);
break;
case PREEMPTED:
SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
LOG.warn("task execution prempted " + wal);
LOG.warn("task execution prempted " + splitTaskDetails.getWALFile());
break;
case ERR:
if (server != null && !server.isStopped()) {
endTask(zkw, new SplitLogTask.Err(this.serverName, this.mode),
SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue());
coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode),
SplitLogCounters.tot_wkr_task_err, splitTaskDetails);
break;
}
// if the RS is exiting then there is probably a tons of stuff
@ -100,45 +90,17 @@ public class HLogSplitterHandler extends EventHandler {
//$FALL-THROUGH$
case RESIGNED:
if (server != null && server.isStopped()) {
LOG.info("task execution interrupted because worker is exiting " + curTask);
LOG.info("task execution interrupted because worker is exiting "
+ splitTaskDetails.toString());
}
endTask(zkw, new SplitLogTask.Resigned(this.serverName, this.mode),
SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue());
coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode),
SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails);
break;
}
} finally {
LOG.info("worker " + serverName + " done with task " + curTask + " in "
LOG.info("worker " + serverName + " done with task " + splitTaskDetails.toString() + " in "
+ (System.currentTimeMillis() - startTime) + "ms");
this.inProgressTasks.decrementAndGet();
}
}
/**
* endTask() can fail and the only way to recover out of it is for the
* {@link SplitLogManager} to timeout the task node.
* @param slt
* @param ctr
*/
public static void endTask(ZooKeeperWatcher zkw, SplitLogTask slt, AtomicLong ctr, String task,
int taskZKVersion) {
try {
if (ZKUtil.setData(zkw, task, slt.toByteArray(), taskZKVersion)) {
LOG.info("successfully transitioned task " + task + " to final state " + slt);
ctr.incrementAndGet();
return;
}
LOG.warn("failed to transistion task " + task + " to end state " + slt
+ " because of version mismatch ");
} catch (KeeperException.BadVersionException bve) {
LOG.warn("transisition task " + task + " to " + slt
+ " failed because of version mismatch", bve);
} catch (KeeperException.NoNodeException e) {
LOG.fatal(
"logic error - end task " + task + " " + slt
+ " failed because task doesn't exist", e);
} catch (KeeperException e) {
LOG.warn("failed to end task, " + task + " " + slt, e);
}
SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
}
}

View File

@ -77,9 +77,10 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -109,7 +110,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.RemoteException;
@ -139,8 +139,7 @@ public class HLogSplitter {
private Set<TableName> disablingOrDisabledTables =
new HashSet<TableName>();
private ZooKeeperWatcher watcher;
private CoordinatedStateManager csm;
private BaseCoordinatedStateManager csm;
private MonitoredTask status;
@ -166,7 +165,7 @@ public class HLogSplitter {
private final int minBatchSize;
HLogSplitter(Configuration conf, Path rootDir,
FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw,
FileSystem fs, LastSequenceId idChecker,
CoordinatedStateManager csm, RecoveryMode mode) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName = conf
@ -175,8 +174,7 @@ public class HLogSplitter {
this.rootDir = rootDir;
this.fs = fs;
this.sequenceIdChecker = idChecker;
this.watcher = zkw;
this.csm = csm;
this.csm = (BaseCoordinatedStateManager)csm;
this.controller = new PipelineController();
entryBuffers = new EntryBuffers(controller,
@ -189,7 +187,7 @@ public class HLogSplitter {
this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
if (zkw != null && csm != null && this.distributedLogReplay) {
if (csm != null && this.distributedLogReplay) {
outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
} else {
if (this.distributedLogReplay) {
@ -213,15 +211,14 @@ public class HLogSplitter {
* @param conf
* @param reporter
* @param idChecker
* @param zkw ZooKeeperWatcher if it's null, we will back to the old-style log splitting where we
* dump out recoved.edits files for regions to replay on.
* @param cp coordination state manager
* @return false if it is interrupted by the progress-able.
* @throws IOException
*/
public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
ZooKeeperWatcher zkw, CoordinatedStateManager cp, RecoveryMode mode) throws IOException {
HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, cp, mode);
CoordinatedStateManager cp, RecoveryMode mode) throws IOException {
HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, cp, mode);
return s.splitLogFile(logfile, reporter);
}
@ -235,8 +232,8 @@ public class HLogSplitter {
List<Path> splits = new ArrayList<Path>();
if (logfiles != null && logfiles.length > 0) {
for (FileStatus logfile: logfiles) {
HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null,
RecoveryMode.LOG_SPLITTING);
HLogSplitter s =
new HLogSplitter(conf, rootDir, fs, null, null, RecoveryMode.LOG_SPLITTING);
if (s.splitLogFile(logfile, null)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
if (s.outputSink.splits != null) {
@ -289,7 +286,7 @@ public class HLogSplitter {
LOG.warn("Nothing to split in log file " + logPath);
return true;
}
if(watcher != null && csm != null) {
if(csm != null) {
try {
TableStateManager tsm = csm.getTableStateManager();
disablingOrDisabledTables = tsm.getTablesInStates(
@ -314,7 +311,8 @@ public class HLogSplitter {
if (lastFlushedSequenceId == null) {
if (this.distributedLogReplay) {
RegionStoreSequenceIds ids =
SplitLogManager.getRegionFlushedSequenceId(this.watcher, failedServerName, key);
csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
key);
if (ids != null) {
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
}
@ -352,7 +350,8 @@ public class HLogSplitter {
throw iie;
} catch (CorruptedLogFileException e) {
LOG.warn("Could not parse, corrupted log file " + logPath, e);
ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
logfile.getPath().getName(), fs);
isCorrupted = true;
} catch (IOException e) {
e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
@ -1417,8 +1416,9 @@ public class HLogSplitter {
public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
int numWriters) {
super(controller, entryBuffers, numWriters);
this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout",
SplitLogManager.DEFAULT_TIMEOUT);
this.waitRegionOnlineTimeOut =
conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
entryBuffers, numWriters);
this.logRecoveredEditsOutputSink.setReporter(reporter);
@ -1640,8 +1640,8 @@ public class HLogSplitter {
// retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
// update the value for the region
RegionStoreSequenceIds ids =
SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc
.getRegionInfo().getEncodedName());
csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
loc.getRegionInfo().getEncodedName());
if (ids != null) {
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.hbase.zookeeper;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -28,8 +30,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.zookeeper.KeeperException;
/**
* Common methods and attributes used by {@link SplitLogManager} and {@link SplitLogWorker}
@ -120,4 +125,100 @@ public class ZKSplitLog {
return isCorrupt;
}
/*
* Following methods come from SplitLogManager
*/
/**
* check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if exists
* and set watcher as well.
* @param zkw
* @param regionEncodedName region encode name
* @return true when /hbase/recovering-regions/<current region encoded name> exists
* @throws KeeperException
*/
public static boolean
isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
throws KeeperException {
boolean result = false;
String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName);
byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
if (node != null) {
result = true;
}
return result;
}
/**
* @param bytes - Content of a failed region server or recovering region znode.
* @return long - The last flushed sequence Id for the region server
*/
public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
long lastRecordedFlushedSequenceId = -1l;
try {
lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
} catch (DeserializationException e) {
lastRecordedFlushedSequenceId = -1l;
LOG.warn("Can't parse last flushed sequence Id", e);
}
return lastRecordedFlushedSequenceId;
}
public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
try {
if (regions == null) {
// remove all children under /home/recovering-regions
LOG.debug("Garbage collecting all recovering region znodes");
ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
} else {
for (String curRegion : regions) {
String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
ZKUtil.deleteNodeRecursively(watcher, nodePath);
}
}
} catch (KeeperException e) {
LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
}
}
/**
* This function is used in distributedLogReplay to fetch last flushed sequence id from ZK
* @param zkw
* @param serverName
* @param encodedRegionName
* @return the last flushed sequence ids recorded in ZK of the region for <code>serverName<code>
* @throws IOException
*/
public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
String serverName, String encodedRegionName) throws IOException {
// when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
// last flushed sequence Id changes when newly assigned RS flushes writes to the region.
// If the newly assigned RS fails again(a chained RS failures scenario), the last flushed
// sequence Id name space (sequence Id only valid for a particular RS instance), changes
// when different newly assigned RS flushes the region.
// Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of
// last flushed sequence Id for each failed RS instance.
RegionStoreSequenceIds result = null;
String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
nodePath = ZKUtil.joinZNode(nodePath, serverName);
try {
byte[] data;
try {
data = ZKUtil.getData(zkw, nodePath);
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
if (data != null) {
result = ZKUtil.parseRegionStoreSequenceIds(data);
}
} catch (KeeperException e) {
throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
+ serverName + "; region=" + encodedRegionName, e);
} catch (DeserializationException e) {
LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e);
}
return result;
}
}

View File

@ -77,6 +77,9 @@ import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
@ -651,8 +654,8 @@ public class TestDistributedLogSplitting {
break;
}
slm.markRegionsRecoveringInZK(firstFailedServer, regionSet);
slm.markRegionsRecoveringInZK(secondFailedServer, regionSet);
slm.markRegionsRecovering(firstFailedServer, regionSet);
slm.markRegionsRecovering(secondFailedServer, regionSet);
List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw,
ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName()));
@ -880,7 +883,7 @@ public class TestDistributedLogSplitting {
break;
}
slm.markRegionsRecoveringInZK(hrs.getServerName(), regionSet);
slm.markRegionsRecovering(hrs.getServerName(), regionSet);
// move region in order for the region opened in recovering state
final HRegionInfo hri = region;
final HRegionServer tmpRS = dstRS;
@ -1064,7 +1067,10 @@ public class TestDistributedLogSplitting {
out.write(0);
out.write(Bytes.toBytes("corrupted bytes"));
out.close();
slm.ignoreZKDeleteForTesting = true;
ZKSplitLogManagerCoordination coordination =
(ZKSplitLogManagerCoordination) ((BaseCoordinatedStateManager) master
.getCoordinatedStateManager()).getSplitLogManagerCoordination();
coordination.setIgnoreDeleteForTesting(true);
executor = Executors.newSingleThreadExecutor();
Runnable runnable = new Runnable() {
@Override

View File

@ -19,11 +19,8 @@
package org.apache.hadoop.hbase.master;
import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_get_data_nonode;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_log_split_batch_success;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_result;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
@ -48,22 +45,26 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -91,6 +92,7 @@ public class TestSplitLogManager {
}
private ZooKeeperWatcher zkw;
private DummyServer ds;
private static boolean stopped = false;
private SplitLogManager slm;
private Configuration conf;
@ -99,6 +101,68 @@ public class TestSplitLogManager {
private static HBaseTestingUtility TEST_UTIL;
class DummyServer implements Server {
private ZooKeeperWatcher zkw;
private Configuration conf;
private CoordinatedStateManager cm;
public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
this.zkw = zkw;
this.conf = conf;
cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
cm.initialize(this);
}
@Override
public void abort(String why, Throwable e) {
}
@Override
public boolean isAborted() {
return false;
}
@Override
public void stop(String why) {
}
@Override
public boolean isStopped() {
return false;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return zkw;
}
@Override
public ServerName getServerName() {
return null;
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return cm;
}
@Override
public HConnection getShortCircuitConnection() {
return null;
}
@Override
public MetaTableLocator getMetaTableLocator() {
return null;
}
}
static Stoppable stopper = new Stoppable() {
@Override
public void stop(String why) {
@ -109,7 +173,6 @@ public class TestSplitLogManager {
public boolean isStopped() {
return stopped;
}
};
@Before
@ -118,7 +181,10 @@ public class TestSplitLogManager {
TEST_UTIL.startMiniZKCluster();
conf = TEST_UTIL.getConfiguration();
// Use a different ZK wrapper instance for each tests.
zkw = new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null);
zkw =
new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null);
ds = new DummyServer(zkw, conf);
ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
@ -136,13 +202,15 @@ public class TestSplitLogManager {
Mockito.when(master.getServerManager()).thenReturn(sm);
to = 6000;
conf.setInt("hbase.splitlog.manager.timeout", to);
conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
to = to + 4 * 100;
this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
this.mode =
(conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY
: RecoveryMode.LOG_SPLITTING);
}
@After
@ -180,8 +248,8 @@ public class TestSplitLogManager {
assertEquals(newval, e.eval());
}
private String submitTaskAndWait(TaskBatch batch, String name)
throws KeeperException, InterruptedException {
private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
InterruptedException {
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
zkw.registerListener(listener);
@ -206,7 +274,7 @@ public class TestSplitLogManager {
public void testTaskCreation() throws Exception {
LOG.info("TestTaskCreation - test the creation of a task in zk");
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
@ -226,7 +294,7 @@ public class TestSplitLogManager {
zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
Task task = slm.findOrCreateOrphanTask(tasknode);
assertTrue(task.isOrphan());
@ -252,7 +320,7 @@ public class TestSplitLogManager {
CreateMode.PERSISTENT);
int version = ZKUtil.checkExists(zkw, tasknode);
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
Task task = slm.findOrCreateOrphanTask(tasknode);
assertTrue(task.isOrphan());
@ -273,9 +341,8 @@ public class TestSplitLogManager {
@Test
public void testMultipleResubmits() throws Exception {
LOG.info("TestMultipleResbmits - no indefinite resubmissions");
conf.setInt("hbase.splitlog.max.resubmit", 2);
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
@ -307,7 +374,7 @@ public class TestSplitLogManager {
public void testRescanCleanup() throws Exception {
LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
@ -336,7 +403,7 @@ public class TestSplitLogManager {
public void testTaskDone() throws Exception {
LOG.info("TestTaskDone - cleanup task node once in DONE state");
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
@ -356,7 +423,7 @@ public class TestSplitLogManager {
LOG.info("TestTaskErr - cleanup task node once in ERR state");
conf.setInt("hbase.splitlog.max.resubmit", 0);
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
@ -371,14 +438,14 @@ public class TestSplitLogManager {
}
waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
conf.setInt("hbase.splitlog.max.resubmit", SplitLogManager.DEFAULT_MAX_RESUBMIT);
conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
}
@Test
public void testTaskResigned() throws Exception {
LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
assertEquals(tot_mgr_resubmit.get(), 0);
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
assertEquals(tot_mgr_resubmit.get(), 0);
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
@ -412,7 +479,7 @@ public class TestSplitLogManager {
zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
// submit another task which will stay in unassigned mode
@ -441,7 +508,7 @@ public class TestSplitLogManager {
LOG.info("testDeadWorker");
conf.setLong("hbase.splitlog.max.resubmit", 0);
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
@ -466,7 +533,7 @@ public class TestSplitLogManager {
@Test
public void testWorkerCrash() throws Exception {
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
@ -491,7 +558,7 @@ public class TestSplitLogManager {
@Test
public void testEmptyLogDir() throws Exception {
LOG.info("testEmptyLogDir");
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
UUID.randomUUID().toString());
@ -514,8 +581,8 @@ public class TestSplitLogManager {
HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L));
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
slm.removeStaleRecoveringRegionsFromZK(null);
slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
slm.removeStaleRecoveringRegions(null);
List<String> recoveringRegions =
zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
@ -535,12 +602,12 @@ public class TestSplitLogManager {
ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER);
assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_SPLITTING);
slm = new SplitLogManager(ds, testConf, stopper, master, DUMMY_MASTER);
assertTrue(slm.isLogSplitting());
zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
slm.setRecoveryMode(false);
assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
assertTrue(slm.isLogReplaying());
}
}

View File

@ -19,8 +19,9 @@
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThat;
import static org.hamcrest.CoreMatchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -30,19 +31,23 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -65,11 +70,74 @@ public class TestSplitLogWorker {
}
private final static HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private DummyServer ds;
private ZooKeeperWatcher zkw;
private SplitLogWorker slw;
private ExecutorService executorService;
private RecoveryMode mode;
class DummyServer implements Server {
private ZooKeeperWatcher zkw;
private Configuration conf;
private CoordinatedStateManager cm;
public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
this.zkw = zkw;
this.conf = conf;
cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
cm.initialize(this);
}
@Override
public void abort(String why, Throwable e) {
}
@Override
public boolean isAborted() {
return false;
}
@Override
public void stop(String why) {
}
@Override
public boolean isStopped() {
return false;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return zkw;
}
@Override
public ServerName getServerName() {
return null;
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return cm;
}
@Override
public HConnection getShortCircuitConnection() {
return null;
}
@Override
public MetaTableLocator getMetaTableLocator() {
return null;
}
}
private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
throws Exception {
assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval,
@ -106,15 +174,18 @@ public class TestSplitLogWorker {
Configuration conf = TEST_UTIL.getConfiguration();
zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
"split-log-worker-tests", null);
ds = new DummyServer(zkw, conf);
ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
assertThat(ZKUtil.checkExists(zkw, zkw.baseZNode), not (is(-1)));
LOG.debug(zkw.baseZNode + " created");
ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
assertThat(ZKUtil.checkExists(zkw, zkw.splitLogZNode), not (is(-1)));
LOG.debug(zkw.splitLogZNode + " created");
ZKUtil.createAndFailSilent(zkw, zkw.rsZNode);
assertTrue(ZKUtil.checkExists(zkw, zkw.rsZNode) != -1);
assertThat(ZKUtil.checkExists(zkw, zkw.rsZNode), not (is(-1)));
SplitLogCounters.resetCounters();
executorService = new ExecutorService("TestSplitLogWorker");
executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
@ -162,7 +233,7 @@ public class TestSplitLogWorker {
CreateMode.PERSISTENT);
SplitLogWorker slw =
new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
slw.start();
try {
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
@ -198,9 +269,9 @@ public class TestSplitLogWorker {
RegionServerServices mockedRS1 = getRegionServer(SVR1);
RegionServerServices mockedRS2 = getRegionServer(SVR2);
SplitLogWorker slw1 =
new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
SplitLogWorker slw2 =
new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
slw1.start();
slw2.start();
try {
@ -227,7 +298,7 @@ public class TestSplitLogWorker {
final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
RegionServerServices mockedRS = getRegionServer(SRV);
SplitLogWorker slw =
new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
slw.start();
try {
Thread.yield(); // let the worker start
@ -240,7 +311,7 @@ public class TestSplitLogWorker {
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
assertEquals(1, slw.taskReadySeq);
assertEquals(1, slw.getTaskReadySeq());
byte [] bytes = ZKUtil.getData(zkw, PATH);
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
assertTrue(slt.isOwned(SRV));
@ -260,7 +331,7 @@ public class TestSplitLogWorker {
final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
RegionServerServices mockedRS = getRegionServer(SRV);
SplitLogWorker slw =
new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
slw.start();
try {
Thread.yield(); // let the worker start
@ -287,7 +358,7 @@ public class TestSplitLogWorker {
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
assertEquals(2, slw.taskReadySeq);
assertEquals(2, slw.getTaskReadySeq());
byte [] bytes = ZKUtil.getData(zkw, PATH2);
slt = SplitLogTask.parseFrom(bytes);
assertTrue(slt.isOwned(SRV));
@ -302,7 +373,7 @@ public class TestSplitLogWorker {
SplitLogCounters.resetCounters();
final ServerName SRV = ServerName.valueOf("svr,1,1");
RegionServerServices mockedRS = getRegionServer(SRV);
slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
slw.start();
Thread.yield(); // let the worker start
Thread.sleep(100);
@ -358,14 +429,13 @@ public class TestSplitLogWorker {
Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
RegionServerServices mockedRS = getRegionServer(RS);
for (int i = 0; i < maxTasks; i++) {
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
slw.start();
try {
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
@ -408,7 +478,7 @@ public class TestSplitLogWorker {
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
slw.start();
try {
int acquiredTasks = 0;

View File

@ -138,7 +138,7 @@ public class TestHLogReaderOnSecureHLog {
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
Path rootdir = FSUtils.getRootDir(conf);
try {
HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode);
HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, mode);
s.splitLogFile(listStatus[0], null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt");
@ -181,7 +181,7 @@ public class TestHLogReaderOnSecureHLog {
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
Path rootdir = FSUtils.getRootDir(conf);
try {
HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode);
HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, mode);
s.splitLogFile(listStatus[0], null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt");

View File

@ -809,7 +809,7 @@ public class TestHLogSplit {
logfiles != null && logfiles.length > 0);
// Set up a splitter that will throw an IOE on the output side
HLogSplitter logSplitter = new HLogSplitter(
conf, HBASEDIR, fs, null, null, null, this.mode) {
conf, HBASEDIR, fs, null, null, this.mode) {
protected HLog.Writer createWriter(FileSystem fs,
Path logfile, Configuration conf) throws IOException {
HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
@ -942,7 +942,7 @@ public class TestHLogSplit {
try {
conf.setInt("hbase.splitlog.report.period", 1000);
boolean ret = HLogSplitter.splitLogFile(
HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null, this.mode);
HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode);
assertFalse("Log splitting should failed", ret);
assertTrue(count.get() > 0);
} catch (IOException e) {
@ -1001,7 +1001,7 @@ public class TestHLogSplit {
// Create a splitter that reads and writes the data without touching disk
HLogSplitter logSplitter = new HLogSplitter(
localConf, HBASEDIR, fs, null, null, null, this.mode) {
localConf, HBASEDIR, fs, null, null, this.mode) {
/* Produce a mock writer that doesn't write anywhere */
protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
@ -1222,7 +1222,7 @@ public class TestHLogSplit {
logfiles != null && logfiles.length > 0);
HLogSplitter logSplitter = new HLogSplitter(
conf, HBASEDIR, fs, null, null, null, this.mode) {
conf, HBASEDIR, fs, null, null, this.mode) {
protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
throws IOException {
HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);

View File

@ -885,7 +885,7 @@ public class TestWALReplay {
wal.close();
FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0],
this.fs, this.conf, null, null, null, null, mode);
this.fs, this.conf, null, null, null, mode);
FileStatus[] listStatus1 = this.fs.listStatus(
new Path(FSUtils.getTableDir(hbaseRootDir, tableName),
new Path(hri.getEncodedName(), "recovered.edits")));