HBASE-12192 Remove EventHandlerListener
This commit is contained in:
parent
49056295ef
commit
451798cefb
|
@ -48,11 +48,6 @@ import org.htrace.TraceScope;
|
||||||
* hbase executor, see ExecutorService, has a switch for passing
|
* hbase executor, see ExecutorService, has a switch for passing
|
||||||
* event type to executor.
|
* event type to executor.
|
||||||
* <p>
|
* <p>
|
||||||
* Event listeners can be installed and will be called pre- and post- process if
|
|
||||||
* this EventHandler is run in a Thread (its a Runnable so if its {@link #run()}
|
|
||||||
* method gets called). Implement
|
|
||||||
* {@link EventHandlerListener}s, and registering using
|
|
||||||
* {@link #setListener(EventHandlerListener)}.
|
|
||||||
* @see ExecutorService
|
* @see ExecutorService
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@ -70,30 +65,11 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
|
||||||
// sequence id for this event
|
// sequence id for this event
|
||||||
private final long seqid;
|
private final long seqid;
|
||||||
|
|
||||||
// Listener to call pre- and post- processing. May be null.
|
|
||||||
private EventHandlerListener listener;
|
|
||||||
|
|
||||||
// Time to wait for events to happen, should be kept short
|
// Time to wait for events to happen, should be kept short
|
||||||
protected int waitingTimeForEvents;
|
protected int waitingTimeForEvents;
|
||||||
|
|
||||||
private final Span parent;
|
private final Span parent;
|
||||||
|
|
||||||
/**
|
|
||||||
* This interface provides pre- and post-process hooks for events.
|
|
||||||
*/
|
|
||||||
public interface EventHandlerListener {
|
|
||||||
/**
|
|
||||||
* Called before any event is processed
|
|
||||||
* @param event The event handler whose process method is about to be called.
|
|
||||||
*/
|
|
||||||
void beforeProcess(EventHandler event);
|
|
||||||
/**
|
|
||||||
* Called after any event is processed
|
|
||||||
* @param event The event handler whose process method is about to be called.
|
|
||||||
*/
|
|
||||||
void afterProcess(EventHandler event);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default base class constructor.
|
* Default base class constructor.
|
||||||
*/
|
*/
|
||||||
|
@ -124,9 +100,7 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
|
||||||
public void run() {
|
public void run() {
|
||||||
TraceScope chunk = Trace.startSpan(this.getClass().getSimpleName(), parent);
|
TraceScope chunk = Trace.startSpan(this.getClass().getSimpleName(), parent);
|
||||||
try {
|
try {
|
||||||
if (getListener() != null) getListener().beforeProcess(this);
|
|
||||||
process();
|
process();
|
||||||
if (getListener() != null) getListener().afterProcess(this);
|
|
||||||
} catch(Throwable t) {
|
} catch(Throwable t) {
|
||||||
handleException(t);
|
handleException(t);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -187,20 +161,6 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
|
||||||
return (this.seqid < eh.seqid) ? -1 : 1;
|
return (this.seqid < eh.seqid) ? -1 : 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @return Current listener or null if none set.
|
|
||||||
*/
|
|
||||||
public synchronized EventHandlerListener getListener() {
|
|
||||||
return listener;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param listener Listener to call pre- and post- {@link #process()}.
|
|
||||||
*/
|
|
||||||
public synchronized void setListener(EventHandlerListener listener) {
|
|
||||||
this.listener = listener;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "Event #" + getSeqid() +
|
return "Event #" + getSeqid() +
|
||||||
|
|
|
@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
|
|
||||||
import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
|
import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
@ -52,10 +51,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
* call {@link #shutdown()}.
|
* call {@link #shutdown()}.
|
||||||
*
|
*
|
||||||
* <p>In order to use the service created above, call
|
* <p>In order to use the service created above, call
|
||||||
* {@link #submit(EventHandler)}. Register pre- and post- processing listeners
|
* {@link #submit(EventHandler)}.
|
||||||
* by registering your implementation of {@link EventHandler.EventHandlerListener}
|
|
||||||
* with {@link #registerListener(EventType, EventHandler.EventHandlerListener)}. Be sure
|
|
||||||
* to deregister your listener when done via {@link #unregisterListener(EventType)}.
|
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ExecutorService {
|
public class ExecutorService {
|
||||||
|
@ -65,10 +61,6 @@ public class ExecutorService {
|
||||||
private final ConcurrentHashMap<String, Executor> executorMap =
|
private final ConcurrentHashMap<String, Executor> executorMap =
|
||||||
new ConcurrentHashMap<String, Executor>();
|
new ConcurrentHashMap<String, Executor>();
|
||||||
|
|
||||||
// listeners that are called before and after an event is processed
|
|
||||||
private ConcurrentHashMap<EventType, EventHandlerListener> eventHandlerListeners =
|
|
||||||
new ConcurrentHashMap<EventType, EventHandlerListener>();
|
|
||||||
|
|
||||||
// Name of the server hosting this executor service.
|
// Name of the server hosting this executor service.
|
||||||
private final String servername;
|
private final String servername;
|
||||||
|
|
||||||
|
@ -91,7 +83,7 @@ public class ExecutorService {
|
||||||
throw new RuntimeException("An executor service with the name " + name +
|
throw new RuntimeException("An executor service with the name " + name +
|
||||||
" is already running!");
|
" is already running!");
|
||||||
}
|
}
|
||||||
Executor hbes = new Executor(name, maxThreads, this.eventHandlerListeners);
|
Executor hbes = new Executor(name, maxThreads);
|
||||||
if (this.executorMap.putIfAbsent(name, hbes) != null) {
|
if (this.executorMap.putIfAbsent(name, hbes) != null) {
|
||||||
throw new RuntimeException("An executor service with the name " + name +
|
throw new RuntimeException("An executor service with the name " + name +
|
||||||
" is already running (2)!");
|
" is already running (2)!");
|
||||||
|
@ -130,7 +122,7 @@ public class ExecutorService {
|
||||||
String name = type.getExecutorName(this.servername);
|
String name = type.getExecutorName(this.servername);
|
||||||
if (isExecutorServiceRunning(name)) {
|
if (isExecutorServiceRunning(name)) {
|
||||||
LOG.debug("Executor service " + toString() + " already running on " +
|
LOG.debug("Executor service " + toString() + " already running on " +
|
||||||
this.servername);
|
this.servername);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
startExecutorService(name, maxThreads);
|
startExecutorService(name, maxThreads);
|
||||||
|
@ -149,28 +141,6 @@ public class ExecutorService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Subscribe to updates before and after processing instances of
|
|
||||||
* {@link EventType}. Currently only one listener per
|
|
||||||
* event type.
|
|
||||||
* @param type Type of event we're registering listener for
|
|
||||||
* @param listener The listener to run.
|
|
||||||
*/
|
|
||||||
public void registerListener(final EventType type,
|
|
||||||
final EventHandlerListener listener) {
|
|
||||||
this.eventHandlerListeners.put(type, listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Stop receiving updates before and after processing instances of
|
|
||||||
* {@link EventType}
|
|
||||||
* @param type Type of event we're registering listener for
|
|
||||||
* @return The listener we removed or null if we did not remove it.
|
|
||||||
*/
|
|
||||||
public EventHandlerListener unregisterListener(final EventType type) {
|
|
||||||
return this.eventHandlerListeners.remove(type);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Map<String, ExecutorStatus> getAllExecutorStatuses() {
|
public Map<String, ExecutorStatus> getAllExecutorStatuses() {
|
||||||
Map<String, ExecutorStatus> ret = Maps.newHashMap();
|
Map<String, ExecutorStatus> ret = Maps.newHashMap();
|
||||||
for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
|
for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
|
||||||
|
@ -190,15 +160,12 @@ public class ExecutorService {
|
||||||
// work queue to use - unbounded queue
|
// work queue to use - unbounded queue
|
||||||
final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
|
final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
|
||||||
private final String name;
|
private final String name;
|
||||||
private final Map<EventType, EventHandlerListener> eventHandlerListeners;
|
|
||||||
private static final AtomicLong seqids = new AtomicLong(0);
|
private static final AtomicLong seqids = new AtomicLong(0);
|
||||||
private final long id;
|
private final long id;
|
||||||
|
|
||||||
protected Executor(String name, int maxThreads,
|
protected Executor(String name, int maxThreads) {
|
||||||
final Map<EventType, EventHandlerListener> eventHandlerListeners) {
|
|
||||||
this.id = seqids.incrementAndGet();
|
this.id = seqids.incrementAndGet();
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.eventHandlerListeners = eventHandlerListeners;
|
|
||||||
// create the thread pool executor
|
// create the thread pool executor
|
||||||
this.threadPoolExecutor = new TrackingThreadPoolExecutor(
|
this.threadPoolExecutor = new TrackingThreadPoolExecutor(
|
||||||
maxThreads, maxThreads,
|
maxThreads, maxThreads,
|
||||||
|
@ -216,11 +183,6 @@ public class ExecutorService {
|
||||||
void submit(final EventHandler event) {
|
void submit(final EventHandler event) {
|
||||||
// If there is a listener for this type, make sure we call the before
|
// If there is a listener for this type, make sure we call the before
|
||||||
// and after process methods.
|
// and after process methods.
|
||||||
EventHandlerListener listener =
|
|
||||||
this.eventHandlerListeners.get(event.getEventType());
|
|
||||||
if (listener != null) {
|
|
||||||
event.setListener(listener);
|
|
||||||
}
|
|
||||||
this.threadPoolExecutor.execute(event);
|
this.threadPoolExecutor.execute(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -172,7 +172,7 @@ import com.google.protobuf.ServiceException;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
public class MasterRpcServices extends RSRpcServices
|
public class MasterRpcServices extends RSRpcServices
|
||||||
implements MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface {
|
implements MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface {
|
||||||
protected static final Log LOG = LogFactory.getLog(MasterRpcServices.class.getName());
|
protected static final Log LOG = LogFactory.getLog(MasterRpcServices.class.getName());
|
||||||
|
|
||||||
private final HMaster master;
|
private final HMaster master;
|
||||||
|
|
|
@ -560,32 +560,6 @@ public class TestAdmin {
|
||||||
"hbase.online.schema.update.enable", true);
|
"hbase.online.schema.update.enable", true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Listens for when an event is done in Master.
|
|
||||||
*/
|
|
||||||
static class DoneListener implements EventHandler.EventHandlerListener {
|
|
||||||
private final AtomicBoolean done;
|
|
||||||
|
|
||||||
DoneListener(final AtomicBoolean done) {
|
|
||||||
super();
|
|
||||||
this.done = done;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void afterProcess(EventHandler event) {
|
|
||||||
this.done.set(true);
|
|
||||||
synchronized (this.done) {
|
|
||||||
// Wake anyone waiting on this value to change.
|
|
||||||
this.done.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void beforeProcess(EventHandler event) {
|
|
||||||
// continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
protected void verifyRoundRobinDistribution(HTable ht, int expectedRegions) throws IOException {
|
protected void verifyRoundRobinDistribution(HTable ht, int expectedRegions) throws IOException {
|
||||||
int numRS = ht.getConnection().getCurrentNrHRS();
|
int numRS = ht.getConnection().getCurrentNrHRS();
|
||||||
|
|
Loading…
Reference in New Issue