HADOOP-1801 When hdfs is yanked out from under hbase, hbase should go down gracefully

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@573777 13f79535-47bb-0310-9956-ffa450edef68
Jim Kellerman 2007-09-08 03:09:42 +00:00
parent 135670ccf4
commit 361f74c0c2
9 changed files with 532 additions and 310 deletions
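
In summary: every place the master or region server previously swallowed a repeated IOException, the code now asks the new FSUtils.isFileSystemAvailable() helper (added below) whether DFS is still reachable, and shuts down instead of retrying forever when it is not. A minimal self-contained sketch of that pattern, not part of the commit — doWork() is a hypothetical stand-in for the real operation, while the retry loop, the availability check, and the fatal log line mirror the hunks below:

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.util.FSUtils;

// Sketch of the retry-then-check-filesystem pattern this commit threads
// through HMaster and HRegionServer.
abstract class RetryUntilShutdown {
  private static final Log LOG = LogFactory.getLog(RetryUntilShutdown.class);
  protected volatile boolean closed;          // same role as HMaster.closed
  protected FileSystem fs;
  protected int numRetries = 5;

  /** The operation being retried; hypothetical. */
  protected abstract void doWork() throws IOException;

  protected void runWithRetries() throws IOException {
    for (int tries = 0; tries < numRetries; tries++) {
      try {
        doWork();
        return;                               // success: stop retrying
      } catch (IOException e) {
        if (tries == numRetries - 1) {
          // We ran out of tries. Make sure the file system is still available
          if (!FSUtils.isFileSystemAvailable(fs)) {
            LOG.fatal("Shutting down hbase cluster: file system not available");
            closed = true;                    // main loop exits instead of spinning
          }
          throw e;
        }
      }
    }
  }
}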

View File: CHANGES.txt

@@ -27,6 +27,7 @@ Trunk (unreleased changes)
HADOOP-1785 TableInputFormat.TableRecordReader.next has a bug
(Ning Li via Stack)
HADOOP-1800 output should default utf8 encoding
HADOOP-1801 When hdfs is yanked out from under hbase, hbase should go down gracefully
HADOOP-1814 TestCleanRegionServerExit fails too often on Hudson
HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
HADOOP-1832 listTables() returns duplicate tables

View File: HMaster.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
@@ -102,7 +103,8 @@ HMasterRegionInterface, Runnable {
long metaRescanInterval;
final AtomicReference<HServerAddress> rootRegionLocation;
final AtomicReference<HServerAddress> rootRegionLocation =
new AtomicReference<HServerAddress>();
Lock splitLogLock = new ReentrantLock();
@@ -359,8 +361,10 @@ HMasterRegionInterface, Runnable {
for (Text family: split.getTableDesc().families().keySet()) {
Path p = HStoreFile.getMapDir(fs.makeQualified(dir),
split.getRegionName(), HStoreKey.extractFamily(family));
// Look for reference files. Call listPaths with an anonymous
// instance of PathFilter.
Path [] ps = fs.listPaths(p,
new PathFilter () {
public boolean accept(Path path) {
@@ -368,7 +372,7 @@ HMasterRegionInterface, Runnable {
}
}
);
if (ps != null && ps.length > 0) {
result = true;
break;
@@ -393,7 +397,7 @@ HMasterRegionInterface, Runnable {
}
protected void checkAssigned(final HRegionInfo info,
final String serverName, final long startCode) {
final String serverName, final long startCode) throws IOException {
// Skip region - if ...
if(info.offLine // offline
@@ -445,6 +449,7 @@ HMasterRegionInterface, Runnable {
} catch (IOException e) {
LOG.warn("unable to split region server log because: ", e);
throw e;
}
}
@@ -512,6 +517,14 @@ HMasterRegionInterface, Runnable {
// at least log it rather than go out silently.
LOG.error("Unexpected exception", e);
}
// We ran out of tries. Make sure the file system is still available
if (!FSUtils.isFileSystemAvailable(fs)) {
LOG.fatal("Shutting down hbase cluster: file system not available");
closed = true;
}
if (!closed) {
// sleep before retry
@@ -675,6 +688,13 @@ HMasterRegionInterface, Runnable {
LOG.error("Unexpected exception", e);
}
// We ran out of tries. Make sure the file system is still available
if (!FSUtils.isFileSystemAvailable(fs)) {
LOG.fatal("Shutting down hbase cluster: file system not available");
closed = true;
}
if (!closed) {
// sleep before retry
try {
@@ -829,46 +849,56 @@ HMasterRegionInterface, Runnable {
* @throws IOException
*/
public HMaster(Path dir, HServerAddress address, Configuration conf)
throws IOException {
throws IOException {
this.closed = true;
this.dir = dir;
this.conf = conf;
this.fs = FileSystem.get(conf);
this.rand = new Random();
// Make sure the root directory exists!
if(! fs.exists(dir)) {
fs.mkdirs(dir);
}
Path rootRegionDir =
HRegion.getRegionDir(dir, HGlobals.rootRegionInfo.regionName);
LOG.info("Root region dir: " + rootRegionDir.toString());
if (!fs.exists(rootRegionDir)) {
LOG.info("bootstrap: creating ROOT and first META regions");
try {
HRegion root = HRegion.createHRegion(HGlobals.rootRegionInfo, this.dir,
this.conf, null);
HRegion meta =
HRegion.createHRegion(new HRegionInfo(1L, HGlobals.metaTableDesc,
null, null), this.dir, this.conf, null);
// Add first region from the META table to the ROOT region.
HRegion.addRegionToMETA(root, meta);
root.close();
root.getLog().closeAndDelete();
meta.close();
meta.getLog().closeAndDelete();
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
LOG.error("bootstrap", e);
try {
// Make sure the root directory exists!
if(! fs.exists(dir)) {
fs.mkdirs(dir);
}
if (!fs.exists(rootRegionDir)) {
LOG.info("bootstrap: creating ROOT and first META regions");
try {
HRegion root = HRegion.createHRegion(HGlobals.rootRegionInfo, this.dir,
this.conf, null);
HRegion meta =
HRegion.createHRegion(new HRegionInfo(1L, HGlobals.metaTableDesc,
null, null), this.dir, this.conf, null);
// Add first region from the META table to the ROOT region.
HRegion.addRegionToMETA(root, meta);
root.close();
root.getLog().closeAndDelete();
meta.close();
meta.getLog().closeAndDelete();
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
LOG.error("bootstrap", e);
throw e;
}
}
} catch (IOException e) {
LOG.fatal("Not starting HMaster because:", e);
return;
}
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
@@ -898,7 +928,6 @@ HMasterRegionInterface, Runnable {
// The root region
this.rootRegionLocation = new AtomicReference<HServerAddress>();
this.rootScanned = false;
this.rootScanner = new RootScanner();
this.rootScannerThread = new Thread(rootScanner, "HMaster.rootScanner");
@@ -1038,9 +1067,15 @@ HMasterRegionInterface, Runnable {
(RemoteException) ex);
} catch (IOException e) {
ex = e;
LOG.warn("main processing loop: " + op.toString(), e);
}
}
if (!FSUtils.isFileSystemAvailable(fs)) {
LOG.fatal("Shutting down hbase cluster: file system not available");
closed = true;
break;
}
LOG.warn("Processing pending operations: " + op.toString(), ex);
try {
msgQueue.put(op);
@@ -2627,6 +2662,13 @@ HMasterRegionInterface, Runnable {
} catch (IOException e) {
if (tries == numRetries - 1) {
// No retries left
if (!FSUtils.isFileSystemAvailable(fs)) {
LOG.fatal("Shutting down hbase cluster: file system not available");
closed = true;
}
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e);
@@ -2692,7 +2734,7 @@ HMasterRegionInterface, Runnable {
@Override
protected void postProcessMeta(MetaRegion m, HRegionInterface server)
throws IOException {
throws IOException {
// Process regions not being served
@@ -2719,32 +2761,9 @@ HMasterRegionInterface, Runnable {
updateRegionInfo(b, i);
b.delete(lockid, COL_SERVER);
b.delete(lockid, COL_STARTCODE);
for (int tries = 0; tries < numRetries; tries++) {
try {
server.batchUpdate(m.getRegionName(), System.currentTimeMillis(), b);
if (LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + i.regionName);
}
break;
} catch (IOException e) {
if (tries == numRetries - 1) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e);
}
LOG.error("column update failed in row: " + i.regionName, e);
break;
}
}
try {
Thread.sleep(threadWakeFrequency);
} catch (InterruptedException e) {
// continue
}
server.batchUpdate(m.getRegionName(), System.currentTimeMillis(), b);
if (LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + i.regionName);
}
if (online) { // Bring offline regions on-line
@@ -2795,7 +2814,7 @@ HMasterRegionInterface, Runnable {
}
protected void updateRegionInfo(final BatchUpdate b, final HRegionInfo i)
throws IOException {
throws IOException {
i.offLine = !online;
b.put(lockid, COL_REGIONINFO, Writables.getBytes(i));
@@ -2815,7 +2834,7 @@ HMasterRegionInterface, Runnable {
@Override
protected void postProcessMeta(MetaRegion m, HRegionInterface server)
throws IOException {
throws IOException {
// For regions that are being served, mark them for deletion
@@ -2853,6 +2872,7 @@ HMasterRegionInterface, Runnable {
}
private abstract class ColumnOperation extends TableOperation {
protected ColumnOperation(Text tableName) throws IOException {
super(tableName);
}
@@ -2874,31 +2894,9 @@ HMasterRegionInterface, Runnable {
BatchUpdate b = new BatchUpdate(rand.nextLong());
long lockid = b.startUpdate(i.regionName);
b.put(lockid, COL_REGIONINFO, Writables.getBytes(i));
for (int tries = 0; tries < numRetries; tries++) {
try {
server.batchUpdate(regionName, System.currentTimeMillis(), b);
if (LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + i.regionName);
}
break;
} catch (IOException e) {
if (tries == numRetries - 1) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
LOG.error("column update failed in row: " + i.regionName, e);
break;
}
}
try {
Thread.sleep(threadWakeFrequency);
} catch (InterruptedException e) {
// continue
}
server.batchUpdate(regionName, System.currentTimeMillis(), b);
if (LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + i.regionName);
}
}
}
@@ -2914,7 +2912,7 @@ HMasterRegionInterface, Runnable {
@Override
protected void postProcessMeta(MetaRegion m, HRegionInterface server)
throws IOException {
throws IOException {
for (HRegionInfo i: unservedRegions) {
i.tableDesc.families().remove(columnName);
@@ -2922,27 +2920,8 @@ HMasterRegionInterface, Runnable {
// Delete the directories used by the column
try {
fs.delete(HStoreFile.getMapDir(dir, i.regionName, columnName));
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e);
}
LOG.error("", e);
}
try {
fs.delete(HStoreFile.getInfoDir(dir, i.regionName, columnName));
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e);
}
LOG.error("", e);
}
fs.delete(HStoreFile.getMapDir(dir, i.regionName, columnName));
fs.delete(HStoreFile.getInfoDir(dir, i.regionName, columnName));
}
}
}
@@ -2958,7 +2937,7 @@ HMasterRegionInterface, Runnable {
@Override
protected void postProcessMeta(MetaRegion m, HRegionInterface server)
throws IOException {
throws IOException {
for (HRegionInfo i: unservedRegions) {

View File: HMemcache.java

@@ -334,7 +334,9 @@ public class HMemcache {
} catch (RuntimeException ex) {
LOG.error("error initializing HMemcache scanner: ", ex);
close();
throw ex;
IOException e = new IOException("error initializing HMemcache scanner");
e.initCause(ex);
throw e;
} catch(IOException ex) {
LOG.error("error initializing HMemcache scanner: ", ex);

View File: HRegionServer.java

@@ -52,20 +52,20 @@ import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
/*******************************************************************************
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
******************************************************************************/
*/
public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/**
* {@inheritDoc}
*/
/** {@inheritDoc} */
public long getProtocolVersion(final String protocol,
@SuppressWarnings("unused") final long clientVersion)
throws IOException {
throws IOException {
if (protocol.equals(HRegionInterface.class.getName())) {
return HRegionInterface.versionID;
}
@@ -79,7 +79,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// of HRegionServer in isolation.
protected volatile boolean stopRequested;
// Go down hard. Used debugging and in unit tests.
// Go down hard. Used if file system becomes unavailable and also in
// debugging and unit tests.
protected volatile boolean abortRequested;
final Path rootDir;
@@ -146,23 +147,23 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* {@inheritDoc}
*/
public void run() {
while(!stopRequested) {
while (!stopRequested) {
long startTime = System.currentTimeMillis();
synchronized(splitOrCompactLock) { // Don't interrupt us while we're working
synchronized (splitOrCompactLock) { // Don't interrupt us while we're working
// Grab a list of regions to check
Vector<HRegion> regionsToCheck = new Vector<HRegion>();
ArrayList<HRegion> regionsToCheck = new ArrayList<HRegion>();
lock.readLock().lock();
try {
regionsToCheck.addAll(onlineRegions.values());
} finally {
lock.readLock().unlock();
}
try {
for(HRegion cur: regionsToCheck) {
if(cur.isClosed()) {
// Skip if closed
continue;
}
for(HRegion cur: regionsToCheck) {
if(cur.isClosed()) {
// Skip if closed
continue;
}
try {
if (cur.needsCompaction()) {
cur.compactStores();
}
@@ -172,10 +173,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
if (cur.needsSplit(midKey)) {
split(cur, midKey);
}
} catch(IOException e) {
//TODO: What happens if this fails? Are we toast?
LOG.error("Split or compaction failed", e);
if (!checkFileSystem()) {
break;
}
}
} catch(IOException e) {
//TODO: What happens if this fails? Are we toast?
LOG.error("What happens if this fails? Are we toast?", e);
}
}
@@ -198,7 +202,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
private void split(final HRegion region, final Text midKey)
throws IOException {
throws IOException {
final HRegionInfo oldRegionInfo = region.getRegionInfo();
final HRegion[] newRegions = region.closeAndSplit(midKey, this);
@@ -286,7 +291,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
synchronized(cacheFlusherLock) {
// Grab a list of items to flush
Vector<HRegion> toFlush = new Vector<HRegion>();
ArrayList<HRegion> toFlush = new ArrayList<HRegion>();
lock.readLock().lock();
try {
toFlush.addAll(onlineRegions.values());
@@ -310,7 +315,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
iex = x;
}
}
LOG.error("", iex);
LOG.error("Cache flush failed", iex);
if (!checkFileSystem()) {
break;
}
}
}
}
@@ -332,7 +340,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// File paths
private FileSystem fs;
FileSystem fs;
// Logging
@@ -368,7 +376,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
iex = x;
}
}
LOG.warn("", iex);
LOG.error("Log rolling failed", iex);
if (!checkFileSystem()) {
break;
}
}
}
}
@@ -737,7 +748,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
e = ex;
}
}
LOG.warn("Abort close of log", e);
LOG.error("Unable to close log in abort", e);
}
closeAllRegions(); // Don't leave any open file handles
LOG.info("aborting server at: " +
@@ -902,6 +913,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
} else {
LOG.error("unable to process message: " + e.msg.toString(), ie);
if (!checkFileSystem()) {
break;
}
}
}
}
@@ -973,117 +987,246 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return regionsToClose;
}
//////////////////////////////////////////////////////////////////////////////
//
// HRegionInterface
//////////////////////////////////////////////////////////////////////////////
//
/** {@inheritDoc} */
public HRegionInfo getRegionInfo(final Text regionName)
throws NotServingRegionException {
throws NotServingRegionException {
requestCount.incrementAndGet();
return getRegion(regionName).getRegionInfo();
}
/** {@inheritDoc} */
public byte [] get(final Text regionName, final Text row,
final Text column)
throws IOException {
final Text column) throws IOException {
requestCount.incrementAndGet();
return getRegion(regionName).get(row, column);
try {
return getRegion(regionName).get(row, column);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
/** {@inheritDoc} */
public byte [][] get(final Text regionName, final Text row,
final Text column, final int numVersions)
throws IOException {
final Text column, final int numVersions) throws IOException {
requestCount.incrementAndGet();
return getRegion(regionName).get(row, column, numVersions);
try {
return getRegion(regionName).get(row, column, numVersions);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
/** {@inheritDoc} */
public byte [][] get(final Text regionName, final Text row, final Text column,
final long timestamp, final int numVersions) throws IOException {
requestCount.incrementAndGet();
return getRegion(regionName).get(row, column, timestamp, numVersions);
try {
return getRegion(regionName).get(row, column, timestamp, numVersions);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
/** {@inheritDoc} */
public MapWritable getRow(final Text regionName, final Text row)
throws IOException {
throws IOException {
requestCount.incrementAndGet();
HRegion region = getRegion(regionName);
MapWritable result = new MapWritable();
TreeMap<Text, byte[]> map = region.getFull(row);
for (Map.Entry<Text, byte []> es: map.entrySet()) {
result.put(new HStoreKey(row, es.getKey()),
new ImmutableBytesWritable(es.getValue()));
try {
HRegion region = getRegion(regionName);
MapWritable result = new MapWritable();
TreeMap<Text, byte[]> map = region.getFull(row);
for (Map.Entry<Text, byte []> es: map.entrySet()) {
result.put(new HStoreKey(row, es.getKey()),
new ImmutableBytesWritable(es.getValue()));
}
return result;
} catch (IOException e) {
checkFileSystem();
throw e;
}
return result;
}
/** {@inheritDoc} */
public MapWritable next(final long scannerId)
throws IOException {
public MapWritable next(final long scannerId) throws IOException {
requestCount.incrementAndGet();
String scannerName = String.valueOf(scannerId);
HInternalScannerInterface s = scanners.get(scannerName);
if (s == null) {
throw new UnknownScannerException("Name: " + scannerName);
}
leases.renewLease(scannerId, scannerId);
// Collect values to be returned here
MapWritable values = new MapWritable();
// Keep getting rows until we find one that has at least one non-deleted column value
HStoreKey key = new HStoreKey();
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
while (s.next(key, results)) {
for(Map.Entry<Text, byte []> e: results.entrySet()) {
HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
byte [] val = e.getValue();
if (HGlobals.deleteBytes.compareTo(val) == 0) {
// Column value is deleted. Don't return it.
continue;
try {
String scannerName = String.valueOf(scannerId);
HInternalScannerInterface s = scanners.get(scannerName);
if (s == null) {
throw new UnknownScannerException("Name: " + scannerName);
}
leases.renewLease(scannerId, scannerId);
// Collect values to be returned here
MapWritable values = new MapWritable();
// Keep getting rows until we find one that has at least one non-deleted column value
HStoreKey key = new HStoreKey();
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
while (s.next(key, results)) {
for(Map.Entry<Text, byte []> e: results.entrySet()) {
HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
byte [] val = e.getValue();
if (HGlobals.deleteBytes.compareTo(val) == 0) {
// Column value is deleted. Don't return it.
continue;
}
values.put(k, new ImmutableBytesWritable(val));
}
values.put(k, new ImmutableBytesWritable(val));
if(values.size() > 0) {
// Row has something in it. Return the value.
break;
}
// No data for this row, go get another.
results.clear();
}
return values;
if(values.size() > 0) {
// Row has something in it. Return the value.
break;
}
// No data for this row, go get another.
results.clear();
} catch (IOException e) {
checkFileSystem();
throw e;
}
return values;
}
/** {@inheritDoc} */
public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
throws IOException {
throws IOException {
requestCount.incrementAndGet();
long lockid = startUpdate(regionName, b.getRow());
for(BatchOperation op: b) {
switch(op.getOp()) {
case BatchOperation.PUT_OP:
put(regionName, lockid, op.getColumn(), op.getValue());
break;
try {
long lockid = startUpdate(regionName, b.getRow());
for(BatchOperation op: b) {
switch(op.getOp()) {
case BatchOperation.PUT_OP:
put(regionName, lockid, op.getColumn(), op.getValue());
break;
case BatchOperation.DELETE_OP:
delete(regionName, lockid, op.getColumn());
break;
case BatchOperation.DELETE_OP:
delete(regionName, lockid, op.getColumn());
break;
}
}
commit(regionName, lockid, timestamp);
} catch (IOException e) {
checkFileSystem();
throw e;
}
commit(regionName, lockid, timestamp);
}
protected long startUpdate(Text regionName, Text row)
throws IOException {
//
// remote scanner interface
//
/** {@inheritDoc} */
public long openScanner(Text regionName, Text[] cols, Text firstRow,
final long timestamp, final RowFilterInterface filter)
throws IOException {
requestCount.incrementAndGet();
try {
HRegion r = getRegion(regionName);
long scannerId = -1L;
HInternalScannerInterface s =
r.getScanner(cols, firstRow, timestamp, filter);
scannerId = rand.nextLong();
String scannerName = String.valueOf(scannerId);
synchronized(scanners) {
scanners.put(scannerName, s);
}
leases.createLease(scannerId, scannerId, new ScannerListener(scannerName));
return scannerId;
} catch (IOException e) {
if (e instanceof RemoteException) {
try {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
} catch (IOException x) {
e = x;
}
}
LOG.error("", e);
checkFileSystem();
throw e;
}
}
/** {@inheritDoc} */
public void close(final long scannerId) throws IOException {
requestCount.incrementAndGet();
try {
String scannerName = String.valueOf(scannerId);
HInternalScannerInterface s = null;
synchronized(scanners) {
s = scanners.remove(scannerName);
}
if(s == null) {
throw new UnknownScannerException(scannerName);
}
s.close();
leases.cancelLease(scannerId, scannerId);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
Map<String, HInternalScannerInterface> scanners =
Collections.synchronizedMap(new HashMap<String,
HInternalScannerInterface>());
/**
* Instantiated as a scanner lease.
* If the lease times out, the scanner is closed
*/
private class ScannerListener implements LeaseListener {
private final String scannerName;
ScannerListener(final String n) {
this.scannerName = n;
}
/** {@inheritDoc} */
public void leaseExpired() {
LOG.info("Scanner " + this.scannerName + " lease expired");
HInternalScannerInterface s = null;
synchronized(scanners) {
s = scanners.remove(this.scannerName);
}
if (s != null) {
s.close();
}
}
}
//
// Methods that do the actual work for the remote API
//
protected long startUpdate(Text regionName, Text row) throws IOException {
HRegion region = getRegion(regionName);
return region.startUpdate(row);
@@ -1097,7 +1240,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
protected void delete(Text regionName, long lockid, Text column)
throws IOException {
throws IOException {
HRegion region = getRegion(regionName);
region.delete(lockid, column);
@@ -1117,7 +1260,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* @throws NotServingRegionException
*/
protected HRegion getRegion(final Text regionName)
throws NotServingRegionException {
throws NotServingRegionException {
return getRegion(regionName, false);
}
@@ -1129,9 +1273,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* @return {@link HRegion} for <code>regionName</code>
* @throws NotServingRegionException
*/
protected HRegion getRegion(final Text regionName,
final boolean checkRetiringRegions)
throws NotServingRegionException {
protected HRegion getRegion(final Text regionName,
final boolean checkRetiringRegions) throws NotServingRegionException {
HRegion region = null;
this.lock.readLock().lock();
try {
@@ -1154,91 +1298,28 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.lock.readLock().unlock();
}
}
//////////////////////////////////////////////////////////////////////////////
// remote scanner interface
//////////////////////////////////////////////////////////////////////////////
Map<String, HInternalScannerInterface> scanners =
Collections.synchronizedMap(new HashMap<String,
HInternalScannerInterface>());
/**
* Instantiated as a scanner lease.
* If the lease times out, the scanner is closed
*/
private class ScannerListener implements LeaseListener {
private final String scannerName;
ScannerListener(final String n) {
this.scannerName = n;
}
/**
* {@inheritDoc}
*/
public void leaseExpired() {
LOG.info("Scanner " + this.scannerName + " lease expired");
HInternalScannerInterface s = null;
synchronized(scanners) {
s = scanners.remove(this.scannerName);
}
if (s != null) {
s.close();
}
}
}
/**
* {@inheritDoc}
* Checks to see if the file system is still accessible.
* If not, sets abortRequested and stopRequested
*
* @return false if file system is not available
*/
public long openScanner(Text regionName, Text[] cols, Text firstRow,
final long timestamp, final RowFilterInterface filter)
throws IOException {
requestCount.incrementAndGet();
HRegion r = getRegion(regionName);
long scannerId = -1L;
try {
HInternalScannerInterface s =
r.getScanner(cols, firstRow, timestamp, filter);
scannerId = rand.nextLong();
String scannerName = String.valueOf(scannerId);
synchronized(scanners) {
scanners.put(scannerName, s);
}
leases.createLease(scannerId, scannerId,
new ScannerListener(scannerName));
} catch (IOException e) {
if (e instanceof RemoteException) {
try {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
} catch (IOException x) {
e = x;
}
}
LOG.error("", e);
throw e;
protected boolean checkFileSystem() {
boolean fsOk = true;
if (!FSUtils.isFileSystemAvailable(fs)) {
LOG.fatal("Shutting down HRegionServer: file system not available");
abortRequested = true;
stopRequested = true;
fsOk = false;
}
return scannerId;
}
/**
* {@inheritDoc}
*/
public void close(final long scannerId) throws IOException {
requestCount.incrementAndGet();
String scannerName = String.valueOf(scannerId);
HInternalScannerInterface s = null;
synchronized(scanners) {
s = scanners.remove(scannerName);
}
if(s == null) {
throw new UnknownScannerException(scannerName);
}
s.close();
leases.cancelLease(scannerId, scannerId);
return fsOk;
}
//
// Main program and support routines
//
private static void printUsageAndExit() {
printUsageAndExit(null);
}

View File: HStore.java

@@ -1298,6 +1298,9 @@ class HStore implements HConstants {
} catch (Exception ex) {
LOG.error("Failed construction", ex);
close();
IOException e = new IOException("HStoreScanner failed construction");
e.initCause(ex);
throw e;
}
}

View File: FSUtils.java (new file)

@@ -0,0 +1,63 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.dfs.DistributedFileSystem;
/**
* Utility methods for interacting with the underlying file system.
*/
public class FSUtils {
private static final Log LOG = LogFactory.getLog(FSUtils.class);
private FSUtils() {} // not instantiable
/**
* Checks to see if the specified file system is available
*
* @param fs file system to check
* @return true if the specified file system is available.
*/
public static boolean isFileSystemAvailable(FileSystem fs) {
boolean available = false;
if (fs instanceof DistributedFileSystem) {
try {
if (((DistributedFileSystem) fs).getDataNodeStats().length > 0) {
available = true;
} else {
LOG.fatal("file system unavailable: no data nodes");
}
} catch (IOException e) {
LOG.fatal("file system unavailable because: ", e);
}
} else {
available = true;
}
return available;
}
}
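
Note the deliberate asymmetry above: only a DistributedFileSystem can be probed for live data nodes, so any other FileSystem (for example the LocalFileSystem most unit tests run on) is simply reported as available, and the new shutdown path only ever triggers against a real or mini DFS. Caller-side usage is uniform across the hunks above; a sketch, where fs and closed are the caller's own fields:

// Sketch: same check-and-shutdown idiom used by HMaster and HRegionServer.
if (!FSUtils.isFileSystemAvailable(fs)) {
  LOG.fatal("Shutting down hbase cluster: file system not available");
  closed = true;
}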

View File: MiniHBaseCluster.java

@@ -309,6 +309,32 @@ public class MiniHBaseCluster implements HConstants {
}
}
/**
* Wait for Mini HBase Cluster to shut down.
*/
public void join() {
if (regionThreads != null) {
synchronized(regionThreads) {
for(Thread t: regionThreads) {
if (t.isAlive()) {
try {
t.join();
} catch (InterruptedException e) {
// continue
}
}
}
}
}
if (masterThread != null && masterThread.isAlive()) {
try {
masterThread.join();
} catch(InterruptedException e) {
// continue
}
}
}
/**
* Shut down HBase cluster started by calling
* {@link #startMaster(Configuration)} and then

View File: StaticTestEnvironment.java

@@ -61,31 +61,32 @@ public class StaticTestEnvironment {
value = System.getenv("DEBUGGING");
if(value != null && value.equalsIgnoreCase("TRUE")) {
debugging = true;
}
Logger rootLogger = Logger.getRootLogger();
// rootLogger.setLevel(Level.WARN);
Logger rootLogger = Logger.getRootLogger();
rootLogger.setLevel(Level.WARN);
Level logLevel = Level.INFO;
value = System.getenv("LOGGING_LEVEL");
if(value != null && value.length() != 0) {
if(value.equalsIgnoreCase("ALL")) {
logLevel = Level.ALL;
} else if(value.equalsIgnoreCase("DEBUG")) {
logLevel = Level.DEBUG;
} else if(value.equalsIgnoreCase("ERROR")) {
logLevel = Level.ERROR;
} else if(value.equalsIgnoreCase("FATAL")) {
logLevel = Level.FATAL;
} else if(value.equalsIgnoreCase("INFO")) {
logLevel = Level.INFO;
} else if(value.equalsIgnoreCase("OFF")) {
logLevel = Level.OFF;
} else if(value.equalsIgnoreCase("TRACE")) {
logLevel = Level.TRACE;
} else if(value.equalsIgnoreCase("WARN")) {
logLevel = Level.WARN;
}
Level logLevel = Level.DEBUG;
value = System.getenv("LOGGING_LEVEL");
if(value != null && value.length() != 0) {
if(value.equalsIgnoreCase("ALL")) {
logLevel = Level.ALL;
} else if(value.equalsIgnoreCase("DEBUG")) {
logLevel = Level.DEBUG;
} else if(value.equalsIgnoreCase("ERROR")) {
logLevel = Level.ERROR;
} else if(value.equalsIgnoreCase("FATAL")) {
logLevel = Level.FATAL;
} else if(value.equalsIgnoreCase("INFO")) {
logLevel = Level.INFO;
} else if(value.equalsIgnoreCase("OFF")) {
logLevel = Level.OFF;
} else if(value.equalsIgnoreCase("TRACE")) {
logLevel = Level.TRACE;
} else if(value.equalsIgnoreCase("WARN")) {
logLevel = Level.WARN;
}
ConsoleAppender consoleAppender = null;
for(Enumeration<Appender> e = rootLogger.getAllAppenders();
e.hasMoreElements();) {
@@ -103,8 +104,8 @@ public class StaticTestEnvironment {
consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
}
}
Logger.getLogger(
HBaseTestCase.class.getPackage().getName()).setLevel(logLevel);
}
Logger.getLogger(
HBaseTestCase.class.getPackage().getName()).setLevel(logLevel);
}
}

View File: TestDFSAbort.java (new file)

@@ -0,0 +1,66 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
/**
* Test ability of HBase to handle DFS failure
*/
public class TestDFSAbort extends HBaseClusterTestCase {
/** constructor */
public TestDFSAbort() {
super();
conf.setInt("ipc.client.timeout", 5000); // reduce ipc client timeout
conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries
conf.setInt("hbase.client.retries.number", 5); // reduce HBase retries
Logger.getRootLogger().setLevel(Level.WARN);
Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
}
/** {@inheritDoc} */
@Override
public void setUp() throws Exception {
super.setUp();
HTableDescriptor desc = new HTableDescriptor(getName());
desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY_STR));
HBaseAdmin admin = new HBaseAdmin(conf);
admin.createTable(desc);
}
/**
* @throws Exception
*/
public void testDFSAbort() throws Exception {
// By now the Mini DFS is running, Mini HBase is running and we have
// created a table. Now let's yank the rug out from HBase
cluster.getDFSCluster().shutdown();
// Now wait for Mini HBase Cluster to shut down
cluster.join();
}
}