Updates to proton 0.5
This commit is contained in:
Timothy Bish 2013-09-04 10:26:44 -04:00
parent 74b35bc5dd
commit ebe54c46b3
5 changed files with 284 additions and 17 deletions

View File

@ -63,6 +63,11 @@
<artifactId>activemq-kahadb-store</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-jaas</artifactId>
<scope>test</scope>
</dependency>
<!-- Joram JMS conformance tests -->
<dependency>

View File

@ -72,19 +72,22 @@ import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointError;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.EngineFactory;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.LinkImpl;
import org.apache.qpid.proton.engine.impl.EngineFactoryImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.framing.TransportFrame;
@ -120,8 +123,9 @@ class AmqpProtocolConverter {
int prefetch = 100;
ReentrantLock lock = new ReentrantLock();
TransportImpl protonTransport = new TransportImpl();
ConnectionImpl protonConnection = new ConnectionImpl();
EngineFactory engineFactory = new EngineFactoryImpl();
Transport protonTransport = engineFactory.createTransport();
Connection protonConnection = engineFactory.createConnection();
public AmqpProtocolConverter(AmqpTransport transport, BrokerContext brokerContext) {
this.amqpTransport = transport;
@ -131,7 +135,7 @@ class AmqpProtocolConverter {
void updateTracer() {
if (amqpTransport.isTrace()) {
this.protonTransport.setProtocolTracer(new ProtocolTracer() {
((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
@Override
public void receivedFrame(TransportFrame transportFrame) {
if (TRACE_FRAMES.isTraceEnabled()) {
@ -415,9 +419,7 @@ class AmqpProtocolConverter {
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
// TODO: figure out how to close /w an error.
// protonConnection.setLocalError(new EndpointError(exception.getClass().getName(),
// exception.getMessage()));
protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
protonConnection.close();
pumpProtonToSocket();
amqpTransport.onException(IOExceptionSupport.create(exception));
@ -729,7 +731,7 @@ class AmqpProtocolConverter {
if (response.isException()) {
receiver.setTarget(null);
Throwable exception = ((ExceptionResponse) response).getException();
((LinkImpl) receiver).setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
receiver.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
receiver.close();
} else {
receiver.open();
@ -740,7 +742,7 @@ class AmqpProtocolConverter {
}
} catch (AmqpProtocolException exception) {
receiver.setTarget(null);
((LinkImpl) receiver).setLocalError(new EndpointError(exception.getSymbolicName(), exception.getMessage()));
receiver.setCondition(new ErrorCondition(Symbol.getSymbol(exception.getSymbolicName()), exception.getMessage()));
receiver.close();
}
}
@ -840,6 +842,7 @@ class AmqpProtocolConverter {
Buffer currentBuffer;
Delivery currentDelivery;
final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
public void pumpOutbound() throws Exception {
while (!closed) {
@ -868,6 +871,12 @@ class AmqpProtocolConverter {
final MessageDispatch md = outbound.removeFirst();
try {
if (md.getMessage() != null) {
org.apache.activemq.command.Message message = md.getMessage();
if (!message.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
message.setProperty(MESSAGE_FORMAT_KEY, 0);
}
}
final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
if (jms == null) {
// It's the end of browse signal.
@ -1102,7 +1111,7 @@ class AmqpProtocolConverter {
SelectorParser.parse(selector);
} catch (InvalidSelectorException e) {
sender.setSource(null);
((LinkImpl) sender).setLocalError(new EndpointError("amqp:invalid-field", e.getMessage()));
sender.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
sender.close();
consumerContext.closed = true;
return;
@ -1133,7 +1142,7 @@ class AmqpProtocolConverter {
sender.setSource(null);
Throwable exception = ((ExceptionResponse) response).getException();
String name = exception.getClass().getName();
((LinkImpl) sender).setLocalError(new EndpointError(name, exception.getMessage()));
sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
}
sender.open();
pumpProtonToSocket();
@ -1185,11 +1194,11 @@ class AmqpProtocolConverter {
if (response.isException()) {
sender.setSource(null);
Throwable exception = ((ExceptionResponse) response).getException();
String name = exception.getClass().getName();
Symbol condition = AmqpError.INTERNAL_ERROR;
if (exception instanceof InvalidSelectorException) {
name = "amqp:invalid-field";
condition = AmqpError.INVALID_FIELD;
}
((LinkImpl) sender).setLocalError(new EndpointError(name, exception.getMessage()));
sender.setCondition(new ErrorCondition(condition, exception.getMessage()));
subscriptionsByConsumerId.remove(id);
sender.close();
} else {
@ -1201,7 +1210,7 @@ class AmqpProtocolConverter {
});
} catch (AmqpProtocolException e) {
sender.setSource(null);
((LinkImpl) sender).setLocalError(new EndpointError(e.getSymbolicName(), e.getMessage()));
sender.setCondition(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage()));
sender.close();
}
}

View File

@ -0,0 +1,168 @@
package org.apache.activemq.transport.amqp;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.net.URI;
import static org.junit.Assert.*;
/**
* @author Kevin Earls
*/
public class SimpleAMQPAuthTest {
public static final String SIMPLE_AUTH_AMQP_BROKER_XML = "org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml";
public BrokerService brokerService;
protected static final Logger LOG = LoggerFactory.getLogger(SimpleAMQPAuthTest.class);
protected int port = 5672;
@Before
public void setUp() throws Exception {
startBroker();
}
@After
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService = null;
}
}
@Test(timeout = 10000)
public void testNoUserOrPassword() throws Exception {
try {
ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "", "");
Connection connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
LOG.error("Unexpected exception ", exception);
exception.printStackTrace();
}
});
connection.start();
Thread.sleep(1000);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
fail("Expected JMSException");
} catch (JMSException e) {
Exception linkedException = e.getLinkedException();
if (linkedException != null && linkedException instanceof ConnectionClosedException) {
ConnectionClosedException cce = (ConnectionClosedException) linkedException;
assertEquals("Error{condition=unauthorized-access,description=User name [null] or password is invalid.}", cce.getRemoteError().toString());
} else {
LOG.error("Unexpected Exception", e);
fail("Unexpected exception: " + e.getMessage());
}
}
}
@Test(timeout = 10000)
public void testUnknownUser() throws Exception {
try {
ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
Connection connection = factory.createConnection("nosuchuser", "blah");
connection.start();
Thread.sleep(500);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
fail("Expected JMSException");
} catch (JMSException e) {
Exception linkedException = e.getLinkedException();
if (linkedException != null && linkedException instanceof ConnectionClosedException) {
ConnectionClosedException cce = (ConnectionClosedException) linkedException;
assertEquals("Error{condition=unauthorized-access,description=User name [nosuchuser] or password is invalid.}", cce.getRemoteError().toString());
} else {
LOG.error("Unexpected Exception", e);
fail("Unexpected exception: " + e.getMessage());
}
}
}
@Test(timeout = 10000)
public void testKnownUserWrongPassword() throws Exception {
try {
ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
Connection connection = factory.createConnection("user", "wrongPassword");
connection.start();
Thread.sleep(500);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
fail("Expected JMSException");
} catch (JMSException e) {
Exception linkedException = e.getLinkedException();
if (linkedException != null && linkedException instanceof ConnectionClosedException) {
ConnectionClosedException cce = (ConnectionClosedException) linkedException;
assertEquals("Error{condition=unauthorized-access,description=User name [user] or password is invalid.}", cce.getRemoteError().toString());
} else {
LOG.error("Unexpected Exception", e);
fail("Unexpected exception: " + e.getMessage());
}
}
}
@Test(timeout = 30000)
public void testSendReceive() throws Exception {
ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
Connection connection = factory.createConnection("user", "userPassword");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueImpl queue = new QueueImpl("queue://txqueue");
MessageProducer p = session.createProducer(queue);
TextMessage message = null;
message = session.createTextMessage();
String messageText = "hello sent at " + new java.util.Date().toString();
message.setText(messageText);
p.send(message);
// Get the message we just sent
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
Message msg = consumer.receive(5000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
TextMessage textMessage = (TextMessage) msg;
assertEquals(messageText, textMessage.getText());
connection.close();
}
protected BrokerService createBroker() throws Exception {
return createBroker(SIMPLE_AUTH_AMQP_BROKER_XML);
}
protected BrokerService createBroker(String uri) throws Exception {
LOG.debug(">>>>> Loading broker configuration from the classpath with URI: " + uri);
return BrokerFactory.createBroker(new URI("xbean:" + uri));
}
public void startBroker() throws Exception {
brokerService = createBroker();
brokerService.start();
brokerService.waitUntilStarted();
}
}

View File

@ -0,0 +1,85 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker useJmx="true" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true" schedulePeriodForDestinationPurge="2000">
<destinations>
<queue physicalName="TEST.Q" />
</destinations>
<!-- Use a non-default port in case the default port is in use -->
<managementContext>
<managementContext connectorPort="1199"/>
</managementContext>
<transportConnectors>
<transportConnector name="openwire" uri="vm://localhost" />
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/>
</transportConnectors>
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="system" password="systemPassword" groups="users,admins"/>
<authenticationUser username="user" password="userPassword" groups="users"/>
<authenticationUser username="guest" password="guestPassword" groups="guests"/>
</users>
</simpleAuthenticationPlugin>
<!-- lets configure a destination based authorization mechanism -->
<!--
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry queue="TEST.Q" read="guests" write="guests" />
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
</authorizationEntries>
<tempDestinationAuthorizationEntry>
<tempDestinationAuthorizationEntry read="admins" write="admins" admin="admins"/>
</tempDestinationAuthorizationEntry>
</authorizationMap>
</map>
</authorizationPlugin>
-->
</plugins>
</broker>
</beans>

View File

@ -97,7 +97,7 @@
<p2psockets-version>1.1.2</p2psockets-version>
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
<zookeeper-version>3.4.3</zookeeper-version>
<qpid-proton-version>0.3.0-fuse-4</qpid-proton-version>
<qpid-proton-version>0.5</qpid-proton-version>
<qpid-jms-version>0.22</qpid-jms-version>
<regexp-version>1.3</regexp-version>
<rome-version>1.0</rome-version>