HBASE-14968 ConcurrentModificationException in region close resulting in the region staying in closing state
This commit is contained in:
parent
f0a97a1fdf
commit
3e26063161
|
@ -103,8 +103,11 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
*/
|
||||
private static Set<String> coprocessorNames =
|
||||
Collections.synchronizedSet(new HashSet<String>());
|
||||
|
||||
public static Set<String> getLoadedCoprocessors() {
|
||||
return coprocessorNames;
|
||||
synchronized (coprocessorNames) {
|
||||
return new HashSet(coprocessorNames);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -348,6 +351,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
*/
|
||||
static class EnvironmentPriorityComparator
|
||||
implements Comparator<CoprocessorEnvironment> {
|
||||
@Override
|
||||
public int compare(final CoprocessorEnvironment env1,
|
||||
final CoprocessorEnvironment env2) {
|
||||
if (env1.getPriority() < env2.getPriority()) {
|
||||
|
@ -436,14 +440,16 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
|
||||
" because not active (state="+state.toString()+")");
|
||||
}
|
||||
// clean up any table references
|
||||
for (Table table: openTables) {
|
||||
try {
|
||||
((HTableWrapper)table).internalClose();
|
||||
} catch (IOException e) {
|
||||
// nothing can be done here
|
||||
LOG.warn("Failed to close " +
|
||||
table.getName(), e);
|
||||
synchronized (openTables) {
|
||||
// clean up any table references
|
||||
for (Table table: openTables) {
|
||||
try {
|
||||
((HTableWrapper)table).internalClose();
|
||||
} catch (IOException e) {
|
||||
// nothing can be done here
|
||||
LOG.warn("Failed to close " +
|
||||
table.getName(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -97,6 +97,7 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
TraceScope chunk = Trace.startSpan(this.getClass().getSimpleName(), parent);
|
||||
try {
|
||||
|
@ -183,6 +184,10 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
|
|||
* @param t Throwable object
|
||||
*/
|
||||
protected void handleException(Throwable t) {
|
||||
LOG.error("Caught throwable while processing event " + eventType, t);
|
||||
String msg = "Caught throwable while processing event " + eventType;
|
||||
LOG.error(msg, t);
|
||||
if (server != null) {
|
||||
server.abort(msg, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,10 +80,8 @@ public class RegionReplicaFlushHandler extends EventHandler {
|
|||
|
||||
@Override
|
||||
protected void handleException(Throwable t) {
|
||||
super.handleException(t);
|
||||
|
||||
if (t instanceof InterruptedIOException || t instanceof InterruptedException) {
|
||||
// ignore
|
||||
LOG.error("Caught throwable while processing event " + eventType, t);
|
||||
} else if (t instanceof RuntimeException) {
|
||||
server.abort("ServerAborting because a runtime exception was thrown", t);
|
||||
} else {
|
||||
|
|
|
@ -19,8 +19,6 @@
|
|||
package org.apache.hadoop.hbase.executor;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StringWriter;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
@ -29,7 +27,9 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.Waiter.Predicate;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
|
@ -90,8 +90,8 @@ public class TestExecutorService {
|
|||
assertTrue(status.queuedEvents.isEmpty());
|
||||
assertEquals(5, status.running.size());
|
||||
checkStatusDump(status);
|
||||
|
||||
|
||||
|
||||
|
||||
// Now interrupt the running Executor
|
||||
synchronized (lock) {
|
||||
lock.set(false);
|
||||
|
@ -140,7 +140,7 @@ public class TestExecutorService {
|
|||
status.dumpTo(sw, "");
|
||||
String dump = sw.toString();
|
||||
LOG.info("Got status dump:\n" + dump);
|
||||
|
||||
|
||||
assertTrue(dump.contains("Waiting on java.util.concurrent.atomic.AtomicBoolean"));
|
||||
}
|
||||
|
||||
|
@ -173,5 +173,38 @@ public class TestExecutorService {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAborting() throws Exception {
|
||||
final Configuration conf = HBaseConfiguration.create();
|
||||
final Server server = mock(Server.class);
|
||||
when(server.getConfiguration()).thenReturn(conf);
|
||||
|
||||
ExecutorService executorService = new ExecutorService("unit_test");
|
||||
executorService.startExecutorService(
|
||||
ExecutorType.MASTER_SERVER_OPERATIONS, 1);
|
||||
|
||||
|
||||
executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) {
|
||||
@Override
|
||||
public void process() throws IOException {
|
||||
throw new RuntimeException("Should cause abort");
|
||||
}
|
||||
});
|
||||
|
||||
Waiter.waitFor(conf, 30000, new Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
try {
|
||||
verify(server, times(1)).abort(anyString(), (Throwable) anyObject());
|
||||
return true;
|
||||
} catch (Throwable t) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
executorService.shutdown();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue