mirror of https://github.com/apache/activemq.git
Fixing up activemq-amqp build a bit.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1408917 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
478927e6b8
commit
11e0df8e04
|
@ -35,10 +35,13 @@
|
|||
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-core</artifactId>
|
||||
<scope>provided</scope>
|
||||
<artifactId>activemq-broker</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.qpid</groupId>
|
||||
<artifactId>qpid-proton</artifactId>
|
||||
|
@ -75,15 +78,20 @@
|
|||
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-core</artifactId>
|
||||
<artifactId>activemq-broker</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-console</artifactId>
|
||||
<artifactId>activemq-spring</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!--<dependency>-->
|
||||
<!--<groupId>org.apache.activemq</groupId>-->
|
||||
<!--<artifactId>activemq-console</artifactId>-->
|
||||
<!--<scope>test</scope>-->
|
||||
<!--</dependency>-->
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
|
|
|
@ -436,8 +436,6 @@ class AmqpProtocolConverter {
|
|||
Receiver receiver = ((Receiver)delivery.getLink());
|
||||
if( !delivery.isReadable() ) {
|
||||
System.out.println("it was not readable!");
|
||||
// delivery.settle();
|
||||
// receiver.advance();
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -457,8 +455,6 @@ class AmqpProtocolConverter {
|
|||
}
|
||||
|
||||
receiver.advance();
|
||||
delivery.settle();
|
||||
|
||||
Buffer buffer = current.toBuffer();
|
||||
current = null;
|
||||
onMessage(receiver, delivery, buffer);
|
||||
|
@ -478,7 +474,7 @@ class AmqpProtocolConverter {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception {
|
||||
protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
|
||||
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
|
||||
final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
|
||||
current = null;
|
||||
|
@ -498,9 +494,29 @@ class AmqpProtocolConverter {
|
|||
message.setTransactionId(new LocalTransactionId(connectionId, txid));
|
||||
}
|
||||
|
||||
ResponseHandler handler = null;
|
||||
if( delivery.remotelySettled() ) {
|
||||
delivery.settle();
|
||||
} else {
|
||||
handler = new ResponseHandler() {
|
||||
@Override
|
||||
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
|
||||
if( response.isException() ) {
|
||||
ExceptionResponse er = (ExceptionResponse)response;
|
||||
Rejected rejected = new Rejected();
|
||||
ArrayList errors = new ArrayList();
|
||||
errors.add(er.getException().getMessage());
|
||||
rejected.setError(errors);
|
||||
delivery.disposition(rejected);
|
||||
}
|
||||
delivery.settle();
|
||||
pumpProtonToSocket();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
message.onSend();
|
||||
// sendToActiveMQ(message, createResponseHandler(command));
|
||||
sendToActiveMQ(message, null);
|
||||
sendToActiveMQ(message, handler);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -584,12 +600,10 @@ class AmqpProtocolConverter {
|
|||
pumpProtonToSocket();
|
||||
}
|
||||
});
|
||||
receiver.advance();
|
||||
|
||||
} else {
|
||||
throw new Exception("Expected coordinator message type: "+action.getClass());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
};
|
||||
|
|
|
@ -1,28 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.leveldb
|
||||
|
||||
import org.apache.activemq.console.Main
|
||||
import scala.Array
|
||||
import scala.Predef._
|
||||
|
||||
object IDERunner {
|
||||
def main(args:Array[String]) ={
|
||||
Main.main(args)
|
||||
}
|
||||
}
|
|
@ -1194,5 +1194,14 @@
|
|||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
<profile>
|
||||
<id>unstable</id>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-amqp</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
||||
|
|
|
@ -16,9 +16,12 @@
|
|||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.security.ProtectionDomain;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -65,6 +68,16 @@ public abstract class CombinationTestSupport extends AutoFailTestSupport {
|
|||
private HashMap<String, ComboOption> comboOptions = new HashMap<String, ComboOption>();
|
||||
private boolean combosEvaluated;
|
||||
private Map<String, Object> options;
|
||||
protected File basedir;
|
||||
|
||||
static protected File basedir(Class clazz) {
|
||||
try {
|
||||
ProtectionDomain protectionDomain = clazz.getProtectionDomain();
|
||||
return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
|
||||
} catch (IOException e) {
|
||||
return new File(".");
|
||||
}
|
||||
}
|
||||
|
||||
static class ComboOption {
|
||||
final String attribute;
|
||||
|
@ -76,6 +89,9 @@ public abstract class CombinationTestSupport extends AutoFailTestSupport {
|
|||
}
|
||||
}
|
||||
|
||||
public CombinationTestSupport() {
|
||||
basedir = basedir(getClass());
|
||||
}
|
||||
public void addCombinationValues(String attribute, Object[] options) {
|
||||
ComboOption co = this.comboOptions.get(attribute);
|
||||
if (co == null) {
|
||||
|
|
5
pom.xml
5
pom.xml
|
@ -245,6 +245,11 @@
|
|||
<!-- =============================== -->
|
||||
<!-- Internal ActiveMQ Dependencies -->
|
||||
<!-- =============================== -->
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-amqp</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-http</artifactId>
|
||||
|
|
Loading…
Reference in New Issue