diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml index 2ff108ec91..c4f04620aa 100644 --- a/activemq-amqp/pom.xml +++ b/activemq-amqp/pom.xml @@ -35,10 +35,13 @@ org.apache.activemq - activemq-core - provided + activemq-broker - + + org.slf4j + slf4j-api + + org.apache.qpid qpid-proton @@ -75,15 +78,20 @@ org.apache.activemq - activemq-core + activemq-broker test-jar test org.apache.activemq - activemq-console + activemq-spring test + + + + + junit diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index fd8e0964e7..2d6012b2d5 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -436,8 +436,6 @@ class AmqpProtocolConverter { Receiver receiver = ((Receiver)delivery.getLink()); if( !delivery.isReadable() ) { System.out.println("it was not readable!"); -// delivery.settle(); -// receiver.advance(); return; } @@ -457,8 +455,6 @@ class AmqpProtocolConverter { } receiver.advance(); - delivery.settle(); - Buffer buffer = current.toBuffer(); current = null; onMessage(receiver, delivery, buffer); @@ -478,7 +474,7 @@ class AmqpProtocolConverter { } @Override - protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception { + protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception { EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length); final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em); current = null; @@ -498,9 +494,29 @@ class AmqpProtocolConverter { message.setTransactionId(new LocalTransactionId(connectionId, txid)); } + ResponseHandler handler = null; + if( delivery.remotelySettled() ) { + delivery.settle(); + } else { + handler = new ResponseHandler() { + @Override + public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { + if( response.isException() ) { + ExceptionResponse er = (ExceptionResponse)response; + Rejected rejected = new Rejected(); + ArrayList errors = new ArrayList(); + errors.add(er.getException().getMessage()); + rejected.setError(errors); + delivery.disposition(rejected); + } + delivery.settle(); + pumpProtonToSocket(); + } + }; + } + message.onSend(); -// sendToActiveMQ(message, createResponseHandler(command)); - sendToActiveMQ(message, null); + sendToActiveMQ(message, handler); } } @@ -584,12 +600,10 @@ class AmqpProtocolConverter { pumpProtonToSocket(); } }); - receiver.advance(); } else { throw new Exception("Expected coordinator message type: "+action.getClass()); } - } }; diff --git a/activemq-console/src/test/java/org/apache/activemq/leveldb/IDERunner.scala b/activemq-console/src/test/java/org/apache/activemq/leveldb/IDERunner.scala deleted file mode 100644 index 28ac8456e0..0000000000 --- a/activemq-console/src/test/java/org/apache/activemq/leveldb/IDERunner.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.leveldb - -import org.apache.activemq.console.Main -import scala.Array -import scala.Predef._ - -object IDERunner { - def main(args:Array[String]) ={ - Main.main(args) - } -} \ No newline at end of file diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml index 8ea7d21cde..430b21bdfc 100755 --- a/activemq-core/pom.xml +++ b/activemq-core/pom.xml @@ -1194,5 +1194,14 @@ + + unstable + + + org.apache.activemq + activemq-amqp + + + diff --git a/activemq-core/src/test/java/org/apache/activemq/CombinationTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/CombinationTestSupport.java index d4155a7670..7e53660c1a 100755 --- a/activemq-core/src/test/java/org/apache/activemq/CombinationTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/CombinationTestSupport.java @@ -16,9 +16,12 @@ */ package org.apache.activemq; +import java.io.File; +import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.security.ProtectionDomain; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -65,6 +68,16 @@ public abstract class CombinationTestSupport extends AutoFailTestSupport { private HashMap comboOptions = new HashMap(); private boolean combosEvaluated; private Map options; + protected File basedir; + + static protected File basedir(Class clazz) { + try { + ProtectionDomain protectionDomain = clazz.getProtectionDomain(); + return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile(); + } catch (IOException e) { + return new File("."); + } + } static class ComboOption { final String attribute; @@ -76,6 +89,9 @@ public abstract class CombinationTestSupport extends AutoFailTestSupport { } } + public CombinationTestSupport() { + basedir = basedir(getClass()); + } public void addCombinationValues(String attribute, Object[] options) { ComboOption co = this.comboOptions.get(attribute); if (co == null) { diff --git a/pom.xml b/pom.xml index 6d6d7f2d32..84c4e0673c 100755 --- a/pom.xml +++ b/pom.xml @@ -245,6 +245,11 @@ + + org.apache.activemq + activemq-amqp + ${project.version} + org.apache.activemq activemq-http