HBASE-3098 TestMetaReaderEditor is broken in TRUNK; hangs; part2 of the patch

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1006252 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2010-10-10 06:04:59 +00:00
parent 52bbd22187
commit 924f28e5c3
11 changed files with 275 additions and 115 deletions

CatalogTracker.java

@@ -53,7 +53,8 @@ import org.apache.zookeeper.KeeperException;
  * the location of <code>.META.</code> If not available in <code>-ROOT-</code>,
  * ZooKeeper is used to monitor for a new location of <code>.META.</code>.
  *
- * <p>Call {@link #start()} to start up operation.
+ * <p>Call {@link #start()} to start up operation. Call {@link #stop()}} to
+ * interrupt waits and close up shop.
  */
 public class CatalogTracker {
   private static final Log LOG = LogFactory.getLog(CatalogTracker.class);
@@ -64,6 +65,7 @@ public class CatalogTracker {
   private final AtomicBoolean metaAvailable = new AtomicBoolean(false);
   private HServerAddress metaLocation;
   private final int defaultTimeout;
+  private boolean stopped = false;
 
   public static final byte [] ROOT_REGION =
     HRegionInfo.ROOT_REGIONINFO.getRegionName();
@@ -129,6 +131,22 @@
     this.metaNodeTracker.start();
   }
 
+  /**
+   * Stop working.
+   * Interrupts any ongoing waits.
+   */
+  public void stop() {
+    LOG.debug("Stopping catalog tracker " + this.connection.toString() +
+      "; will interrupt blocked waits on root and meta");
+    this.stopped = true;
+    this.rootRegionTracker.stop();
+    this.metaNodeTracker.stop();
+    // Call this and it will interrupt any ongoing waits on meta.
+    synchronized (this.metaAvailable) {
+      this.metaAvailable.notifyAll();
+    }
+  }
+
   /**
    * Gets the current location for <code>-ROOT-</code> or null if location is
    * not currently available.
@@ -274,8 +292,8 @@
    * @throws InterruptedException if interrupted while waiting
    */
   public void waitForMeta() throws InterruptedException {
-    synchronized(metaAvailable) {
-      while (!metaAvailable.get()) {
+    synchronized (metaAvailable) {
+      while (!stopped && !metaAvailable.get()) {
         metaAvailable.wait();
       }
     }
@@ -301,7 +319,7 @@
       if (getMetaServerConnection(true) != null) {
         return metaLocation;
       }
-      while(!metaAvailable.get() &&
+      while(!stopped && !metaAvailable.get() &&
           (timeout == 0 || System.currentTimeMillis() < stop)) {
         metaAvailable.wait(timeout);
       }
@@ -486,4 +504,8 @@
   MetaNodeTracker getMetaNodeTracker() {
     return this.metaNodeTracker;
   }
+
+  public HConnection getConnection() {
+    return this.connection;
+  }
 }
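In short, the new stop() lets a second thread break a CatalogTracker out of a blocked wait on root or meta instead of leaving it hung (the TestMetaReaderEditor hang this issue addresses). A rough usage sketch follows; it is not part of the patch, it assumes an already-built HConnection, and demoInterruptibleWait is a hypothetical wrapper method:

    // Illustrative sketch only, not code from this patch.
    void demoInterruptibleWait(final HConnection connection)
    throws IOException, InterruptedException {
      final CatalogTracker ct = new CatalogTracker(connection);
      ct.start();                // begin tracking -ROOT- and .META. locations
      Thread waiter = new Thread() {
        @Override
        public void run() {
          try {
            ct.waitForMeta();    // blocks until .META. is located or stop() is called
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      };
      waiter.start();
      // Shutdown path: wakes any thread parked waiting on root or meta.
      ct.stop();
      waiter.join();
    }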

HBaseAdmin.java

@@ -66,11 +66,6 @@ public class HBaseAdmin implements Abortable {
   private volatile Configuration conf;
   private final long pause;
   private final int numRetries;
-  /**
-   * Lazily instantiated. Use {@link #getCatalogTracker()} to ensure you get
-   * an instance rather than a null.
-   */
-  private CatalogTracker catalogTracker = null;
 
   /**
    * Constructor
@@ -88,21 +83,32 @@
     this.connection.getMaster();
   }
 
+  /**
+   * @return A new CatalogTracker instance; call {@link #cleanupCatalogTracker(CatalogTracker)}
+   * to cleanup the returned catalog tracker.
+   * @throws ZooKeeperConnectionException
+   * @throws IOException
+   * @see #cleanupCatalogTracker(CatalogTracker);
+   */
   private synchronized CatalogTracker getCatalogTracker()
   throws ZooKeeperConnectionException, IOException {
-    if (this.catalogTracker == null) {
-      this.catalogTracker = new CatalogTracker(this.connection.getZooKeeperWatcher(),
-        HConnectionManager.getConnection(conf), this,
-        this.conf.getInt("hbase.admin.catalog.timeout", 10 * 1000));
-      try {
-        this.catalogTracker.start();
+    CatalogTracker ct = null;
+    try {
+      HConnection connection =
+        HConnectionManager.getConnection(new Configuration(this.conf));
+      ct = new CatalogTracker(connection);
+      ct.start();
     } catch (InterruptedException e) {
       // Let it out as an IOE for now until we redo all so tolerate IEs
       Thread.currentThread().interrupt();
       throw new IOException("Interrupted", e);
-      }
     }
-    return this.catalogTracker;
+    return ct;
+  }
+
+  private void cleanupCatalogTracker(final CatalogTracker ct) {
+    ct.stop();
+    HConnectionManager.deleteConnection(ct.getConnection());
   }
 
   @Override
@@ -142,7 +148,14 @@
    */
   public boolean tableExists(final String tableName)
   throws IOException {
-    return MetaReader.tableExists(getCatalogTracker(), tableName);
+    boolean b = false;
+    CatalogTracker ct = getCatalogTracker();
+    try {
+      b = MetaReader.tableExists(ct, tableName);
+    } finally {
+      cleanupCatalogTracker(ct);
+    }
+    return b;
   }
 
   /**
@@ -718,15 +731,20 @@
    */
   public void closeRegion(final byte [] regionname, final String hostAndPort)
   throws IOException {
-    if (hostAndPort != null) {
-      HServerAddress hsa = new HServerAddress(hostAndPort);
-      Pair<HRegionInfo, HServerAddress> pair =
-        MetaReader.getRegion(getCatalogTracker(), regionname);
-      closeRegion(hsa, pair.getFirst());
-    } else {
-      Pair<HRegionInfo, HServerAddress> pair =
-        MetaReader.getRegion(getCatalogTracker(), regionname);
-      closeRegion(pair.getSecond(), pair.getFirst());
+    CatalogTracker ct = getCatalogTracker();
+    try {
+      if (hostAndPort != null) {
+        HServerAddress hsa = new HServerAddress(hostAndPort);
+        Pair<HRegionInfo, HServerAddress> pair =
+          MetaReader.getRegion(ct, regionname);
+        closeRegion(hsa, pair.getFirst());
+      } else {
+        Pair<HRegionInfo, HServerAddress> pair =
+          MetaReader.getRegion(ct, regionname);
+        closeRegion(pair.getSecond(), pair.getFirst());
+      }
+    } finally {
+      cleanupCatalogTracker(ct);
     }
   }
 
@@ -760,17 +778,22 @@
   public void flush(final byte [] tableNameOrRegionName)
   throws IOException, InterruptedException {
     boolean isRegionName = isRegionName(tableNameOrRegionName);
-    if (isRegionName) {
-      Pair<HRegionInfo, HServerAddress> pair =
-        MetaReader.getRegion(getCatalogTracker(), tableNameOrRegionName);
-      flush(pair.getSecond(), pair.getFirst());
-    } else {
-      List<Pair<HRegionInfo, HServerAddress>> pairs =
-        MetaReader.getTableRegionsAndLocations(getCatalogTracker(),
-          Bytes.toString(tableNameOrRegionName));
-      for (Pair<HRegionInfo, HServerAddress> pair: pairs) {
-        flush(pair.getSecond(), pair.getFirst());
-      }
+    CatalogTracker ct = getCatalogTracker();
+    try {
+      if (isRegionName) {
+        Pair<HRegionInfo, HServerAddress> pair =
+          MetaReader.getRegion(getCatalogTracker(), tableNameOrRegionName);
+        flush(pair.getSecond(), pair.getFirst());
+      } else {
+        List<Pair<HRegionInfo, HServerAddress>> pairs =
+          MetaReader.getTableRegionsAndLocations(getCatalogTracker(),
+            Bytes.toString(tableNameOrRegionName));
+        for (Pair<HRegionInfo, HServerAddress> pair: pairs) {
+          flush(pair.getSecond(), pair.getFirst());
+        }
+      }
+    } finally {
+      cleanupCatalogTracker(ct);
     }
   }
 
@@ -843,17 +866,22 @@
    */
   private void compact(final byte [] tableNameOrRegionName, final boolean major)
   throws IOException, InterruptedException {
-    if (isRegionName(tableNameOrRegionName)) {
-      Pair<HRegionInfo, HServerAddress> pair =
-        MetaReader.getRegion(getCatalogTracker(), tableNameOrRegionName);
-      compact(pair.getSecond(), pair.getFirst(), major);
-    } else {
-      List<Pair<HRegionInfo, HServerAddress>> pairs =
-        MetaReader.getTableRegionsAndLocations(getCatalogTracker(),
-          Bytes.toString(tableNameOrRegionName));
-      for (Pair<HRegionInfo, HServerAddress> pair: pairs) {
-        compact(pair.getSecond(), pair.getFirst(), major);
-      }
+    CatalogTracker ct = getCatalogTracker();
+    try {
+      if (isRegionName(tableNameOrRegionName)) {
+        Pair<HRegionInfo, HServerAddress> pair =
+          MetaReader.getRegion(ct, tableNameOrRegionName);
+        compact(pair.getSecond(), pair.getFirst(), major);
+      } else {
+        List<Pair<HRegionInfo, HServerAddress>> pairs =
+          MetaReader.getTableRegionsAndLocations(ct,
+            Bytes.toString(tableNameOrRegionName));
+        for (Pair<HRegionInfo, HServerAddress> pair: pairs) {
+          compact(pair.getSecond(), pair.getFirst(), major);
+        }
+      }
+    } finally {
+      cleanupCatalogTracker(ct);
     }
   }
 
@@ -920,19 +948,25 @@
    * @throws IOException if a remote or network exception occurs
    * @throws InterruptedException
    */
-  public void split(final byte [] tableNameOrRegionName) throws IOException, InterruptedException {
-    if (isRegionName(tableNameOrRegionName)) {
-      // Its a possible region name.
-      Pair<HRegionInfo, HServerAddress> pair =
-        MetaReader.getRegion(getCatalogTracker(), tableNameOrRegionName);
-      split(pair.getSecond(), pair.getFirst());
-    } else {
-      List<Pair<HRegionInfo, HServerAddress>> pairs =
-        MetaReader.getTableRegionsAndLocations(getCatalogTracker(),
-          Bytes.toString(tableNameOrRegionName));
-      for (Pair<HRegionInfo, HServerAddress> pair: pairs) {
-        split(pair.getSecond(), pair.getFirst());
-      }
+  public void split(final byte [] tableNameOrRegionName)
+  throws IOException, InterruptedException {
+    CatalogTracker ct = getCatalogTracker();
+    try {
+      if (isRegionName(tableNameOrRegionName)) {
+        // Its a possible region name.
+        Pair<HRegionInfo, HServerAddress> pair =
+          MetaReader.getRegion(getCatalogTracker(), tableNameOrRegionName);
+        split(pair.getSecond(), pair.getFirst());
+      } else {
+        List<Pair<HRegionInfo, HServerAddress>> pairs =
+          MetaReader.getTableRegionsAndLocations(getCatalogTracker(),
+            Bytes.toString(tableNameOrRegionName));
+        for (Pair<HRegionInfo, HServerAddress> pair: pairs) {
+          split(pair.getSecond(), pair.getFirst());
+        }
+      }
+    } finally {
+      cleanupCatalogTracker(ct);
     }
   }
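HBaseAdmin no longer caches a lazily created CatalogTracker for its lifetime; each operation builds a short-lived tracker on its own connection and tears it down in a finally block. The template the methods above follow, sketched for illustration (adminOperationTemplate and doSomethingWith are stand-ins, not methods from the patch):

    // Illustrative template that each admin operation above now follows.
    private void adminOperationTemplate() throws IOException {
      CatalogTracker ct = getCatalogTracker();  // fresh tracker + fresh HConnection
      try {
        doSomethingWith(ct);                    // e.g. MetaReader.tableExists(ct, tableName)
      } finally {
        cleanupCatalogTracker(ct);              // ct.stop() + delete the backing connection
      }
    }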

HConnection.java

@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -41,6 +42,11 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
  * {@link HConnectionManager} manages instances of this class.
  */
 public interface HConnection extends Abortable {
+  /**
+   * @return Configuration instance being used by this HConnection instance.
+   */
+  public Configuration getConfiguration();
+
   /**
    * Retrieve ZooKeeperWatcher used by the connection.
    * @return ZooKeeperWatcher handle being used by the connection.

HConnectionManager.java

@@ -139,6 +139,14 @@ public class HConnectionManager {
     }
   }
 
+  /**
+   * Delete connection information for the instance
+   * @param connection configuration
+   */
+  public static void deleteConnection(HConnection connection) {
+    deleteConnection(connection.getConfiguration(), false);
+  }
+
   /**
    * Delete information for all connections.
    * @param stopProxy stop the proxy as well
@@ -231,17 +239,12 @@
     public HConnectionImplementation(Configuration conf)
     throws ZooKeeperConnectionException {
       this.conf = conf;
-
-      String serverClassName =
-        conf.get(HConstants.REGION_SERVER_CLASS,
-          HConstants.DEFAULT_REGION_SERVER_CLASS);
-
+      String serverClassName = conf.get(HConstants.REGION_SERVER_CLASS,
+        HConstants.DEFAULT_REGION_SERVER_CLASS);
       this.closed = false;
       try {
         this.serverInterfaceClass =
           (Class<? extends HRegionInterface>) Class.forName(serverClassName);
       } catch (ClassNotFoundException e) {
         throw new UnsupportedOperationException(
           "Unable to find region server interface " + serverClassName, e);
@@ -271,6 +274,10 @@
       this.masterChecked = false;
     }
 
+    public Configuration getConfiguration() {
+      return this.conf;
+    }
+
     @Override
     public String toString() {
       return this.identifier;
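HConnection.getConfiguration() exists so a managed connection can be handed back to HConnectionManager without the caller keeping the original Configuration around; deleteConnection(HConnection) simply delegates to the existing Configuration-keyed delete. A small illustrative sketch (demoConnectionLifecycle and baseConf are assumptions, not code from the patch):

    // Illustrative only: obtain and release a managed connection by handle.
    void demoConnectionLifecycle(final Configuration baseConf)
    throws ZooKeeperConnectionException {
      HConnection connection =
        HConnectionManager.getConnection(new Configuration(baseConf));
      try {
        // ... use the connection, e.g. hand it to a new CatalogTracker ...
      } finally {
        // Keyed off connection.getConfiguration(), added by this patch.
        HConnectionManager.deleteConnection(connection);
      }
    }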

HMaster.java

@@ -287,6 +287,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
     // Stop services started for both backup and active masters
     this.activeMasterManager.stop();
+    this.catalogTracker.stop();
     HConnectionManager.deleteConnection(this.conf, true);
     this.zooKeeper.close();
     LOG.info("HMaster main thread exiting");

HRegionServer.java

@@ -592,6 +592,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
       closeAllScanners();
       LOG.info("stopping server at: " + this.serverInfo.getServerName());
     }
+    // Interrupt catalog tracker here in case any regions being opened out in
+    // handlers are stuck waiting on meta or root.
+    this.catalogTracker.stop();
     waitOnAllRegionsToClose();
 
     // Make sure the proxy is down.
@@ -1240,8 +1243,6 @@
         r.hasReferences()? "Region has references on open" :
           "Region has too many store files");
     }
-    // Add to online regions
-    addToOnlineRegions(r);
     // Update ZK, ROOT or META
     if (r.getRegionInfo().isRootRegion()) {
       RootLocationEditor.setRootLocation(getZooKeeper(),
@@ -1256,6 +1257,8 @@
         MetaEditor.updateRegionLocation(ct, r.getRegionInfo(), getServerInfo());
       }
     }
+    // Add to online regions if all above was successful.
+    addToOnlineRegions(r);
   }
 
   /**

OpenRegionHandler.java

@@ -65,7 +65,8 @@ public class OpenRegionHandler extends EventHandler {
   @Override
   public void process() throws IOException {
-    LOG.debug("Processing open of " + regionInfo.getRegionNameAsString());
+    final String name = regionInfo.getRegionNameAsString();
+    LOG.debug("Processing open of " + name);
     final String encodedName = regionInfo.getEncodedName();
 
     // TODO: Previously we would check for root region availability (but only that it
@@ -76,7 +77,7 @@
     // Check that this region is not already online
     HRegion region = this.rsServices.getFromOnlineRegions(encodedName);
     if (region != null) {
-      LOG.warn("Attempting open of " + regionInfo.getRegionNameAsString() +
+      LOG.warn("Attempting open of " + name +
         " but it's already online on this server");
       return;
     }
@@ -89,22 +90,22 @@
     try {
       // Instantiate the region. This also periodically updates OPENING.
       region = HRegion.openHRegion(regionInfo, this.rsServices.getWAL(),
-        server.getConfiguration(), this.rsServices.getFlushRequester(),
-        new Progressable() {
-          public void progress() {
-            try {
-              int vsn = ZKAssign.retransitionNodeOpening(
-                server.getZooKeeper(), regionInfo, server.getServerName(),
-                openingInteger.get());
-              if (vsn == -1) {
-                throw KeeperException.create(Code.BADVERSION);
-              }
-              openingInteger.set(vsn);
-            } catch (KeeperException e) {
-              server.abort("ZK exception refreshing OPENING node", e);
-            }
-          }
-      });
+          server.getConfiguration(), this.rsServices.getFlushRequester(),
+          new Progressable() {
+            public void progress() {
+              try {
+                int vsn = ZKAssign.retransitionNodeOpening(
+                  server.getZooKeeper(), regionInfo, server.getServerName(),
+                  openingInteger.get());
+                if (vsn == -1) {
+                  throw KeeperException.create(Code.BADVERSION);
+                }
+                openingInteger.set(vsn);
+              } catch (KeeperException e) {
+                server.abort("ZK exception refreshing OPENING node; " + name, e);
+              }
+            }
+          });
     } catch (IOException e) {
       LOG.error("IOException instantiating region for " + regionInfo +
         "; resetting state of transition node from OPENING to OFFLINE");
@@ -115,29 +116,34 @@
         ZKAssign.forceNodeOffline(server.getZooKeeper(), regionInfo,
           server.getServerName());
       } catch (KeeperException e1) {
-        LOG.error("Error forcing node back to OFFLINE from OPENING");
+        LOG.error("Error forcing node back to OFFLINE from OPENING; " + name);
+        return;
       }
       return;
     }
 
+    // Region is now open. Close it if error.
     // Re-transition node to OPENING again to verify no one has stomped on us
     openingVersion = openingInteger.get();
     try {
-      if((openingVersion = ZKAssign.retransitionNodeOpening(
+      if ((openingVersion = ZKAssign.retransitionNodeOpening(
           server.getZooKeeper(), regionInfo, server.getServerName(),
           openingVersion)) == -1) {
-        LOG.warn("Completed the OPEN of a region but when transitioning from " +
-          " OPENING to OPENING got a version mismatch, someone else clashed " +
-          "so now unassigning");
-        region.close();
+        LOG.warn("Completed the OPEN of region " + name +
+          " but when transitioning from " +
+          " OPENING to OPENING got a version mismatch, someone else clashed " +
+          "-- closing region");
+        cleanupFailedOpen(region);
         return;
       }
     } catch (KeeperException e) {
-      LOG.error("Failed transitioning node from OPENING to OPENED", e);
+      LOG.error("Failed transitioning node " + name +
+        " from OPENING to OPENED -- closing region", e);
+      cleanupFailedOpen(region);
       return;
     } catch (IOException e) {
-      LOG.error("Failed to close region after failing to transition", e);
+      LOG.error("Failed to close region " + name +
+        " after failing to transition -- closing region", e);
+      cleanupFailedOpen(region);
       return;
     }
 
@@ -146,33 +152,48 @@
       this.rsServices.postOpenDeployTasks(region,
         this.server.getCatalogTracker(), false);
     } catch (IOException e) {
-      // TODO: rollback the open?
-      LOG.error("Error updating region location in catalog table", e);
+      LOG.error("Error updating " + name + " location in catalog table -- " +
+        "closing region", e);
+      cleanupFailedOpen(region);
+      return;
     } catch (KeeperException e) {
       // TODO: rollback the open?
-      LOG.error("ZK Error updating region location in catalog table", e);
+      LOG.error("ZK Error updating " + name + " location in catalog " +
+        "table -- closing region", e);
+      cleanupFailedOpen(region);
+      return;
     }
 
     // Finally, Transition ZK node to OPENED
     try {
       if (ZKAssign.transitionNodeOpened(server.getZooKeeper(), regionInfo,
           server.getServerName(), openingVersion) == -1) {
-        LOG.warn("Completed the OPEN of a region but when transitioning from " +
-          " OPENING to OPENED got a version mismatch, someone else clashed " +
-          "so now unassigning");
-        region.close();
+        LOG.warn("Completed the OPEN of region " + name +
+          " but when transitioning from " +
+          " OPENING to OPENED got a version mismatch, someone else clashed " +
+          "so now unassigning -- closing region");
+        cleanupFailedOpen(region);
         return;
       }
     } catch (KeeperException e) {
-      LOG.error("Failed transitioning node from OPENING to OPENED", e);
+      LOG.error("Failed transitioning node " + name +
+        " from OPENING to OPENED -- closing region", e);
+      cleanupFailedOpen(region);
      return;
     } catch (IOException e) {
-      LOG.error("Failed to close region after failing to transition", e);
+      LOG.error("Failed to close " + name +
+        " after failing to transition -- closing region", e);
+      cleanupFailedOpen(region);
      return;
     }
 
     // Done! Successful region open
-    LOG.debug("Opened " + region.getRegionNameAsString());
+    LOG.debug("Opened " + name);
   }
 
+  private void cleanupFailedOpen(final HRegion region) throws IOException {
+    if (region != null) region.close();
+    this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName());
+  }
+
   int transitionZookeeperOfflineToOpening(final String encodedName) {
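Taken with the HRegionServer change above (addToOnlineRegions(r) now runs only after the catalog and ZK updates succeed), a region whose open fails part-way may or may not be registered online yet, which is why cleanupFailedOpen() both closes the HRegion and removes it from the online set. Each failure branch in process() now follows the same shape; a condensed, illustrative sketch rather than verbatim patch code:

    // Condensed shape of a failure branch in process(); illustrative only.
    try {
      ZKAssign.transitionNodeOpened(server.getZooKeeper(), regionInfo,
        server.getServerName(), openingVersion);
    } catch (KeeperException e) {
      LOG.error("Failed transitioning node " + name + " -- closing region", e);
      cleanupFailedOpen(region);  // close the HRegion and drop it from online regions
      return;
    }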

ZooKeeperNodeTracker.java

@@ -41,6 +41,8 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
   /** Used to abort if a fatal error occurs */
   protected final Abortable abortable;
 
+  private boolean stopped = false;
+
   /**
    * Constructs a new ZK node tracker.
    *
@@ -81,6 +83,11 @@
     }
   }
 
+  public synchronized void stop() {
+    this.stopped = true;
+    notifyAll();
+  }
+
   /**
    * Gets the data of the node, blocking until the node is available.
    *
@@ -107,7 +114,7 @@
     boolean notimeout = timeout == 0;
     long startTime = System.currentTimeMillis();
     long remaining = timeout;
-    while ((notimeout || remaining > 0) && this.data == null) {
+    while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
       if (notimeout) {
         wait();
         continue;

TestCatalogTracker.java

@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase.catalog;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.net.ConnectException;
 import java.util.ArrayList;
@@ -33,11 +35,11 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
+import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.Result;
@@ -102,6 +104,37 @@ public class TestCatalogTracker {
     return ct;
   }
 
+  /**
+   * Test interruptable while blocking wait on root and meta.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test public void testInterruptWaitOnMetaAndRoot()
+  throws IOException, InterruptedException {
+    final CatalogTracker ct = constructAndStartCatalogTracker();
+    HServerAddress hsa = ct.getRootLocation();
+    Assert.assertNull(hsa);
+    HServerAddress meta = ct.getMetaLocation();
+    Assert.assertNull(meta);
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          ct.waitForMeta();
+        } catch (InterruptedException e) {
+          throw new RuntimeException("Interrupted", e);
+        }
+      }
+    };
+    t.start();
+    while (!t.isAlive()) Threads.sleep(1);
+    Threads.sleep(1);
+    assertTrue(t.isAlive());
+    ct.stop();
+    // Join the thread... should exit shortly.
+    t.join();
+  }
+
   @Test public void testGetMetaServerConnectionFails()
   throws IOException, InterruptedException, KeeperException {
     HConnection connection = Mockito.mock(HConnection.class);
@@ -292,7 +325,7 @@
       try {
         doWaiting();
       } catch (InterruptedException e) {
-        throw new RuntimeException("Failed wait on root", e);
+        throw new RuntimeException("Failed wait", e);
       }
       LOG.info("Exiting " + getName());
     }

TestTableMapReduce.java

@@ -26,6 +26,7 @@ import java.util.NavigableMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -50,9 +51,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  * a particular cell, and write it back to the table.
  */
 public class TestTableMapReduce extends MultiRegionTable {
-
   private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
-
   static final String MULTI_REGION_TABLE_NAME = "mrtest";
   static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
   static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
@@ -112,17 +111,16 @@ public class TestTableMapReduce extends MultiRegionTable {
    */
   public void testMultiRegionTable()
   throws IOException, InterruptedException, ClassNotFoundException {
-    runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
+    runTestOnTable(new HTable(new Configuration(conf), MULTI_REGION_TABLE_NAME));
   }
 
   private void runTestOnTable(HTable table)
   throws IOException, InterruptedException, ClassNotFoundException {
     MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
     Job job = null;
-
     try {
       LOG.info("Before map/reduce startup");
-      job = new Job(conf, "process column contents");
+      job = new Job(table.getConfiguration(), "process column contents");
       job.setNumReduceTasks(1);
       Scan scan = new Scan();
       scan.addFamily(INPUT_FAMILY);
@@ -150,7 +148,7 @@ public class TestTableMapReduce extends MultiRegionTable {
   }
 
   private void verify(String tableName) throws IOException {
-    HTable table = new HTable(conf, tableName);
+    HTable table = new HTable(new Configuration(conf), tableName);
     boolean verified = false;
     long pause = conf.getLong("hbase.client.pause", 5 * 1000);
     int numRetries = conf.getInt("hbase.client.retries.number", 5);

TestZooKeeperNodeTracker.java

@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.Random;
 import java.util.concurrent.Semaphore;
 
@@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.master.TestActiveMasterManager.NodeDeletionListener;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -60,9 +62,36 @@ public class TestZooKeeperNodeTracker {
     TEST_UTIL.shutdownMiniZKCluster();
   }
 
+  /**
+   * Test that we can interrupt a node that is blocked on a wait.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test public void testInterruptible() throws IOException, InterruptedException {
+    Abortable abortable = new StubAbortable();
+    ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
+      "testInterruptible", abortable);
+    final TestTracker tracker = new TestTracker(zk, "/xyz", abortable);
+    tracker.start();
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          tracker.blockUntilAvailable();
+        } catch (InterruptedException e) {
+          throw new RuntimeException("Interrupted", e);
+        }
+      }
+    };
+    t.start();
+    while (!t.isAlive()) Threads.sleep(1);
+    tracker.stop();
+    t.join();
+    // If it wasn't interruptible, we'd never get to here.
+  }
+
   @Test
   public void testNodeTracker() throws Exception {
     Abortable abortable = new StubAbortable();
     ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
       "testNodeTracker", abortable);
@@ -209,7 +238,6 @@
   }
 
   public static class TestTracker extends ZooKeeperNodeTracker {
-
     public TestTracker(ZooKeeperWatcher watcher, String node,
         Abortable abortable) {
       super(watcher, node, abortable);