HADOOP-1813 OOME makes zombie of region server
git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@575928 13f79535-47bb-0310-9956-ffa450edef68
parent 1596f8aa10
commit 83298b4721
@@ -38,6 +38,7 @@ Trunk (unreleased changes)
      (Ning Li via Stack)
    HADOOP-1800 output should default utf8 encoding
    HADOOP-1801 When hdfs is yanked out from under hbase, hbase should go down gracefully
+   HADOOP-1813 OOME makes zombie of region server
    HADOOP-1814 TestCleanRegionServerExit fails too often on Hudson
    HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
    HADOOP-1832 listTables() returns duplicate tables
@@ -206,7 +206,11 @@ else
   CLASS=$COMMAND
 fi
 
+# Have JVM dump heap if we run out of memory. Files will be 'launch directory'
+# and are named like the following: java_pid21612.hprof. Apparently it doesn't
+# 'cost' to have this flag enabled. Its a 1.6 flag only. See:
+# http://blogs.sun.com/alanb/entry/outofmemoryerror_looks_a_bit_better
+HBASE_OPTS="$HBASE_OPTS -XX:+HeapDumpOnOutOfMemoryError"
 HBASE_OPTS="$HBASE_OPTS -Dhadoop.log.dir=$HADOOP_LOG_DIR"
 HBASE_OPTS="$HBASE_OPTS -Dhadoop.log.file=$HADOOP_LOGFILE"
 HBASE_OPTS="$HBASE_OPTS -Dhadoop.home.dir=$HADOOP_HOME"
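The -XX:+HeapDumpOnOutOfMemoryError flag added above tells a Java 6 JVM to write a java_pid<N>.hprof file into the launch directory when an OutOfMemoryError is thrown, at no runtime cost until that point. A minimal way to verify the flag locally (this test class is illustrative, not part of the commit):

    import java.util.ArrayList;
    import java.util.List;

    /** Exhausts the heap on purpose so the JVM writes java_pid<N>.hprof. */
    public class HeapDumpCheck {
      public static void main(String[] args) {
        List<byte[]> hog = new ArrayList<byte[]>();
        while (true) {
          hog.add(new byte[1024 * 1024]); // leak 1 MB per iteration until OOME
        }
      }
    }

Run it with java -XX:+HeapDumpOnOutOfMemoryError -Xmx32m HeapDumpCheck, then look for the .hprof file in the working directory.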
@@ -219,7 +219,6 @@ public class HLog implements HConstants {
     // cache-flush. Otherwise, the log sequence number for
     // the CACHEFLUSH operation will appear in a "newer" log file
     // than it should.
-
     while(insideCacheFlush) {
       try {
         wait();
@@ -402,14 +401,14 @@ public class HLog implements HConstants {
    * @see #completeCacheFlush(Text, Text, long)
    */
   synchronized long startCacheFlush() {
-    while (insideCacheFlush) {
+    while (this.insideCacheFlush) {
       try {
         wait();
       } catch (InterruptedException ie) {
         // continue
       }
     }
-    insideCacheFlush = true;
+    this.insideCacheFlush = true;
     notifyAll();
     return obtainSeqNum();
   }
@@ -427,7 +426,7 @@ public class HLog implements HConstants {
       return;
     }
 
-    if(! insideCacheFlush) {
+    if (!this.insideCacheFlush) {
       throw new IOException("Impossible situation: inside " +
         "completeCacheFlush(), but 'insideCacheFlush' flag is false");
     }
@@ -444,6 +443,16 @@ public class HLog implements HConstants {
     insideCacheFlush = false;
     notifyAll();
   }
 
+  /**
+   * Abort a cache flush.
+   * This method will clear waits on {@link #insideCacheFlush} but if this
+   * method is called, we are losing data. TODO: Fix.
+   */
+  synchronized void abort() {
+    this.insideCacheFlush = false;
+    notifyAll();
+  }
+
   private static void usage() {
     System.err.println("Usage: java org.apache.hbase.HLog" +
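startCacheFlush(), completeCacheFlush() and the new abort() form a wait/notify latch around the insideCacheFlush flag; abort() exists so that a flush which dies (for example after an OOME or a lost HDFS) releases waiters instead of leaving the region server a zombie. The latch, reduced to a self-contained sketch (the wrapper class is illustrative; field and method names follow the patch):

    /** Monitor-style latch in the manner of HLog's insideCacheFlush handshake. */
    class FlushLatch {
      private boolean insideCacheFlush = false;

      /** Block until no flush is in progress, then claim the latch. */
      synchronized void startCacheFlush() {
        while (this.insideCacheFlush) {
          try {
            wait();
          } catch (InterruptedException ie) {
            // keep waiting, as the patch does
          }
        }
        this.insideCacheFlush = true;
        notifyAll();
      }

      /** Normal completion: release the latch and wake any waiters. */
      synchronized void completeCacheFlush() {
        this.insideCacheFlush = false;
        notifyAll();
      }

      /** Error path: also release the latch so waiters are never stuck. */
      synchronized void abort() {
        this.insideCacheFlush = false;
        notifyAll();
      }
    }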
[File diff suppressed because it is too large]
@@ -834,37 +834,45 @@ public class HRegion implements HConstants {
     // When execution returns from snapshotMemcacheForLog() with a non-NULL
     // value, the HMemcache will have a snapshot object stored that must be
     // explicitly cleaned up using a call to deleteSnapshot().
     //
     HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
     if(retval == null || retval.memcacheSnapshot == null) {
       LOG.debug("Finished memcache flush; empty snapshot");
       return;
     }
-    long logCacheFlushId = retval.sequenceId;
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Snapshotted memcache for region " +
-        this.regionInfo.regionName + " with sequence id " + retval.sequenceId +
-        " and entries " + retval.memcacheSnapshot.size());
-    }
+    try {
+      long logCacheFlushId = retval.sequenceId;
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Snapshotted memcache for region " +
+          this.regionInfo.regionName + " with sequence id " +
+          retval.sequenceId + " and entries " +
+          retval.memcacheSnapshot.size());
+      }
 
-    // A. Flush memcache to all the HStores.
-    // Keep running vector of all store files that includes both old and the
-    // just-made new flush store file.
-    for(HStore hstore: stores.values()) {
-      hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
-    }
+      // A. Flush memcache to all the HStores.
+      // Keep running vector of all store files that includes both old and the
+      // just-made new flush store file.
+      for(HStore hstore: stores.values()) {
+        hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
+      }
 
-    // B. Write a FLUSHCACHE-COMPLETE message to the log.
-    // This tells future readers that the HStores were emitted correctly,
-    // and that all updates to the log for this regionName that have lower
-    // log-sequence-ids can be safely ignored.
+      // B. Write a FLUSHCACHE-COMPLETE message to the log.
+      // This tells future readers that the HStores were emitted correctly,
+      // and that all updates to the log for this regionName that have lower
+      // log-sequence-ids can be safely ignored.
 
-    log.completeCacheFlush(this.regionInfo.regionName,
-      regionInfo.tableDesc.getName(), logCacheFlushId);
-
-    // C. Delete the now-irrelevant memcache snapshot; its contents have been
-    // dumped to disk-based HStores.
-    memcache.deleteSnapshot();
+      log.completeCacheFlush(this.regionInfo.regionName,
+        regionInfo.tableDesc.getName(), logCacheFlushId);
+    } catch (IOException e) {
+      LOG.fatal("Interrupted while flushing. Edits lost. FIX! HADOOP-1903", e);
+      log.abort();
+      throw e;
+    } finally {
+      // C. Delete the now-irrelevant memcache snapshot; its contents have been
+      // dumped to disk-based HStores.
+      memcache.deleteSnapshot();
+    }
 
     // D. Finally notify anyone waiting on memcache to clear:
     // e.g. checkResources().
     synchronized(this) {
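The reshaped flush above wraps steps A and B in try/catch/finally: on IOException it calls log.abort() (releasing HLog's latch) and rethrows, and deleteSnapshot() moves into finally so the snapshot is cleaned up on every path. The control flow in skeleton form (the two interfaces are illustrative stand-ins for HLog and HMemcache, not the real classes):

    import java.io.IOException;

    class FlushFlow {
      interface Log { void completeCacheFlush() throws IOException; void abort(); }
      interface Memcache { Object snapshot(); void deleteSnapshot(); }

      void flush(Log log, Memcache memcache) throws IOException {
        Object snapshot = memcache.snapshot();
        if (snapshot == null) {
          return; // nothing to flush
        }
        try {
          // A. flush stores; B. mark the flush complete in the log
          log.completeCacheFlush();
        } catch (IOException e) {
          log.abort(); // release the insideCacheFlush latch (see HLog above)
          throw e;     // edits may be lost; tracked as HADOOP-1903
        } finally {
          memcache.deleteSnapshot(); // C. always drop the snapshot
        }
      }
    }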
[File diff suppressed because it is too large]
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import java.io.*;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Leases
@@ -41,14 +42,13 @@ import java.util.*;
 public class Leases {
   protected static final Log LOG = LogFactory.getLog(Leases.class.getName());
 
-  protected final long leasePeriod;
-  protected final long leaseCheckFrequency;
-  private final LeaseMonitor leaseMonitor;
+  protected final int leasePeriod;
+  protected final int leaseCheckFrequency;
   private final Thread leaseMonitorThread;
   protected final Map<LeaseName, Lease> leases =
     new HashMap<LeaseName, Lease>();
   protected final TreeSet<Lease> sortedLeases = new TreeSet<Lease>();
-  protected boolean running = true;
+  protected AtomicBoolean stop = new AtomicBoolean(false);
 
   /**
    * Creates a lease
@@ -57,18 +57,25 @@
    * @param leaseCheckFrequency - how often the lease should be checked
    * (milliseconds)
    */
-  public Leases(long leasePeriod, long leaseCheckFrequency) {
+  public Leases(final int leasePeriod, final int leaseCheckFrequency) {
     this.leasePeriod = leasePeriod;
     this.leaseCheckFrequency = leaseCheckFrequency;
-    this.leaseMonitor = new LeaseMonitor();
-    this.leaseMonitorThread = new Thread(leaseMonitor);
-    this.leaseMonitorThread.setName("Lease.monitor");
+    this.leaseMonitorThread =
+      new LeaseMonitor(this.leaseCheckFrequency, this.stop);
     this.leaseMonitorThread.setDaemon(true);
   }
 
   /** Starts the lease monitor */
   public void start() {
     leaseMonitorThread.start();
   }
 
+  /**
+   * @param name Set name on the lease checking daemon thread.
+   */
+  public void setName(final String name) {
+    this.leaseMonitorThread.setName(name);
+  }
+
   /**
    * Shuts down this lease instance when all outstanding leases expire.
@@ -99,8 +106,7 @@ public class Leases {
    */
   public void close() {
     LOG.info("closing leases");
-
-    this.running = false;
+    this.stop.set(true);
     try {
       this.leaseMonitorThread.interrupt();
       this.leaseMonitorThread.join();
@@ -196,37 +202,33 @@ public class Leases {
         sortedLeases.remove(lease);
         leases.remove(name);
       }
     }
-    // if (LOG.isDebugEnabled()) {
-    //   LOG.debug("Cancel lease " + name);
-    // }
   }
 
-  /** LeaseMonitor is a thread that expires Leases that go on too long. */
-  class LeaseMonitor implements Runnable {
-    /** {@inheritDoc} */
-    public void run() {
-      while(running) {
-        synchronized(leases) {
-          synchronized(sortedLeases) {
-            Lease top;
-            while((sortedLeases.size() > 0)
-                && ((top = sortedLeases.first()) != null)) {
-              if(top.shouldExpire()) {
-                leases.remove(top.getLeaseName());
-                sortedLeases.remove(top);
-                top.expired();
-              } else {
-                break;
-              }
-            }
-          }
-        }
-        try {
-          Thread.sleep(leaseCheckFrequency);
-        } catch (InterruptedException ie) {
-          // continue
-        }
-      }
-    }
-  }
+  /**
+   * LeaseMonitor is a thread that expires Leases that go on too long.
+   * Its a daemon thread.
+   */
+  class LeaseMonitor extends Chore {
+    public LeaseMonitor(int p, AtomicBoolean s) {
+      super(p, s);
+    }
+
+    protected void chore() {
+      synchronized(leases) {
+        synchronized(sortedLeases) {
+          Lease top;
+          while((sortedLeases.size() > 0)
+              && ((top = sortedLeases.first()) != null)) {
+            if(top.shouldExpire()) {
+              leases.remove(top.getLeaseName());
+              sortedLeases.remove(top);
+              top.expired();
+            } else {
+              break;
+            }
+          }
+        }
+      }
+    }
+  }
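LeaseMonitor now extends Chore rather than implementing Runnable itself, so the sleep/stop loop lives in one place and the thread winds down when the shared AtomicBoolean flips. The Chore base class is not part of this diff; from the way it is constructed with super(p, s) and overridden via chore(), it plausibly looks something like this (an assumption inferred from usage, not the actual class):

    import java.util.concurrent.atomic.AtomicBoolean;

    /** Hypothetical reconstruction of Chore: a daemon-friendly periodic thread. */
    abstract class Chore extends Thread {
      private final int period;          // sleep time between chores, in ms
      private final AtomicBoolean stop;  // shared shutdown flag

      Chore(int period, AtomicBoolean stop) {
        this.period = period;
        this.stop = stop;
      }

      /** The periodic work; LeaseMonitor expires leases here. */
      protected abstract void chore();

      @Override
      public void run() {
        while (!stop.get()) {
          chore();
          try {
            Thread.sleep(period);
          } catch (InterruptedException ie) {
            // interrupt() from Leases.close() just re-checks the stop flag
          }
        }
      }
    }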
@@ -30,7 +30,29 @@ import org.apache.hadoop.ipc.RemoteException;
  * org.apache.hadoop.ipc.RemoteException exceptions.
  */
 public class RemoteExceptionHandler {
-  private RemoteExceptionHandler(){} // not instantiable
+  /* Not instantiable */
+  private RemoteExceptionHandler() {super();}
+
+  /**
+   * Examine passed IOException. See if its carrying a RemoteException. If so,
+   * run {@link #decodeRemoteException(RemoteException)} on it. Otherwise,
+   * pass back <code>e</code> unaltered.
+   * @param e Exception to examine.
+   * @return Decoded RemoteException carried by <code>e</code> or
+   * <code>e</code> unaltered.
+   */
+  public static IOException checkIOException(final IOException e) {
+    IOException result = e;
+    if (e instanceof RemoteException) {
+      try {
+        result = RemoteExceptionHandler.decodeRemoteException(
+          (RemoteException) e);
+      } catch (IOException ex) {
+        result = ex;
+      }
+    }
+    return result;
+  }
 
   /**
    * Converts org.apache.hadoop.ipc.RemoteException into original exception,
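checkIOException() gives callers a single place to turn a wrapped org.apache.hadoop.ipc.RemoteException back into the server-side exception it carries. A typical call-site, sketched (the caller class and the failing RPC are illustrative, not from this commit):

    import java.io.IOException;
    import org.apache.hadoop.ipc.RemoteException;

    class RpcCaller {
      void callRegionServer() throws IOException {
        try {
          doRpc(); // stand-in for an RPC that may surface a RemoteException
        } catch (IOException e) {
          // Propagate the decoded server-side exception, not the wrapper.
          throw RemoteExceptionHandler.checkIOException(e);
        }
      }

      private void doRpc() throws IOException {
        throw new RemoteException("java.io.IOException", "server-side failure");
      }
    }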
@@ -69,10 +91,15 @@ public class RemoteExceptionHandler {
       }
 
     } catch (ClassNotFoundException x) {
       // continue
+    } catch (NoSuchMethodException x) {
+      // continue
+    } catch (IllegalAccessException x) {
+      // continue
+    } catch (InvocationTargetException x) {
+      // continue
+    } catch (InstantiationException x) {
+      // continue
     }
     return i;
   }
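The catch clauses above exist because decodeRemoteException rebuilds the original exception reflectively, and before Java 7 each reflective failure mode needed its own clause; every one falls through so the undecoded exception is returned instead. A compressed sketch of such a reflective rebuild (inferred from the catch list; the real method body lies outside this hunk):

    import java.io.IOException;
    import java.lang.reflect.Constructor;
    import java.lang.reflect.InvocationTargetException;

    class ReflectiveDecode {
      /** Try to re-create the exception class named inside a RemoteException. */
      static IOException decode(String className, String message) {
        IOException result = new IOException(message); // fallback: undecoded
        try {
          Class<?> c = Class.forName(className);
          Constructor<?> ctor = c.getConstructor(String.class);
          Object instance = ctor.newInstance(message);
          if (instance instanceof IOException) {
            result = (IOException) instance;
          }
        } catch (ClassNotFoundException x) {
          // continue
        } catch (NoSuchMethodException x) {
          // continue
        } catch (IllegalAccessException x) {
          // continue
        } catch (InvocationTargetException x) {
          // continue
        } catch (InstantiationException x) {
          // continue
        }
        return result;
      }
    }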
@@ -77,7 +77,6 @@ public class MiniHBaseCluster implements HConstants {
    */
   public MiniHBaseCluster(Configuration conf, int nRegionNodes,
       final boolean miniHdfsFilesystem) throws IOException {
-
     this(conf, nRegionNodes, miniHdfsFilesystem, true, true);
   }
 
@@ -127,7 +126,6 @@ public class MiniHBaseCluster implements HConstants {
       fs.mkdirs(parentdir);
       this.masterThread = startMaster(this.conf);
       this.regionThreads = startRegionServers(this.conf, nRegionNodes);
-
     } catch(IOException e) {
       shutdown();
       throw e;
@@ -233,18 +231,22 @@ public class MiniHBaseCluster implements HConstants {
    * Starts a region server thread running
    *
    * @throws IOException
+   * @return Name of regionserver started.
    */
-  public void startRegionServer() throws IOException {
+  public String startRegionServer() throws IOException {
     RegionServerThread t =
       startRegionServer(this.conf, this.regionThreads.size());
     this.regionThreads.add(t);
+    return t.getName();
   }
 
   private static RegionServerThread startRegionServer(final Configuration c,
-    final int index) throws IOException {
-
-    final HRegionServer hsr = new HRegionServer(c);
-    RegionServerThread t = new RegionServerThread(hsr, index);
+    final int index)
+  throws IOException {
+    final HRegionServer hrs = new HRegionServer(c);
+    RegionServerThread t = new RegionServerThread(hrs, index);
+    t.setName("regionserver" +
+      t.getRegionServer().server.getListenerAddress().toString());
     t.start();
     return t;
   }
@@ -296,8 +298,9 @@ public class MiniHBaseCluster implements HConstants {
    * Wait for the specified region server to stop
    * Removes this thread from list of running threads.
    * @param serverNumber
+   * @return Name of region server that just went down.
    */
-  public void waitOnRegionServer(int serverNumber) {
+  public String waitOnRegionServer(int serverNumber) {
     RegionServerThread regionServerThread =
       this.regionThreads.remove(serverNumber);
     try {
@@ -307,6 +310,7 @@ public class MiniHBaseCluster implements HConstants {
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
+    return regionServerThread.getName();
   }
 
   /**
@@ -353,14 +357,16 @@ public class MiniHBaseCluster implements HConstants {
     if(masterThread != null) {
       masterThread.getMaster().shutdown();
     }
-    synchronized(regionServerThreads) {
-      if (regionServerThreads != null) {
-        for(Thread t: regionServerThreads) {
-          if (t.isAlive()) {
-            try {
-              t.join();
-            } catch (InterruptedException e) {
-              // continue
+    if (regionServerThreads != null) {
+      synchronized(regionServerThreads) {
+        if (regionServerThreads != null) {
+          for(Thread t: regionServerThreads) {
+            if (t.isAlive()) {
+              try {
+                t.join();
+              } catch (InterruptedException e) {
+                // continue
+              }
             }
           }
         }
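Hoisting the null check outside the synchronized block is the actual fix here: shutdown() can run after a failed or partial startup, when regionServerThreads is still null, and synchronized(null) throws a NullPointerException before any guard inside the block is reached. In miniature (field and method names mirror the patch, but the class is illustrative):

    import java.util.List;

    class ShutdownGuard {
      private List<Thread> regionServerThreads; // may be null if startup failed

      void shutdown() {
        // Old order locked first: synchronized(regionServerThreads) NPEs on null.
        // New order: test, then lock.
        if (regionServerThreads != null) {
          synchronized (regionServerThreads) {
            for (Thread t : regionServerThreads) {
              if (t.isAlive()) {
                try {
                  t.join();
                } catch (InterruptedException e) {
                  // continue
                }
              }
            }
          }
        }
      }
    }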
@@ -1,3 +1,22 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase;
 
 import java.io.IOException;
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase;
 import java.io.IOException;
 import java.util.TreeMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -30,6 +32,7 @@ import org.apache.log4j.Logger;
  * Tests region server failover when a region server exits.
  */
 public class TestCleanRegionServerExit extends HBaseClusterTestCase {
+  private final Log LOG = LogFactory.getLog(this.getClass());
   private HTable table;
 
   /** constructor */
@@ -65,14 +68,13 @@ public class TestCleanRegionServerExit extends HBaseClusterTestCase {
     table.commit(lockid);
     // Start up a new region server to take over serving of root and meta
     // after we shut down the current meta/root host.
-    this.cluster.startRegionServer();
+    LOG.info("Started " + this.cluster.startRegionServer());
     // Now shutdown the region server and wait for it to go down.
     this.cluster.stopRegionServer(0);
-    this.cluster.waitOnRegionServer(0);
+    LOG.info(this.cluster.waitOnRegionServer(0) + " is down");
 
     // Verify that the client can find the data after the region has been moved
     // to a different server
-
     HScannerInterface scanner =
       table.obtainScanner(HConstants.COLUMN_FAMILY_ARRAY, new Text());
 
@@ -19,6 +19,9 @@
  */
 package org.apache.hadoop.hbase;
 
+import junit.framework.TestSuite;
+import junit.textui.TestRunner;
+
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
@@ -40,10 +43,8 @@ public class TestDFSAbort extends HBaseClusterTestCase {
   @Override
   public void setUp() throws Exception {
     super.setUp();
-
     HTableDescriptor desc = new HTableDescriptor(getName());
     desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY_STR));
-
     HBaseAdmin admin = new HBaseAdmin(conf);
     admin.createTable(desc);
   }
@@ -52,14 +53,14 @@ public class TestDFSAbort extends HBaseClusterTestCase {
    * @throws Exception
    */
   public void testDFSAbort() throws Exception {
-
     // By now the Mini DFS is running, Mini HBase is running and we have
     // created a table. Now let's yank the rug out from HBase
-
     cluster.getDFSCluster().shutdown();
-
     // Now wait for Mini HBase Cluster to shut down
-
     cluster.join();
   }
+
+  public static void main(String[] args) {
+    TestRunner.run(new TestSuite(TestDFSAbort.class));
+  }
 }