ARTEMIS-1056 Better event processing

This commit is contained in:
Clebert Suconic 2017-03-27 17:45:01 -04:00 committed by Justin Bertram
parent feadb6338c
commit dce59d5436
1 changed files with 16 additions and 25 deletions

View File

@ -164,7 +164,7 @@ public class ProtonHandler extends ProtonInitializable {
capacity = transport.capacity(); capacity = transport.capacity();
} }
} catch (Throwable e) { } catch (Throwable e) {
log.debug(e.getMessage(), e); log.warn(e.getMessage(), e);
} }
receivedFirstPacket = true; receivedFirstPacket = true;
@ -296,20 +296,6 @@ public class ProtonHandler extends ProtonInitializable {
} }
} }
private Event popEvent() {
synchronized (lock) {
Event ev = collector.peek();
if (ev != null) {
// pop will invalidate the event
// for that reason we make a new one
// Events are reused inside the collector, so we need to make a new one here
ev = ev.copy();
collector.pop();
}
return ev;
}
}
private void dispatchAuth(boolean sasl) { private void dispatchAuth(boolean sasl) {
for (EventHandler h : handlers) { for (EventHandler h : handlers) {
h.onAuthInit(this, getConnection(), sasl); h.onAuthInit(this, getConnection(), sasl);
@ -322,17 +308,22 @@ public class ProtonHandler extends ProtonInitializable {
// because we could have a distributed deadlock // because we could have a distributed deadlock
// while processing events (for instance onTransport) // while processing events (for instance onTransport)
// while a client is also trying to write here // while a client is also trying to write here
while ((ev = popEvent()) != null) {
for (EventHandler h : handlers) { synchronized (lock) {
if (log.isTraceEnabled()) { while ((ev = collector.peek()) != null) {
log.trace("Handling " + ev + " towards " + h); for (EventHandler h : handlers) {
} if (log.isTraceEnabled()) {
try { log.trace("Handling " + ev + " towards " + h);
Events.dispatch(ev, h); }
} catch (Exception e) { try {
log.warn(e.getMessage(), e); Events.dispatch(ev, h);
connection.setCondition(new ErrorCondition()); } catch (Exception e) {
log.warn(e.getMessage(), e);
connection.setCondition(new ErrorCondition());
}
} }
collector.pop();
} }
} }