HBASE-4367 Deadlock in MemStore flusher due to JDK internally synchronizing on current thread
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1188555 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3c764b1e05
commit
9570f3c907
|
@ -401,6 +401,8 @@ Release 0.92.0 - Unreleased
|
|||
HBASE-3512 Shell support for listing currently loaded coprocessors (Eugene
|
||||
Koontz via apurtell)
|
||||
HBASE-4670 Fix javadoc warnings
|
||||
HBASE-4367 Deadlock in MemStore flusher due to JDK internally synchronizing
|
||||
on current thread
|
||||
|
||||
TESTS
|
||||
HBASE-4450 test for number of blocks read: to serve as baseline for expected
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.hbase.util.Sleeper;
|
||||
|
||||
/**
|
||||
|
@ -33,7 +34,7 @@ import org.apache.hadoop.hbase.util.Sleeper;
|
|||
* <p>Don't subclass Chore if the task relies on being woken up for something to
|
||||
* do, such as an entry being added to a queue, etc.
|
||||
*/
|
||||
public abstract class Chore extends Thread {
|
||||
public abstract class Chore extends HasThread {
|
||||
private final Log LOG = LogFactory.getLog(this.getClass());
|
||||
private final Sleeper sleeper;
|
||||
protected final Stoppable stopper;
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
@ -575,7 +576,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
|
|||
*
|
||||
* Thread is triggered into action by {@link LruBlockCache#runEviction()}
|
||||
*/
|
||||
private static class EvictionThread extends Thread {
|
||||
private static class EvictionThread extends HasThread {
|
||||
private WeakReference<LruBlockCache> cache;
|
||||
|
||||
public EvictionThread(LruBlockCache cache) {
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
|
|||
import org.apache.hadoop.hbase.io.hfile.CacheStats;
|
||||
import org.apache.hadoop.hbase.io.hfile.Cacheable;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -323,7 +324,7 @@ public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize {
|
|||
/*
|
||||
* Statistics thread. Periodically prints the cache statistics to the log.
|
||||
*/
|
||||
static class StatisticsThread extends Thread {
|
||||
static class StatisticsThread extends HasThread {
|
||||
SlabCache ourcache;
|
||||
|
||||
public StatisticsThread(SlabCache slabCache) {
|
||||
|
|
|
@ -189,7 +189,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
|
||||
master,
|
||||
conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000));
|
||||
Threads.setDaemonThreadRunning(timeoutMonitor,
|
||||
Threads.setDaemonThreadRunning(timeoutMonitor.getThread(),
|
||||
master.getServerName() + ".timeoutMonitor");
|
||||
this.zkTable = new ZKTable(this.master.getZooKeeper());
|
||||
this.maximumAssignmentAttempts =
|
||||
|
|
|
@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
|||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.hbase.util.InfoServer;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Sleeper;
|
||||
|
@ -109,7 +110,7 @@ import org.apache.zookeeper.Watcher;
|
|||
* @see HMasterRegionInterface
|
||||
* @see Watcher
|
||||
*/
|
||||
public class HMaster extends Thread
|
||||
public class HMaster extends HasThread
|
||||
implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
||||
private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
|
||||
|
||||
|
@ -175,7 +176,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
// If 'true', the balancer is 'on'. If 'false', the balancer will not run.
|
||||
private volatile boolean balanceSwitch = true;
|
||||
|
||||
private Thread catalogJanitorChore;
|
||||
private CatalogJanitor catalogJanitorChore;
|
||||
private LogCleaner logCleaner;
|
||||
|
||||
private MasterCoprocessorHost cpHost;
|
||||
|
@ -479,8 +480,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
// been assigned.
|
||||
status.setStatus("Starting balancer and catalog janitor");
|
||||
this.balancerChore = getAndStartBalancerChore(this);
|
||||
this.catalogJanitorChore =
|
||||
Threads.setDaemonThreadRunning(new CatalogJanitor(this, this));
|
||||
this.catalogJanitorChore = new CatalogJanitor(this, this);
|
||||
Threads.setDaemonThreadRunning(catalogJanitorChore.getThread());
|
||||
|
||||
status.markComplete("Initialization successful");
|
||||
LOG.info("Master has completed initialization");
|
||||
|
@ -628,7 +629,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
new LogCleaner(conf.getInt("hbase.master.cleaner.interval", 60 * 1000),
|
||||
this, conf, getMasterFileSystem().getFileSystem(),
|
||||
getMasterFileSystem().getOldLogDir());
|
||||
Threads.setDaemonThreadRunning(logCleaner, n + ".oldLogCleaner");
|
||||
Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner");
|
||||
|
||||
// Put up info server.
|
||||
int port = this.conf.getInt("hbase.master.info.port", 60010);
|
||||
|
@ -678,7 +679,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
master.balance();
|
||||
}
|
||||
};
|
||||
return Threads.setDaemonThreadRunning(chore);
|
||||
return Threads.setDaemonThreadRunning(chore.getThread());
|
||||
}
|
||||
|
||||
private void stopChores() {
|
||||
|
|
|
@ -166,7 +166,7 @@ public class SplitLogManager extends ZooKeeperListener {
|
|||
}
|
||||
|
||||
public void finishInitialization() {
|
||||
Threads.setDaemonThreadRunning(timeoutMonitor, serverName +
|
||||
Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName +
|
||||
".splitLogManagerTimeoutMonitor");
|
||||
// Watcher can be null during tests with Mock'd servers.
|
||||
if (this.watcher != null) {
|
||||
|
|
|
@ -1423,10 +1423,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
|
||||
conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
|
||||
|
||||
Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller", handler);
|
||||
Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
|
||||
Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller", handler);
|
||||
Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher",
|
||||
handler);
|
||||
Threads.setDaemonThreadRunning(this.compactionChecker, n +
|
||||
Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
|
||||
".compactionChecker", handler);
|
||||
|
||||
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
|
||||
|
@ -1647,9 +1647,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
* have already been called.
|
||||
*/
|
||||
protected void join() {
|
||||
Threads.shutdown(this.compactionChecker);
|
||||
Threads.shutdown(this.cacheFlusher);
|
||||
Threads.shutdown(this.hlogRoller);
|
||||
Threads.shutdown(this.compactionChecker.getThread());
|
||||
Threads.shutdown(this.cacheFlusher.getThread());
|
||||
Threads.shutdown(this.hlogRoller.getThread());
|
||||
if (this.compactSplitThread != null) {
|
||||
this.compactSplitThread.join();
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
|
||||
import java.util.ConcurrentModificationException;
|
||||
import java.util.HashMap;
|
||||
|
@ -50,7 +51,7 @@ import java.io.IOException;
|
|||
* can be interrupted when there is something to do, rather than the Chore
|
||||
* sleep time which is invariant.
|
||||
*/
|
||||
public class Leases extends Thread {
|
||||
public class Leases extends HasThread {
|
||||
private static final Log LOG = LogFactory.getLog(Leases.class.getName());
|
||||
private final int leasePeriod;
|
||||
private final int leaseCheckFrequency;
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
|
|||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -40,7 +41,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
|||
* can be interrupted when there is something to do, rather than the Chore
|
||||
* sleep time which is invariant.
|
||||
*/
|
||||
class LogRoller extends Thread implements WALActionsListener {
|
||||
class LogRoller extends HasThread implements WALActionsListener {
|
||||
static final Log LOG = LogFactory.getLog(LogRoller.class);
|
||||
private final ReentrantLock rollLock = new ReentrantLock();
|
||||
private final AtomicBoolean rollLog = new AtomicBoolean(false);
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -55,7 +56,7 @@ import com.google.common.base.Preconditions;
|
|||
*
|
||||
* @see FlushRequester
|
||||
*/
|
||||
class MemStoreFlusher extends Thread implements FlushRequester {
|
||||
class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||
static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
|
||||
// These two data members go together. Any entry in the one must have
|
||||
// a corresponding entry in the other.
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.hbase.util.PairOfSameType;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
|
@ -443,7 +444,7 @@ public class SplitTransaction {
|
|||
* Open daughter region in its own thread.
|
||||
* If we fail, abort this hosting server.
|
||||
*/
|
||||
class DaughterOpener extends Thread {
|
||||
class DaughterOpener extends HasThread {
|
||||
private final Server server;
|
||||
private final HRegion r;
|
||||
private Throwable t = null;
|
||||
|
|
|
@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -406,7 +407,7 @@ public class HLog implements Syncable {
|
|||
this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
|
||||
|
||||
logSyncerThread = new LogSyncer(this.optionalFlushInterval);
|
||||
Threads.setDaemonThreadRunning(logSyncerThread,
|
||||
Threads.setDaemonThreadRunning(logSyncerThread.getThread(),
|
||||
Thread.currentThread().getName() + ".logSyncer");
|
||||
coprocessorHost = new WALCoprocessorHost(this, conf);
|
||||
}
|
||||
|
@ -1133,7 +1134,7 @@ public class HLog implements Syncable {
|
|||
* This thread is responsible to call syncFs and buffer up the writers while
|
||||
* it happens.
|
||||
*/
|
||||
class LogSyncer extends Thread {
|
||||
class LogSyncer extends HasThread {
|
||||
|
||||
private final long optionalFlushInterval;
|
||||
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.lang.Thread.UncaughtExceptionHandler;
|
||||
|
||||
/**
 * Abstract class which contains a {@link Thread} and delegates the common
 * Thread methods to that instance.
 *
 * <p>The purpose of this class is to work around Sun JVM bug #6915621, in
 * which something internal to the JDK uses {@code Thread.currentThread()} as a
 * monitor lock. This can produce deadlocks like HBASE-4367, HBASE-4101, etc.
 *
 * <p>Subclasses supply the work to perform by implementing {@link #run()}.
 * The wrapped thread is created by the constructor but is not started until
 * {@link #start()} is called. Code that needs a real {@code Thread} reference
 * should obtain it through {@link #getThread()}.
 */
public abstract class HasThread implements Runnable {
  /** The thread that executes {@link #run()}; created once and never replaced. */
  private final Thread thread;

  /**
   * Creates the wrapped thread with a JVM-assigned default name. The thread
   * is not started.
   */
  public HasThread() {
    this.thread = new Thread(this);
  }

  /**
   * Creates the wrapped thread with the given name. The thread is not started.
   *
   * @param name the name to give the wrapped thread
   */
  public HasThread(String name) {
    this.thread = new Thread(this, name);
  }

  /**
   * @return the wrapped thread that runs this object's {@link #run()} method
   */
  public Thread getThread() {
    return thread;
  }

  /**
   * The body executed by the wrapped thread once {@link #start()} is called.
   */
  @Override
  public abstract void run();

  //// Begin delegation to Thread

  /**
   * @return the name of the wrapped thread
   * @see Thread#getName()
   */
  public final String getName() {
    return thread.getName();
  }

  /**
   * Interrupts the wrapped thread.
   *
   * @see Thread#interrupt()
   */
  public void interrupt() {
    thread.interrupt();
  }

  /**
   * @return true if the wrapped thread has been started and has not yet died
   * @see Thread#isAlive()
   */
  public final boolean isAlive() {
    return thread.isAlive();
  }

  /**
   * @return true if the wrapped thread has been interrupted
   * @see Thread#isInterrupted()
   */
  public boolean isInterrupted() {
    return thread.isInterrupted();
  }

  /**
   * Marks the wrapped thread as a daemon or user thread.
   *
   * @param on true to make the wrapped thread a daemon thread
   * @see Thread#setDaemon(boolean)
   */
  public final void setDaemon(boolean on) {
    thread.setDaemon(on);
  }

  /**
   * Renames the wrapped thread.
   *
   * @param name the new thread name
   * @see Thread#setName(String)
   */
  public final void setName(String name) {
    thread.setName(name);
  }

  /**
   * Changes the priority of the wrapped thread.
   *
   * @param newPriority the priority to set
   * @see Thread#setPriority(int)
   */
  public final void setPriority(int newPriority) {
    thread.setPriority(newPriority);
  }

  /**
   * Installs the handler invoked when the wrapped thread terminates because
   * of an uncaught exception.
   *
   * @param eh the handler to install
   * @see Thread#setUncaughtExceptionHandler(UncaughtExceptionHandler)
   */
  public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
    thread.setUncaughtExceptionHandler(eh);
  }

  /**
   * Starts the wrapped thread, which then executes {@link #run()}.
   *
   * @see Thread#start()
   */
  public void start() {
    thread.start();
  }

  /**
   * Waits for the wrapped thread to die.
   *
   * @throws InterruptedException if the calling thread is interrupted while waiting
   * @see Thread#join()
   */
  public final void join() throws InterruptedException {
    thread.join();
  }

  /**
   * Waits at most {@code millis} milliseconds plus {@code nanos} nanoseconds
   * for the wrapped thread to die.
   *
   * @param millis the time to wait in milliseconds
   * @param nanos additional nanoseconds to wait
   * @throws InterruptedException if the calling thread is interrupted while waiting
   * @see Thread#join(long, int)
   */
  public final void join(long millis, int nanos) throws InterruptedException {
    thread.join(millis, nanos);
  }

  /**
   * Waits at most {@code millis} milliseconds for the wrapped thread to die.
   *
   * @param millis the time to wait in milliseconds
   * @throws InterruptedException if the calling thread is interrupted while waiting
   * @see Thread#join(long)
   */
  public final void join(long millis) throws InterruptedException {
    thread.join(millis);
  }

  //// End delegation to Thread
}
|
|
@ -240,7 +240,7 @@ public class JVMClusterUtil {
|
|||
// The below has been replaced to debug sometime hangs on end of
|
||||
// tests.
|
||||
// this.master.join():
|
||||
Threads.threadDumpingIsAlive(t.master);
|
||||
Threads.threadDumpingIsAlive(t.master.getThread());
|
||||
} catch(InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue