This closes #577

This commit is contained in:
jbertram 2016-06-10 13:33:26 -05:00
commit a624a818c1
12 changed files with 73 additions and 83 deletions

View File

@ -29,6 +29,7 @@ import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.SASLResult;
@ -37,13 +38,13 @@ import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.handler.ProtonHandler;
import org.proton.plug.handler.impl.DefaultEventHandler;
import org.proton.plug.util.ByteUtil;
import org.proton.plug.util.DebugInfo;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
public abstract class AbstractConnectionContext extends ProtonInitializable implements AMQPConnectionContext {
private static final Logger log = Logger.getLogger(AbstractConnectionContext.class);
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
@ -87,8 +88,8 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
@Override
public void inputBuffer(ByteBuf buffer) {
if (DebugInfo.debug) {
ByteUtil.debugFrame("Buffer Received ", buffer);
if (log.isTraceEnabled()) {
ByteUtil.debugFrame(log, "Buffer Received ", buffer);
}
handler.inputBuffer(buffer);

View File

@ -25,6 +25,7 @@ import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.AMQPSessionContext;
import org.proton.plug.exceptions.ActiveMQAMQPException;
@ -37,6 +38,7 @@ import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
*/
public abstract class AbstractProtonSessionContext extends ProtonInitializable implements AMQPSessionContext {
private static final Logger log = Logger.getLogger(AbstractProtonSessionContext.class);
protected final AbstractConnectionContext connection;
protected final AMQPSessionCallback sessionSPI;
@ -119,8 +121,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
protonProducer.close(false);
}
catch (Exception e) {
e.printStackTrace();
// TODO Logging
log.warn(e.getMessage(), e);
}
}
receivers.clear();
@ -133,8 +134,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
protonConsumer.close(false);
}
catch (Exception e) {
e.printStackTrace();
// TODO Logging
log.warn(e.getMessage(), e);
}
}
senders.clear();
@ -145,8 +145,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
}
}
catch (Exception e) {
e.printStackTrace();
// TODO logging
log.warn(e.getMessage(), e);
}
closed = true;
}

View File

@ -29,6 +29,7 @@ import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
@ -41,6 +42,8 @@ import static org.proton.plug.util.DeliveryUtil.readDelivery;
*/
public class ProtonTransactionHandler implements ProtonDeliveryHandler {
private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
final AMQPSessionCallback sessionSPI;
public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) {
@ -97,7 +100,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
}
catch (Exception e) {
e.printStackTrace();
log.warn(e.getMessage(), e);
Rejected rejected = new Rejected();
ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.valueOf("failed"));

View File

@ -24,6 +24,7 @@ import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.context.AbstractConnectionContext;
import org.proton.plug.context.AbstractProtonReceiverContext;
@ -36,6 +37,8 @@ import static org.proton.plug.util.DeliveryUtil.readDelivery;
public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class);
private final int numberOfCredits = 100;
public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
@ -127,7 +130,7 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
}
}
catch (Exception e) {
e.printStackTrace();
log.warn(e.getMessage(), e);
Rejected rejected = new Rejected();
ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.valueOf("failed"));

View File

@ -35,6 +35,7 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.ProtonJMessage;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.context.AbstractConnectionContext;
import org.proton.plug.context.AbstractProtonContextSender;
@ -50,6 +51,8 @@ import static org.proton.plug.AmqpSupport.findFilter;
public class ProtonServerSenderContext extends AbstractProtonContextSender implements ProtonPlugSender {
private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector");
private static final Symbol COPY = Symbol.valueOf("copy");
private static final Symbol TOPIC = Symbol.valueOf("topic");
@ -249,7 +252,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
sessionSPI.closeSender(brokerConsumer);
}
catch (Exception e) {
e.printStackTrace();
log.warn(e.getMessage(), e);
throw new ActiveMQAMQPInternalErrorException(e.getMessage());
}
}
@ -277,7 +280,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
}
}
catch (Exception e) {
e.printStackTrace();
log.warn(e.getMessage(), e);
throw new ActiveMQAMQPInternalErrorException(e.getMessage());
}
}
@ -356,7 +359,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
serverMessage = sessionSPI.encodeMessage(message, deliveryCount);
}
catch (Throwable e) {
e.printStackTrace();
log.warn(e.getMessage(), e);
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}

View File

@ -33,6 +33,7 @@ import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
import org.jboss.logging.Logger;
import org.proton.plug.ClientSASL;
import org.proton.plug.ServerSASL;
import org.proton.plug.handler.EventHandler;
@ -41,13 +42,13 @@ import org.proton.plug.handler.ProtonHandler;
import org.proton.plug.context.ProtonInitializable;
import org.proton.plug.SASLResult;
import org.proton.plug.util.ByteUtil;
import org.proton.plug.util.DebugInfo;
/**
* Clebert Suconic
*/
public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHandler {
private static final Logger log = Logger.getLogger(ProtonHandlerImpl.class);
private final Transport transport = Proton.transport();
@ -177,8 +178,8 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
capacity = transport.capacity();
}
}
catch (Throwable ignored) {
ignored.printStackTrace();
catch (Throwable e) {
log.debug(e.getMessage(), e);
}
receivedFirstPacket = true;
@ -194,10 +195,10 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
}
else {
if (capacity == 0) {
System.out.println("abandoning: " + buffer.readableBytes());
log.debugf("abandoning: readableBytes=%d", buffer.readableBytes());
}
else {
System.out.println("transport closed, discarding: " + buffer.readableBytes() + " capacity = " + transport.capacity());
log.debugf("transport closed, discarding: readableBytes=%d, capacity=%d", buffer.readableBytes(), transport.capacity());
}
break;
}
@ -303,8 +304,8 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
byte[] dataSASL = new byte[serverSasl.pending()];
serverSasl.recv(dataSASL, 0, dataSASL.length);
if (DebugInfo.debug) {
System.out.println("Working on sasl::" + ByteUtil.bytesToHex(dataSASL, 2));
if (log.isTraceEnabled()) {
log.trace("Working on sasl::" + ByteUtil.bytesToHex(dataSASL, 2));
}
saslResult = mechanism.processSASL(dataSASL);
@ -355,15 +356,14 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
// while a client is also trying to write here
while ((ev = popEvent()) != null) {
for (EventHandler h : handlers) {
if (DebugInfo.debug) {
System.out.println("Handling " + ev + " towards " + h);
if (log.isTraceEnabled()) {
log.trace("Handling " + ev + " towards " + h);
}
try {
Events.dispatch(ev, h);
}
catch (Exception e) {
// TODO: logs
e.printStackTrace();
log.warn(e.getMessage(), e);
connection.setCondition(new ErrorCondition());
}
}
@ -374,8 +374,7 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
h.onTransport(transport);
}
catch (Exception e) {
// TODO: logs
e.printStackTrace();
log.warn(e.getMessage(), e);
connection.setCondition(new ErrorCondition());
}
}

View File

@ -18,23 +18,26 @@ package org.proton.plug.util;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.jboss.logging.Logger;
public class ByteUtil {
public static void debugFrame(String message, ByteBuf byteIn) {
int location = byteIn.readerIndex();
// debugging
byte[] frame = new byte[byteIn.writerIndex()];
byteIn.readBytes(frame);
public static void debugFrame(Logger logger, String message, ByteBuf byteIn) {
if (logger.isTraceEnabled()) {
int location = byteIn.readerIndex();
// debugging
byte[] frame = new byte[byteIn.writerIndex()];
byteIn.readBytes(frame);
try {
System.out.println(message + "\n" + ByteUtil.formatGroup(ByteUtil.bytesToHex(frame), 8, 16));
}
catch (Exception e) {
e.printStackTrace();
}
try {
logger.trace(message + "\n" + ByteUtil.formatGroup(ByteUtil.bytesToHex(frame), 8, 16));
}
catch (Exception e) {
logger.warn(e.getMessage(), e);
}
byteIn.readerIndex(location);
byteIn.readerIndex(location);
}
}
public static String formatGroup(String str, int groupSize, int lineBreak) {

View File

@ -1,22 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.proton.plug.util;
public class DebugInfo {
public static final boolean debug = false;
}

View File

@ -26,11 +26,14 @@ import javax.jms.Session;
import java.lang.ref.WeakReference;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.jboss.logging.Logger;
import org.proton.plug.test.minimalserver.DumbServer;
import org.proton.plug.test.minimalserver.MinimalServer;
public class AbstractJMSTest {
private static final Logger log = Logger.getLogger(AbstractJMSTest.class);
protected final boolean useSASL;
protected String address = "exampleQueue";
@ -66,7 +69,7 @@ public class AbstractJMSTest {
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
log.warn(exception.getMessage(), exception);
}
});
connection.start();

View File

@ -21,6 +21,7 @@ import java.util.concurrent.Executors;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPSessionCallback;
@ -30,10 +31,10 @@ import org.proton.plug.sasl.AnonymousServerSASL;
import org.proton.plug.sasl.ServerSASLPlain;
import org.proton.plug.test.minimalserver.MinimalSessionSPI;
import org.proton.plug.util.ByteUtil;
import org.proton.plug.util.DebugInfo;
public class ProtonINVMSPI implements AMQPConnectionCallback {
private static final Logger log = Logger.getLogger(ProtonINVMSPI.class);
AMQPConnectionContext returningConnection;
ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
@ -69,8 +70,8 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
@Override
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
if (DebugInfo.debug) {
ByteUtil.debugFrame("InVM->", bytes);
if (log.isTraceEnabled()) {
ByteUtil.debugFrame(log, "InVM->", bytes);
}
final int size = bytes.writerIndex();
@ -79,15 +80,15 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
@Override
public void run() {
try {
if (DebugInfo.debug) {
ByteUtil.debugFrame("InVMDone->", bytes);
if (log.isTraceEnabled()) {
ByteUtil.debugFrame(log, "InVMDone->", bytes);
}
serverConnection.inputBuffer(bytes);
try {
connection.outputDone(size);
}
catch (Exception e) {
e.printStackTrace();
log.warn(e.getMessage(), e);
}
}
finally {
@ -128,9 +129,7 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
final int size = bytes.writerIndex();
if (DebugInfo.debug) {
ByteUtil.debugFrame("InVM<-", bytes);
}
ByteUtil.debugFrame(log, "InVM<-", bytes);
bytes.retain();
returningExecutor.execute(new Runnable() {
@ -138,16 +137,14 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
public void run() {
try {
if (DebugInfo.debug) {
ByteUtil.debugFrame("InVM done<-", bytes);
}
ByteUtil.debugFrame(log, "InVM done<-", bytes);
returningConnection.inputBuffer(bytes);
try {
connection.outputDone(size);
}
catch (Exception e) {
e.printStackTrace();
log.warn(e.getMessage(), e);
}
}

View File

@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPSessionCallback;
@ -29,11 +30,11 @@ import org.proton.plug.ServerSASL;
import org.proton.plug.sasl.AnonymousServerSASL;
import org.proton.plug.sasl.ServerSASLPlain;
import org.proton.plug.util.ByteUtil;
import org.proton.plug.util.DebugInfo;
import org.proton.plug.util.ReusableLatch;
public class AMQPClientSPI implements AMQPConnectionCallback {
private static final Logger log = Logger.getLogger(AMQPClientSPI.class);
final Channel channel;
protected AMQPConnectionContext connection;
@ -65,8 +66,8 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
@Override
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
if (DebugInfo.debug) {
ByteUtil.debugFrame("Bytes leaving client", bytes);
if (log.isTraceEnabled()) {
ByteUtil.debugFrame(log, "Bytes leaving client", bytes);
}
final int bufferSize = bytes.writerIndex();
@ -85,12 +86,11 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
if (connection.isSyncOnFlush()) {
try {
if (!latch.await(5, TimeUnit.SECONDS)) {
// TODO logs
System.err.println("Flush took longer than 5 seconds!!!");
log.debug("Flush took longer than 5 seconds!!!");
}
}
catch (Throwable e) {
e.printStackTrace();
log.warn(e.getMessage(), e);
}
}

View File

@ -25,6 +25,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPSessionCallback;
@ -32,11 +33,11 @@ import org.proton.plug.ServerSASL;
import org.proton.plug.sasl.AnonymousServerSASL;
import org.proton.plug.sasl.ServerSASLPlain;
import org.proton.plug.util.ByteUtil;
import org.proton.plug.util.DebugInfo;
import org.proton.plug.util.ReusableLatch;
public class MinimalConnectionSPI implements AMQPConnectionCallback {
private static final Logger logger = Logger.getLogger(MinimalConnectionSPI.class);
Channel channel;
private AMQPConnectionContext connection;
@ -73,7 +74,7 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
final int bufferSize = bytes.writerIndex();
if (DebugInfo.debug) {
if (logger.isTraceEnabled()) {
// some debug
byte[] frame = new byte[bytes.writerIndex()];
int readerOriginalPos = bytes.readerIndex();