HBASE-6738 Too aggressive task resubmission from the distributed log manager
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1393537 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e64430e970
commit
90e0910a48
|
@ -110,7 +110,7 @@ public class MasterFileSystem {
|
||||||
conf.getBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
|
conf.getBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
|
||||||
if (this.distributedLogSplitting) {
|
if (this.distributedLogSplitting) {
|
||||||
this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
|
this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
|
||||||
master.getConfiguration(), master, master.getServerName());
|
master.getConfiguration(), master, services, master.getServerName());
|
||||||
this.splitLogManager.finishInitialization(masterRecovery);
|
this.splitLogManager.finishInitialization(masterRecovery);
|
||||||
} else {
|
} else {
|
||||||
this.splitLogManager = null;
|
this.splitLogManager = null;
|
||||||
|
|
|
@ -98,12 +98,13 @@ import org.apache.zookeeper.data.Stat;
|
||||||
public class SplitLogManager extends ZooKeeperListener {
|
public class SplitLogManager extends ZooKeeperListener {
|
||||||
private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
|
private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
|
||||||
|
|
||||||
public static final int DEFAULT_TIMEOUT = 25000; // 25 sec
|
public static final int DEFAULT_TIMEOUT = 120000;
|
||||||
public static final int DEFAULT_ZK_RETRIES = 3;
|
public static final int DEFAULT_ZK_RETRIES = 3;
|
||||||
public static final int DEFAULT_MAX_RESUBMIT = 3;
|
public static final int DEFAULT_MAX_RESUBMIT = 3;
|
||||||
public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
|
public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
|
||||||
|
|
||||||
private final Stoppable stopper;
|
private final Stoppable stopper;
|
||||||
|
private final MasterServices master;
|
||||||
private final ServerName serverName;
|
private final ServerName serverName;
|
||||||
private final TaskFinisher taskFinisher;
|
private final TaskFinisher taskFinisher;
|
||||||
private FileSystem fs;
|
private FileSystem fs;
|
||||||
|
@ -116,11 +117,11 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
private long lastNodeCreateTime = Long.MAX_VALUE;
|
private long lastNodeCreateTime = Long.MAX_VALUE;
|
||||||
public boolean ignoreZKDeleteForTesting = false;
|
public boolean ignoreZKDeleteForTesting = false;
|
||||||
|
|
||||||
private ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
|
private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
|
||||||
private TimeoutMonitor timeoutMonitor;
|
private TimeoutMonitor timeoutMonitor;
|
||||||
|
|
||||||
private volatile Set<ServerName> deadWorkers = null;
|
private volatile Set<ServerName> deadWorkers = null;
|
||||||
private Object deadWorkersLock = new Object();
|
private final Object deadWorkersLock = new Object();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration,
|
* Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration,
|
||||||
|
@ -135,8 +136,8 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
* @param serverName
|
* @param serverName
|
||||||
*/
|
*/
|
||||||
public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
|
public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
|
||||||
Stoppable stopper, ServerName serverName) {
|
Stoppable stopper, MasterServices master, ServerName serverName) {
|
||||||
this(zkw, conf, stopper, serverName, new TaskFinisher() {
|
this(zkw, conf, stopper, master, serverName, new TaskFinisher() {
|
||||||
@Override
|
@Override
|
||||||
public Status finish(ServerName workerName, String logfile) {
|
public Status finish(ServerName workerName, String logfile) {
|
||||||
try {
|
try {
|
||||||
|
@ -162,18 +163,19 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
* @param tf task finisher
|
* @param tf task finisher
|
||||||
*/
|
*/
|
||||||
public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
|
public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
|
||||||
Stoppable stopper, ServerName serverName, TaskFinisher tf) {
|
Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf) {
|
||||||
super(zkw);
|
super(zkw);
|
||||||
this.taskFinisher = tf;
|
this.taskFinisher = tf;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.stopper = stopper;
|
this.stopper = stopper;
|
||||||
|
this.master = master;
|
||||||
this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
|
this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
|
||||||
this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
|
this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
|
||||||
this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
|
this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
|
||||||
this.unassignedTimeout =
|
this.unassignedTimeout =
|
||||||
conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
|
conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
|
||||||
LOG.debug("timeout = " + timeout);
|
LOG.info("timeout = " + timeout);
|
||||||
LOG.debug("unassigned timeout = " + unassignedTimeout);
|
LOG.info("unassigned timeout = " + unassignedTimeout);
|
||||||
|
|
||||||
this.serverName = serverName;
|
this.serverName = serverName;
|
||||||
this.timeoutMonitor =
|
this.timeoutMonitor =
|
||||||
|
@ -551,8 +553,18 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
}
|
}
|
||||||
int version;
|
int version;
|
||||||
if (directive != FORCE) {
|
if (directive != FORCE) {
|
||||||
if ((EnvironmentEdgeManager.currentTimeMillis() - task.last_update) <
|
// We're going to resubmit:
|
||||||
timeout) {
|
// 1) immediately if the worker server is now marked as dead
|
||||||
|
// 2) after a configurable timeout if the server is not marked as dead but has still not
|
||||||
|
// finished the task. This allows us to continue if the worker cannot actually handle it,
|
||||||
|
// for any reason.
|
||||||
|
final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update;
|
||||||
|
final boolean alive = master.getServerManager() != null ?
|
||||||
|
master.getServerManager().isServerOnline(task.cur_worker_name) : true;
|
||||||
|
if (alive && time < timeout) {
|
||||||
|
LOG.trace("Skipping the resubmit of " + task.toString() + " because the server " +
|
||||||
|
task.cur_worker_name + " is not marked as dead, we waited for " + time +
|
||||||
|
" while the timeout is " + timeout);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (task.unforcedResubmits >= resubmit_threshold) {
|
if (task.unforcedResubmits >= resubmit_threshold) {
|
||||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.MediumTests;
|
import org.apache.hadoop.hbase.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.SplitLogTask;
|
import org.apache.hadoop.hbase.SplitLogTask;
|
||||||
import org.apache.hadoop.hbase.Stoppable;
|
import org.apache.hadoop.hbase.Stoppable;
|
||||||
|
@ -64,15 +65,20 @@ import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.ZooDefs.Ids;
|
import org.apache.zookeeper.ZooDefs.Ids;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
@Category(MediumTests.class)
|
@Category(MediumTests.class)
|
||||||
public class TestSplitLogManager {
|
public class TestSplitLogManager {
|
||||||
private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
|
private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
|
||||||
private final ServerName DUMMY_MASTER = new ServerName("dummy-master,1,1");
|
private final ServerName DUMMY_MASTER = new ServerName("dummy-master,1,1");
|
||||||
|
private final ServerManager sm = Mockito.mock(ServerManager.class);
|
||||||
|
private final MasterServices master = Mockito.mock(MasterServices.class);
|
||||||
|
|
||||||
static {
|
static {
|
||||||
Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
|
Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
|
||||||
}
|
}
|
||||||
|
@ -103,14 +109,6 @@ public class TestSplitLogManager {
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setUpBeforeClass() throws Exception {
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void tearDownAfterClass() throws Exception {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
TEST_UTIL = new HBaseTestingUtility();
|
TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
@ -128,6 +126,11 @@ public class TestSplitLogManager {
|
||||||
|
|
||||||
stopped = false;
|
stopped = false;
|
||||||
resetCounters();
|
resetCounters();
|
||||||
|
|
||||||
|
// By default, we let the test manage the error as before, so the server
|
||||||
|
// does not appear as dead from the master's point of view, only from the split log's point of view.
|
||||||
|
Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true);
|
||||||
|
Mockito.when(master.getServerManager()).thenReturn(sm);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -194,8 +197,9 @@ public class TestSplitLogManager {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testTaskCreation() throws Exception {
|
public void testTaskCreation() throws Exception {
|
||||||
|
|
||||||
LOG.info("TestTaskCreation - test the creation of a task in zk");
|
LOG.info("TestTaskCreation - test the creation of a task in zk");
|
||||||
slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
|
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
TaskBatch batch = new TaskBatch();
|
TaskBatch batch = new TaskBatch();
|
||||||
|
|
||||||
|
@ -221,7 +225,7 @@ public class TestSplitLogManager {
|
||||||
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
||||||
to = to + 2 * 100;
|
to = to + 2 * 100;
|
||||||
|
|
||||||
slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
|
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
|
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
|
||||||
Task task = slm.findOrCreateOrphanTask(tasknode);
|
Task task = slm.findOrCreateOrphanTask(tasknode);
|
||||||
|
@ -248,7 +252,7 @@ public class TestSplitLogManager {
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
int version = ZKUtil.checkExists(zkw, tasknode);
|
int version = ZKUtil.checkExists(zkw, tasknode);
|
||||||
|
|
||||||
slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
|
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
|
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
|
||||||
Task task = slm.findOrCreateOrphanTask(tasknode);
|
Task task = slm.findOrCreateOrphanTask(tasknode);
|
||||||
|
@ -277,7 +281,7 @@ public class TestSplitLogManager {
|
||||||
to = to + 2 * 100;
|
to = to + 2 * 100;
|
||||||
|
|
||||||
conf.setInt("hbase.splitlog.max.resubmit", 2);
|
conf.setInt("hbase.splitlog.max.resubmit", 2);
|
||||||
slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
|
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
TaskBatch batch = new TaskBatch();
|
TaskBatch batch = new TaskBatch();
|
||||||
|
|
||||||
|
@ -312,7 +316,7 @@ public class TestSplitLogManager {
|
||||||
|
|
||||||
conf.setInt("hbase.splitlog.manager.timeout", 1000);
|
conf.setInt("hbase.splitlog.manager.timeout", 1000);
|
||||||
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
||||||
slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
|
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
TaskBatch batch = new TaskBatch();
|
TaskBatch batch = new TaskBatch();
|
||||||
|
|
||||||
|
@ -328,7 +332,7 @@ public class TestSplitLogManager {
|
||||||
return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get());
|
return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get());
|
||||||
}
|
}
|
||||||
}, 0, 1, 5*60000); // wait long enough
|
}, 0, 1, 5*60000); // wait long enough
|
||||||
if (tot_mgr_resubmit_failed.get() == 0) {
|
Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.get());
|
||||||
int version1 = ZKUtil.checkExists(zkw, tasknode);
|
int version1 = ZKUtil.checkExists(zkw, tasknode);
|
||||||
assertTrue(version1 > version);
|
assertTrue(version1 > version);
|
||||||
byte[] taskstate = ZKUtil.getData(zkw, tasknode);
|
byte[] taskstate = ZKUtil.getData(zkw, tasknode);
|
||||||
|
@ -336,18 +340,13 @@ public class TestSplitLogManager {
|
||||||
assertTrue(slt.isUnassigned(DUMMY_MASTER));
|
assertTrue(slt.isUnassigned(DUMMY_MASTER));
|
||||||
|
|
||||||
waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
|
waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
|
||||||
} else {
|
|
||||||
LOG.warn("Could not run test. Lost ZK connection?");
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTaskDone() throws Exception {
|
public void testTaskDone() throws Exception {
|
||||||
LOG.info("TestTaskDone - cleanup task node once in DONE state");
|
LOG.info("TestTaskDone - cleanup task node once in DONE state");
|
||||||
|
|
||||||
slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
|
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
TaskBatch batch = new TaskBatch();
|
TaskBatch batch = new TaskBatch();
|
||||||
String tasknode = submitTaskAndWait(batch, "foo/1");
|
String tasknode = submitTaskAndWait(batch, "foo/1");
|
||||||
|
@ -368,7 +367,7 @@ public class TestSplitLogManager {
|
||||||
LOG.info("TestTaskErr - cleanup task node once in ERR state");
|
LOG.info("TestTaskErr - cleanup task node once in ERR state");
|
||||||
|
|
||||||
conf.setInt("hbase.splitlog.max.resubmit", 0);
|
conf.setInt("hbase.splitlog.max.resubmit", 0);
|
||||||
slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
|
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
TaskBatch batch = new TaskBatch();
|
TaskBatch batch = new TaskBatch();
|
||||||
|
|
||||||
|
@ -376,6 +375,7 @@ public class TestSplitLogManager {
|
||||||
final ServerName worker1 = new ServerName("worker1,1,1");
|
final ServerName worker1 = new ServerName("worker1,1,1");
|
||||||
SplitLogTask slt = new SplitLogTask.Err(worker1);
|
SplitLogTask slt = new SplitLogTask.Err(worker1);
|
||||||
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
|
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
|
||||||
|
|
||||||
synchronized (batch) {
|
synchronized (batch) {
|
||||||
while (batch.installed != batch.error) {
|
while (batch.installed != batch.error) {
|
||||||
batch.wait();
|
batch.wait();
|
||||||
|
@ -390,7 +390,7 @@ public class TestSplitLogManager {
|
||||||
public void testTaskResigned() throws Exception {
|
public void testTaskResigned() throws Exception {
|
||||||
LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
|
LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
|
||||||
assertEquals(tot_mgr_resubmit.get(), 0);
|
assertEquals(tot_mgr_resubmit.get(), 0);
|
||||||
slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
|
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
assertEquals(tot_mgr_resubmit.get(), 0);
|
assertEquals(tot_mgr_resubmit.get(), 0);
|
||||||
TaskBatch batch = new TaskBatch();
|
TaskBatch batch = new TaskBatch();
|
||||||
|
@ -431,7 +431,7 @@ public class TestSplitLogManager {
|
||||||
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
||||||
|
|
||||||
|
|
||||||
slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
|
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
|
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
|
||||||
|
|
||||||
|
@ -462,7 +462,7 @@ public class TestSplitLogManager {
|
||||||
LOG.info("testDeadWorker");
|
LOG.info("testDeadWorker");
|
||||||
|
|
||||||
conf.setLong("hbase.splitlog.max.resubmit", 0);
|
conf.setLong("hbase.splitlog.max.resubmit", 0);
|
||||||
slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
|
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
TaskBatch batch = new TaskBatch();
|
TaskBatch batch = new TaskBatch();
|
||||||
|
|
||||||
|
@ -486,10 +486,35 @@ public class TestSplitLogManager {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWorkerCrash() throws Exception {
|
||||||
|
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||||
|
slm.finishInitialization();
|
||||||
|
TaskBatch batch = new TaskBatch();
|
||||||
|
|
||||||
|
String tasknode = submitTaskAndWait(batch, "foo/1");
|
||||||
|
final ServerName worker1 = new ServerName("worker1,1,1");
|
||||||
|
|
||||||
|
SplitLogTask slt = new SplitLogTask.Owned(worker1);
|
||||||
|
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
|
||||||
|
if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
|
||||||
|
|
||||||
|
// Not yet resubmitted.
|
||||||
|
Assert.assertEquals(0, tot_mgr_resubmit.get());
|
||||||
|
|
||||||
|
// This server becomes dead
|
||||||
|
Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
|
||||||
|
|
||||||
|
Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
|
||||||
|
|
||||||
|
// It has been resubmitted
|
||||||
|
Assert.assertEquals(1, tot_mgr_resubmit.get());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEmptyLogDir() throws Exception {
|
public void testEmptyLogDir() throws Exception {
|
||||||
LOG.info("testEmptyLogDir");
|
LOG.info("testEmptyLogDir");
|
||||||
slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
|
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||||
Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
|
Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
|
||||||
|
@ -508,7 +533,7 @@ public class TestSplitLogManager {
|
||||||
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
||||||
conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0);
|
conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0);
|
||||||
|
|
||||||
slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
|
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||||
final Path logDir = new Path(fs.getWorkingDirectory(),
|
final Path logDir = new Path(fs.getWorkingDirectory(),
|
||||||
|
|
Loading…
Reference in New Issue