HBASE-4798 Sleeps and synchronisation improvements for tests
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1204769 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bba38d9a5f
commit
23532b3538
|
@ -409,7 +409,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
synchronized(this.regionsInTransition) {
|
synchronized(this.regionsInTransition) {
|
||||||
while (!this.master.isStopped() &&
|
while (!this.master.isStopped() &&
|
||||||
this.regionsInTransition.containsKey(hri.getEncodedName())) {
|
this.regionsInTransition.containsKey(hri.getEncodedName())) {
|
||||||
this.regionsInTransition.wait();
|
// We expect a notify, but to be safe we set a timeout
|
||||||
|
this.regionsInTransition.wait(100);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return intransistion;
|
return intransistion;
|
||||||
|
@ -1824,7 +1825,10 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
throws InterruptedException {
|
throws InterruptedException {
|
||||||
synchronized(regions) {
|
synchronized(regions) {
|
||||||
while(!regions.containsKey(regionInfo)) {
|
while(!regions.containsKey(regionInfo)) {
|
||||||
regions.wait();
|
// We should receive a notification, but it's
|
||||||
|
// better to have a timeout to recheck the condition here:
|
||||||
|
// it lowers the impact of a race condition if any
|
||||||
|
regions.wait(100);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -420,11 +420,11 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
||||||
", cluster-up flag was=" + wasUp);
|
", cluster-up flag was=" + wasUp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if we should stop every second.
|
||||||
|
private Sleeper stopSleeper = new Sleeper(1000, this);
|
||||||
private void loop() {
|
private void loop() {
|
||||||
// Check if we should stop every second.
|
|
||||||
Sleeper sleeper = new Sleeper(1000, this);
|
|
||||||
while (!this.stopped) {
|
while (!this.stopped) {
|
||||||
sleeper.sleep();
|
stopSleeper.sleep();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1504,6 +1504,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
||||||
public void stop(final String why) {
|
public void stop(final String why) {
|
||||||
LOG.info(why);
|
LOG.info(why);
|
||||||
this.stopped = true;
|
this.stopped = true;
|
||||||
|
// We wake up the stopSleeper to stop immediately
|
||||||
|
stopSleeper.skipSleepCycle();
|
||||||
// If we are a backup master, we need to interrupt wait
|
// If we are a backup master, we need to interrupt wait
|
||||||
if (this.activeMasterManager != null) {
|
if (this.activeMasterManager != null) {
|
||||||
synchronized (this.activeMasterManager.clusterHasActiveMaster) {
|
synchronized (this.activeMasterManager.clusterHasActiveMaster) {
|
||||||
|
|
|
@ -967,6 +967,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
stop("Failed initialization");
|
stop("Failed initialization");
|
||||||
throw convertThrowableToIOE(cleanup(e, "Failed init"),
|
throw convertThrowableToIOE(cleanup(e, "Failed init"),
|
||||||
"Region server startup failed");
|
"Region server startup failed");
|
||||||
|
} finally {
|
||||||
|
sleeper.skipSleepCycle();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1552,9 +1554,14 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
public void stop(final String msg) {
|
public void stop(final String msg) {
|
||||||
this.stopped = true;
|
this.stopped = true;
|
||||||
LOG.info("STOPPED: " + msg);
|
LOG.info("STOPPED: " + msg);
|
||||||
synchronized (this) {
|
// Wakes run() if it is sleeping
|
||||||
// Wakes run() if it is sleeping
|
//sleeper.skipSleepCycle();
|
||||||
notifyAll(); // FindBugs NN_NAKED_NOTIFY
|
//will be uncommented later, see discussion in jira 4798
|
||||||
|
}
|
||||||
|
|
||||||
|
public void waitForServerOnline(){
|
||||||
|
while (!isOnline() && !isStopped()){
|
||||||
|
sleeper.sleep();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1722,10 +1729,17 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
*/
|
*/
|
||||||
private ServerName getMaster() {
|
private ServerName getMaster() {
|
||||||
ServerName masterServerName = null;
|
ServerName masterServerName = null;
|
||||||
|
long previousLogTime = 0;
|
||||||
while ((masterServerName = this.masterAddressManager.getMasterAddress()) == null) {
|
while ((masterServerName = this.masterAddressManager.getMasterAddress()) == null) {
|
||||||
if (!keepLooping()) return null;
|
if (!keepLooping()) return null;
|
||||||
LOG.debug("No master found; retry");
|
if (System.currentTimeMillis() > (previousLogTime+1000)){
|
||||||
sleeper.sleep();
|
LOG.debug("No master found; retry");
|
||||||
|
previousLogTime = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException ignored) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
InetSocketAddress isa =
|
InetSocketAddress isa =
|
||||||
new InetSocketAddress(masterServerName.getHostname(), masterServerName.getPort());
|
new InetSocketAddress(masterServerName.getHostname(), masterServerName.getPort());
|
||||||
|
@ -1744,11 +1758,20 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
e = e instanceof RemoteException ?
|
e = e instanceof RemoteException ?
|
||||||
((RemoteException)e).unwrapRemoteException() : e;
|
((RemoteException)e).unwrapRemoteException() : e;
|
||||||
if (e instanceof ServerNotRunningYetException) {
|
if (e instanceof ServerNotRunningYetException) {
|
||||||
LOG.info("Master isn't available yet, retrying");
|
if (System.currentTimeMillis() > (previousLogTime+1000)){
|
||||||
|
LOG.info("Master isn't available yet, retrying");
|
||||||
|
previousLogTime = System.currentTimeMillis();
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Unable to connect to master. Retrying. Error was:", e);
|
if (System.currentTimeMillis() > (previousLogTime + 1000)) {
|
||||||
|
LOG.warn("Unable to connect to master. Retrying. Error was:", e);
|
||||||
|
previousLogTime = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(200);
|
||||||
|
} catch (InterruptedException ignored) {
|
||||||
}
|
}
|
||||||
sleeper.sleep();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.info("Connected to master at " + isa);
|
LOG.info("Connected to master at " + isa);
|
||||||
|
|
|
@ -62,14 +62,7 @@ public class JVMClusterUtil {
|
||||||
// the HRS#run method. HRS#init can fail for whatever reason. In those
|
// the HRS#run method. HRS#init can fail for whatever reason. In those
|
||||||
// cases, we'll jump out of the run without setting online flag. Check
|
// cases, we'll jump out of the run without setting online flag. Check
|
||||||
// stopRequested so we don't wait here for a flag that will never be flipped.
|
// stopRequested so we don't wait here for a flag that will never be flipped.
|
||||||
while (!this.regionServer.isOnline() &&
|
regionServer.waitForServerOnline();
|
||||||
!this.regionServer.isStopped()) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(1000);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// continue waiting
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,22 +111,6 @@ public class JVMClusterUtil {
|
||||||
public HMaster getMaster() {
|
public HMaster getMaster() {
|
||||||
return this.master;
|
return this.master;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Block until the master has come online, indicating it is ready
|
|
||||||
* to be used.
|
|
||||||
*/
|
|
||||||
public void waitForServerOnline() {
|
|
||||||
// The server is marked online after init begins but before race to become
|
|
||||||
// the active master.
|
|
||||||
while (!this.master.isMasterRunning() && !this.master.isStopped()) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(1000);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// continue waiting
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -165,20 +142,49 @@ public class JVMClusterUtil {
|
||||||
return new JVMClusterUtil.MasterThread(server, index);
|
return new JVMClusterUtil.MasterThread(server, index);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static JVMClusterUtil.MasterThread findActiveMaster(
|
||||||
|
List<JVMClusterUtil.MasterThread> masters) {
|
||||||
|
for (JVMClusterUtil.MasterThread t : masters) {
|
||||||
|
if (t.master.isActiveMaster()) {
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start the cluster. Waits until there is a primary master and returns its
|
* Start the cluster. Waits until there is a primary master initialized
|
||||||
* address.
|
* and returns its address.
|
||||||
* @param masters
|
* @param masters
|
||||||
* @param regionservers
|
* @param regionservers
|
||||||
* @return Address to use contacting primary master.
|
* @return Address to use contacting primary master.
|
||||||
*/
|
*/
|
||||||
public static String startup(final List<JVMClusterUtil.MasterThread> masters,
|
public static String startup(final List<JVMClusterUtil.MasterThread> masters,
|
||||||
final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
|
final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
|
||||||
if (masters != null) {
|
|
||||||
for (JVMClusterUtil.MasterThread t : masters) {
|
if (masters == null || masters.isEmpty()) {
|
||||||
t.start();
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (JVMClusterUtil.MasterThread t : masters) {
|
||||||
|
t.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for an active master
|
||||||
|
// having an active master before starting the region threads allows
|
||||||
|
// them to succeed on their connection to master
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
while (findActiveMaster(masters) == null) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException ignored) {
|
||||||
|
}
|
||||||
|
if (System.currentTimeMillis() > startTime + 30000) {
|
||||||
|
throw new RuntimeException("Master not active after 30 seconds");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (regionservers != null) {
|
if (regionservers != null) {
|
||||||
for (JVMClusterUtil.RegionServerThread t: regionservers) {
|
for (JVMClusterUtil.RegionServerThread t: regionservers) {
|
||||||
HRegionServer hrs = t.getRegionServer();
|
HRegionServer hrs = t.getRegionServer();
|
||||||
|
@ -187,19 +193,21 @@ public class JVMClusterUtil {
|
||||||
t.start();
|
t.start();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (masters == null || masters.isEmpty()) {
|
|
||||||
return null;
|
// Wait for an active master to be initialized (implies being master)
|
||||||
}
|
// with this, when we return the cluster is complete
|
||||||
// Wait for an active master
|
startTime = System.currentTimeMillis();
|
||||||
while (true) {
|
while (true) {
|
||||||
for (JVMClusterUtil.MasterThread t : masters) {
|
JVMClusterUtil.MasterThread t = findActiveMaster(masters);
|
||||||
if (t.master.isActiveMaster()) {
|
if (t != null && t.master.isInitialized()) {
|
||||||
return t.master.getServerName().toString();
|
return t.master.getServerName().toString();
|
||||||
}
|
}
|
||||||
|
if (System.currentTimeMillis() > startTime + 200000) {
|
||||||
|
throw new RuntimeException("Master not initialized after 200 seconds");
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
Thread.sleep(1000);
|
Thread.sleep(100);
|
||||||
} catch(InterruptedException e) {
|
} catch (InterruptedException ignored) {
|
||||||
// Keep waiting
|
// Keep waiting
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,7 @@ public class Sleeper {
|
||||||
public void skipSleepCycle() {
|
public void skipSleepCycle() {
|
||||||
synchronized (sleepLock) {
|
synchronized (sleepLock) {
|
||||||
triggerWake = true;
|
triggerWake = true;
|
||||||
sleepLock.notify();
|
sleepLock.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -127,11 +127,9 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
|
while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
|
||||||
if (notimeout) {
|
// We expect a notification; but we wait with a
|
||||||
wait();
|
// timeout to lower the impact of a race condition if any
|
||||||
continue;
|
wait(100);
|
||||||
}
|
|
||||||
wait(remaining);
|
|
||||||
remaining = timeout - (System.currentTimeMillis() - startTime);
|
remaining = timeout - (System.currentTimeMillis() - startTime);
|
||||||
}
|
}
|
||||||
return this.data;
|
return this.data;
|
||||||
|
|
|
@ -1245,7 +1245,7 @@ public class HBaseTestingUtility {
|
||||||
boolean checkStatus) throws Exception {
|
boolean checkStatus) throws Exception {
|
||||||
Configuration c = new Configuration(this.conf);
|
Configuration c = new Configuration(this.conf);
|
||||||
String quorumServers = ZKConfig.getZKQuorumServersString(c);
|
String quorumServers = ZKConfig.getZKQuorumServersString(c);
|
||||||
int sessionTimeout = 5 * 1000; // 5 seconds
|
int sessionTimeout = 500;
|
||||||
ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
|
ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
|
||||||
byte[] password = zk.getSessionPasswd();
|
byte[] password = zk.getSessionPasswd();
|
||||||
long sessionID = zk.getSessionId();
|
long sessionID = zk.getSessionId();
|
||||||
|
@ -1253,7 +1253,7 @@ public class HBaseTestingUtility {
|
||||||
ZooKeeper newZK = new ZooKeeper(quorumServers,
|
ZooKeeper newZK = new ZooKeeper(quorumServers,
|
||||||
sessionTimeout, EmptyWatcher.instance, sessionID, password);
|
sessionTimeout, EmptyWatcher.instance, sessionID, password);
|
||||||
newZK.close();
|
newZK.close();
|
||||||
final long sleep = sessionTimeout * 5L;
|
final long sleep = 7000; // 7s seems enough to manage the timeout
|
||||||
LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID) +
|
LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID) +
|
||||||
"; sleeping=" + sleep);
|
"; sleeping=" + sleep);
|
||||||
|
|
||||||
|
|
|
@ -293,7 +293,6 @@ public class MiniHBaseCluster {
|
||||||
try {
|
try {
|
||||||
t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
|
t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
|
||||||
t.start();
|
t.start();
|
||||||
t.waitForServerOnline();
|
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
throw new IOException("Interrupted adding master to cluster", ie);
|
throw new IOException("Interrupted adding master to cluster", ie);
|
||||||
}
|
}
|
||||||
|
@ -382,7 +381,7 @@ public class MiniHBaseCluster {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Thread.sleep(200);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -212,7 +212,7 @@ public class TestRegionRebalancing {
|
||||||
// while (!cluster.getMaster().allRegionsAssigned()) {
|
// while (!cluster.getMaster().allRegionsAssigned()) {
|
||||||
LOG.debug("Waiting for there to be 22 regions, but there are " + getRegionCount() + " right now.");
|
LOG.debug("Waiting for there to be 22 regions, but there are " + getRegionCount() + " right now.");
|
||||||
try {
|
try {
|
||||||
Thread.sleep(1000);
|
Thread.sleep(200);
|
||||||
} catch (InterruptedException e) {}
|
} catch (InterruptedException e) {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.*;
|
import org.apache.hadoop.hbase.*;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@ -37,7 +38,7 @@ public class TestCatalogTrackerOnCluster {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
* @see https://issues.apache.org/jira/browse/HBASE-3445
|
* @see <a href="https://issues.apache.org/jira/browse/HBASE-3445">HBASE-3445</a>
|
||||||
*/
|
*/
|
||||||
@Test public void testBadOriginalRootLocation() throws Exception {
|
@Test public void testBadOriginalRootLocation() throws Exception {
|
||||||
UTIL.getConfiguration().setInt("ipc.socket.timeout", 3000);
|
UTIL.getConfiguration().setInt("ipc.socket.timeout", 3000);
|
||||||
|
@ -61,9 +62,15 @@ public class TestCatalogTrackerOnCluster {
|
||||||
ServerName nonsense =
|
ServerName nonsense =
|
||||||
new ServerName("example.org", 1234, System.currentTimeMillis());
|
new ServerName("example.org", 1234, System.currentTimeMillis());
|
||||||
RootLocationEditor.setRootLocation(zookeeper, nonsense);
|
RootLocationEditor.setRootLocation(zookeeper, nonsense);
|
||||||
|
|
||||||
// Bring back up the hbase cluster. See if it can deal with nonsense root
|
// Bring back up the hbase cluster. See if it can deal with nonsense root
|
||||||
// location.
|
// location. The cluster should start and be fully available.
|
||||||
UTIL.startMiniHBaseCluster(1, 1);
|
UTIL.startMiniHBaseCluster(1, 1);
|
||||||
|
|
||||||
|
// if we can create a table, it's a good sign that it's working
|
||||||
|
UTIL.createTable(
|
||||||
|
getClass().getSimpleName().getBytes(), "family".getBytes());
|
||||||
|
|
||||||
UTIL.shutdownMiniCluster();
|
UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,8 +94,11 @@ public class TestRestartCluster {
|
||||||
LOG.info("\n\nCreating tables");
|
LOG.info("\n\nCreating tables");
|
||||||
for(byte [] TABLE : TABLES) {
|
for(byte [] TABLE : TABLES) {
|
||||||
UTIL.createTable(TABLE, FAMILY);
|
UTIL.createTable(TABLE, FAMILY);
|
||||||
|
}
|
||||||
|
for(byte [] TABLE : TABLES) {
|
||||||
UTIL.waitTableAvailable(TABLE, 30000);
|
UTIL.waitTableAvailable(TABLE, 30000);
|
||||||
}
|
}
|
||||||
|
|
||||||
List<HRegionInfo> allRegions =
|
List<HRegionInfo> allRegions =
|
||||||
MetaScanner.listAllRegions(UTIL.getConfiguration());
|
MetaScanner.listAllRegions(UTIL.getConfiguration());
|
||||||
assertEquals(3, allRegions.size());
|
assertEquals(3, allRegions.size());
|
||||||
|
|
|
@ -127,7 +127,7 @@ public class TestHLog {
|
||||||
}
|
}
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDownAfterClass() throws Exception {
|
public static void tearDownAfterClass() throws Exception {
|
||||||
TEST_UTIL.shutdownMiniDFSCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getName() {
|
private static String getName() {
|
||||||
|
|
|
@ -59,7 +59,7 @@ public class TestFSTableDescriptors {
|
||||||
assertTrue(FSTableDescriptors.createTableDescriptor(fs, testdir, htd));
|
assertTrue(FSTableDescriptors.createTableDescriptor(fs, testdir, htd));
|
||||||
assertFalse(FSTableDescriptors.createTableDescriptor(fs, testdir, htd));
|
assertFalse(FSTableDescriptors.createTableDescriptor(fs, testdir, htd));
|
||||||
FileStatus [] statuses = fs.listStatus(testdir);
|
FileStatus [] statuses = fs.listStatus(testdir);
|
||||||
assertTrue(statuses.length == 1);
|
assertTrue("statuses.length="+statuses.length, statuses.length == 1);
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
FSTableDescriptors.updateHTableDescriptor(fs, testdir, htd);
|
FSTableDescriptors.updateHTableDescriptor(fs, testdir, htd);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue