This commit is contained in:
Justin Bertram 2017-03-27 22:01:36 -05:00
commit 5843354168
1 changed files with 16 additions and 25 deletions

View File

@ -164,7 +164,7 @@ public class ProtonHandler extends ProtonInitializable {
capacity = transport.capacity(); capacity = transport.capacity();
} }
} catch (Throwable e) { } catch (Throwable e) {
log.debug(e.getMessage(), e); log.warn(e.getMessage(), e);
} }
receivedFirstPacket = true; receivedFirstPacket = true;
@ -296,20 +296,6 @@ public class ProtonHandler extends ProtonInitializable {
} }
} }
private Event popEvent() {
synchronized (lock) {
Event ev = collector.peek();
if (ev != null) {
// pop will invalidate the event
// for that reason we make a new one
// Events are reused inside the collector, so we need to make a new one here
ev = ev.copy();
collector.pop();
}
return ev;
}
}
private void dispatchAuth(boolean sasl) { private void dispatchAuth(boolean sasl) {
for (EventHandler h : handlers) { for (EventHandler h : handlers) {
h.onAuthInit(this, getConnection(), sasl); h.onAuthInit(this, getConnection(), sasl);
@ -322,7 +308,9 @@ public class ProtonHandler extends ProtonInitializable {
// because we could have a distributed deadlock // because we could have a distributed deadlock
// while processing events (for instance onTransport) // while processing events (for instance onTransport)
// while a client is also trying to write here // while a client is also trying to write here
while ((ev = popEvent()) != null) {
synchronized (lock) {
while ((ev = collector.peek()) != null) {
for (EventHandler h : handlers) { for (EventHandler h : handlers) {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.trace("Handling " + ev + " towards " + h); log.trace("Handling " + ev + " towards " + h);
@ -334,6 +322,9 @@ public class ProtonHandler extends ProtonInitializable {
connection.setCondition(new ErrorCondition()); connection.setCondition(new ErrorCondition());
} }
} }
collector.pop();
}
} }
for (EventHandler h : handlers) { for (EventHandler h : handlers) {