This closes #469

This commit is contained in:
Clebert Suconic 2016-04-20 12:33:02 -04:00
commit 1f586eaecb
17 changed files with 220 additions and 260 deletions

View File

@ -71,7 +71,6 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@ -151,8 +150,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private ConnectionState state;
private final Set<ActiveMQDestination> tempQueues = new ConcurrentHashSet<>();
/**
* Openwire doesn't sen transactions associated with any sessions.
* It will however send beingTX / endTX as it would be doing it with XA Transactions.
@ -289,7 +286,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
response.setCorrelationId(commandId);
dispatchSync(response);
}
}
}
catch (Exception e) {
@ -512,10 +508,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return this.wireFormat;
}
public void registerTempQueue(ActiveMQDestination queue) {
tempQueues.add(queue);
}
private void shutdown(boolean fail) {
if (fail) {
transportConnection.forceClose();
@ -692,19 +684,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
SimpleString qName = OpenWireUtil.toCoreAddress(dest);
QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
if (binding == null) {
if (getState().getInfo() != null) {
if (dest.isTemporary()) {
internalSession.createQueue(qName, qName, null, dest.isTemporary(), false);
}
else {
ConnectionInfo connInfo = getState().getInfo();
CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
server.getSecurityStore().check(qName, checkType, this);
server.checkQueueCreationLimit(getUsername());
}
ConnectionInfo connInfo = getState().getInfo();
server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
}
server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, false);
if (dest.isTemporary()) {
registerTempQueue(dest);
}
}
}
@ -1407,7 +1397,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
if (transaction == null) {
throw new IllegalStateException("cannot find transactionInfo::" + txID + " xid=" + xid);
return null;
}
if (session != null && transaction.getProtocolData() != session) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import javax.jms.InvalidDestinationException;
import javax.jms.ResourceAllocationException;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
@ -24,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
@ -361,7 +363,11 @@ public class AMQSession implements SessionCallback {
connection.getTransportConnection().setAutoRead(false);
}
getCoreSession().send(coreMsg, false);
RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
}
if (runToUse != null) {
// if the timeout is >0, it will wait this much milliseconds

View File

@ -68,26 +68,26 @@ public interface PostOffice extends ActiveMQComponent {
Map<SimpleString, Binding> getAllBindings();
void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception;
RoutingStatus route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception;
void route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct) throws Exception;
RoutingStatus route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct) throws Exception;
void route(ServerMessage message,
RoutingStatus route(ServerMessage message,
QueueCreator queueCreator,
Transaction tx,
boolean direct,
boolean rejectDuplicates) throws Exception;
void route(ServerMessage message,
QueueCreator queueCreator,
RoutingContext context,
boolean direct) throws Exception;
RoutingStatus route(ServerMessage message,
QueueCreator queueCreator,
RoutingContext context,
boolean direct) throws Exception;
void route(ServerMessage message,
QueueCreator queueCreator,
RoutingContext context,
boolean direct,
boolean rejectDuplicates) throws Exception;
RoutingStatus route(ServerMessage message,
QueueCreator queueCreator,
RoutingContext context,
boolean direct,
boolean rejectDuplicates) throws Exception;
MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception;

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice;
/**
* Used to indicate the result of a server send
*/
public enum RoutingStatus {
OK,
NO_BINDINGS,
NO_BINDINGS_DLA,
DUPLICATED_ID;
}

View File

@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueInfo;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -570,41 +571,42 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
@Override
public void route(final ServerMessage message, QueueCreator queueCreator, final boolean direct) throws Exception {
route(message, queueCreator, (Transaction) null, direct);
public RoutingStatus route(final ServerMessage message, QueueCreator queueCreator, final boolean direct) throws Exception {
return route(message, queueCreator, (Transaction) null, direct);
}
@Override
public void route(final ServerMessage message,
public RoutingStatus route(final ServerMessage message,
QueueCreator queueCreator,
final Transaction tx,
final boolean direct) throws Exception {
route(message, queueCreator, new RoutingContextImpl(tx), direct);
return route(message, queueCreator, new RoutingContextImpl(tx), direct);
}
@Override
public void route(final ServerMessage message,
public RoutingStatus route(final ServerMessage message,
final QueueCreator queueCreator,
final Transaction tx,
final boolean direct,
final boolean rejectDuplicates) throws Exception {
route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates);
return route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates);
}
@Override
public void route(final ServerMessage message,
final QueueCreator queueCreator,
final RoutingContext context,
final boolean direct) throws Exception {
route(message, queueCreator, context, direct, true);
public RoutingStatus route(final ServerMessage message,
final QueueCreator queueCreator,
final RoutingContext context,
final boolean direct) throws Exception {
return route(message, queueCreator, context, direct, true);
}
@Override
public void route(final ServerMessage message,
final QueueCreator queueCreator,
final RoutingContext context,
final boolean direct,
boolean rejectDuplicates) throws Exception {
public RoutingStatus route(final ServerMessage message,
final QueueCreator queueCreator,
final RoutingContext context,
final boolean direct,
boolean rejectDuplicates) throws Exception {
RoutingStatus result = RoutingStatus.OK;
// Sanity check
if (message.getRefCount() > 0) {
throw new IllegalStateException("Message cannot be routed more than once");
@ -619,7 +621,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
applyExpiryDelay(message, address);
if (!checkDuplicateID(message, context, rejectDuplicates, startedTX)) {
return;
return RoutingStatus.DUPLICATED_ID;
}
if (message.hasInternalProperties()) {
@ -671,6 +673,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
if (dlaAddress == null) {
result = RoutingStatus.NO_BINDINGS;
ActiveMQServerLogger.LOGGER.noDLA(address);
}
else {
@ -679,9 +682,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.setAddress(dlaAddress);
route(message, null, context.getTransaction(), false);
result = RoutingStatus.NO_BINDINGS_DLA;
}
}
else {
result = RoutingStatus.NO_BINDINGS;
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address);
}
@ -709,6 +715,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (startedTX.get()) {
context.getTransaction().commit();
}
return result;
}
// HORNETQ-1029

View File

@ -23,6 +23,7 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@ -132,7 +133,9 @@ public interface ServerSession extends SecurityAuth {
void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean continues) throws Exception;
void send(ServerMessage message, boolean direct) throws Exception;
RoutingStatus send(ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception;
RoutingStatus send(ServerMessage message, boolean direct) throws Exception;
void sendLarge(MessageInternal msg) throws Exception;

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.security.CheckType;
@ -1213,7 +1214,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
public void send(final ServerMessage message, final boolean direct) throws Exception {
public RoutingStatus send(final ServerMessage message, final boolean direct) throws Exception {
return send(message, direct, false);
}
@Override
public RoutingStatus send(final ServerMessage message, final boolean direct, boolean noAutoCreateQueue) throws Exception {
RoutingStatus result = RoutingStatus.OK;
//large message may come from StompSession directly, in which
//case the id header already generated.
if (!message.isLargeMessage()) {
@ -1256,8 +1263,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
handleManagementMessage(message, direct);
}
else {
doSend(message, direct);
result = doSend(message, direct, noAutoCreateQueue);
}
return result;
}
@Override
@ -1281,7 +1289,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
}
doSend(currentLargeMessage, false);
doSend(currentLargeMessage, false, false);
currentLargeMessage = null;
}
@ -1479,7 +1487,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (replyTo != null) {
reply.setAddress(replyTo);
doSend(reply, direct);
doSend(reply, direct, false);
}
}
@ -1535,7 +1543,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
theTx.rollback();
}
protected void doSend(final ServerMessage msg, final boolean direct) throws Exception {
protected RoutingStatus doSend(final ServerMessage msg, final boolean direct, final boolean noAutoCreateQueue) throws Exception {
RoutingStatus result = RoutingStatus.OK;
// check the user has write access to this address.
try {
securityCheck(msg.getAddress(), CheckType.SEND, this);
@ -1554,7 +1563,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
try {
postOffice.route(msg, queueCreator, routingContext, direct);
if (noAutoCreateQueue) {
result = postOffice.route(msg, null, routingContext, direct);
}
else {
result = postOffice.route(msg, queueCreator, routingContext, direct);
}
Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
@ -1569,6 +1583,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
finally {
routingContext.clear();
}
return result;
}
@Override

View File

@ -313,6 +313,19 @@
<artifactId>artemis-openwire-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-stomp-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-stomp-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman</artifactId>
@ -422,6 +435,11 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>${skipActiveMQ5Tests}</skipTests>
<!--
<additionalClasspathElements>
<additionalClasspathElement>src/main/resources/stomp</additionalClasspathElement>
</additionalClasspathElements>
-->
<includes>
<!-- included packages -->
<include>**/org/apache/activemq/*Test.java</include>

View File

@ -84,12 +84,6 @@ public class BrokerService implements Service {
public static final long DEFAULT_START_TIMEOUT = 600000L;
public static boolean disableWrapper = false;
public String SERVER_SIDE_KEYSTORE;
public String KEYSTORE_PASSWORD;
public String SERVER_SIDE_TRUSTSTORE;
public String TRUSTSTORE_PASSWORD;
public String storeType;
private SslContext sslContext;
private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
@ -151,7 +145,6 @@ public class BrokerService implements Service {
tmpfolder = new TemporaryFolder(targetTmp);
tmpfolder.create();
Exception e = new Exception();
e.fillInStackTrace();
startBroker(startAsync);
map.put(broker, e);
}
@ -261,10 +254,6 @@ public class BrokerService implements Service {
}
}
public boolean enableSsl() {
return this.SERVER_SIDE_KEYSTORE != null;
}
//below are methods called directly by tests
//we don't actually implement any of these for now,
//just to make test compile pass.
@ -500,23 +489,11 @@ public class BrokerService implements Service {
public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
this.transportConnectors = transportConnectors;
for (TransportConnector connector : transportConnectors) {
if (connector.getUri().getScheme().equals("ssl")) {
boolean added = this.extraConnectors.add(new ConnectorInfo(connector.getUri().getPort(), true));
if (added) {
System.out.println("added ssl connector " + connector);
}
else {
System.out.println("WARNing! failed to add ssl connector: " + connector);
}
if (sslContext instanceof SpringSslContext) {
this.extraConnectors.add(new ConnectorInfo(connector.getUri(), (SpringSslContext)sslContext));
}
else {
boolean added = this.extraConnectors.add(new ConnectorInfo(connector.getUri().getPort()));
if (added) {
System.out.println("added connector " + connector);
}
else {
System.out.println("WARNing! failed to add connector: " + connector);
}
this.extraConnectors.add(new ConnectorInfo(connector.getUri()));
}
}
}
@ -584,7 +561,12 @@ public class BrokerService implements Service {
connector = new FakeTransportConnector(bindAddress);
this.transportConnectors.add(connector);
this.extraConnectors.add(new ConnectorInfo(bindAddress));
if (sslContext instanceof SpringSslContext) {
this.extraConnectors.add(new ConnectorInfo(bindAddress, (SpringSslContext) sslContext));
}
else {
this.extraConnectors.add(new ConnectorInfo(bindAddress));
}
return connector;
}
@ -738,14 +720,6 @@ public class BrokerService implements Service {
public void setSslContext(SslContext sslContext) {
this.sslContext = sslContext;
if (sslContext instanceof SpringSslContext) {
SpringSslContext springContext = (SpringSslContext)sslContext;
this.SERVER_SIDE_KEYSTORE = springContext.getKeyStore();
this.KEYSTORE_PASSWORD = springContext.getKeyStorePassword();
this.SERVER_SIDE_TRUSTSTORE = springContext.getTrustStore();
this.TRUSTSTORE_PASSWORD = springContext.getTrustStorePassword();
this.storeType = springContext.getKeyStoreType();
}
}
public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
@ -807,48 +781,15 @@ public class BrokerService implements Service {
public URI uri;
public boolean ssl;
public String keyStore;
public String keyStorePassword;
public String keyStoreType;
public String trustStore;
public String trustStorePassword;
public String trustStoreType;
public boolean clientAuth;
public ConnectorInfo(int port) throws URISyntaxException {
this(port, false);
}
public ConnectorInfo(int port, boolean ssl) throws URISyntaxException {
this(port, ssl, false);
}
public ConnectorInfo(int port, boolean ssl, boolean clientAuth) throws URISyntaxException {
this.ssl = ssl;
if (port == 0) {
port = getPseudoRandomPort();
}
String baseUri = "tcp://localhost:" + port + "?protocols=OPENWIRE,CORE";
if (ssl) {
baseUri = baseUri + "&" + TransportConstants.KEYSTORE_PATH_PROP_NAME + "=" + defaultKeyStore + "&"
+ TransportConstants.KEYSTORE_PASSWORD_PROP_NAME + "=" + defaultKeyStorePassword + "&"
+ TransportConstants.KEYSTORE_PROVIDER_PROP_NAME + "=" + defaultKeyStoreType;
if (clientAuth) {
baseUri = baseUri + "&" + TransportConstants.NEED_CLIENT_AUTH_PROP_NAME + "=true" + "&"
+ TransportConstants.TRUSTSTORE_PATH_PROP_NAME + "=" + defaultTrustStore + "&"
+ TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME + "=" + defaultTrustStorePassword + "&"
+ TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME + "=" + defaultTrustStoreType;
}
}
this.uri = new URI(baseUri);
public ConnectorInfo(URI bindAddress) throws URISyntaxException {
this(bindAddress, null);
}
//bindAddress must be Artemis compliant, except
//scheme
public ConnectorInfo(URI bindAddress) throws URISyntaxException {
public ConnectorInfo(URI bindAddress, SpringSslContext context) throws URISyntaxException {
Integer port = bindAddress.getPort();
String host = bindAddress.getHost();
@ -870,16 +811,16 @@ public class BrokerService implements Service {
host, port, bindAddress.getPath(), bindAddress.getQuery(), bindAddress.getFragment());
}
else {
String baseUri = "tcp://" + host + ":" + port + "?protocols=OPENWIRE,CORE&"
String baseUri = "tcp://" + host + ":" + port + "?"
+ TransportConstants.SSL_ENABLED_PROP_NAME + "=true&"
+ TransportConstants.KEYSTORE_PATH_PROP_NAME + "=" + defaultKeyStore + "&"
+ TransportConstants.KEYSTORE_PASSWORD_PROP_NAME + "=" + defaultKeyStorePassword + "&"
+ TransportConstants.KEYSTORE_PROVIDER_PROP_NAME + "=" + defaultKeyStoreType;
+ TransportConstants.KEYSTORE_PATH_PROP_NAME + "=" + (context == null ? defaultKeyStore : context.getKeyStore()) + "&"
+ TransportConstants.KEYSTORE_PASSWORD_PROP_NAME + "=" + (context == null ? defaultKeyStorePassword : context.getKeyStorePassword()) + "&"
+ TransportConstants.KEYSTORE_PROVIDER_PROP_NAME + "=" + (context == null ? defaultKeyStoreType : context.getKeyStoreType());
if (clientAuth) {
baseUri = baseUri + "&" + TransportConstants.NEED_CLIENT_AUTH_PROP_NAME + "=true" + "&"
+ TransportConstants.TRUSTSTORE_PATH_PROP_NAME + "=" + defaultTrustStore + "&"
+ TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME + "=" + defaultTrustStorePassword + "&"
+ TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME + "=" + defaultTrustStoreType;
+ TransportConstants.TRUSTSTORE_PATH_PROP_NAME + "=" + (context == null ? defaultTrustStore : context.getTrustStore()) + "&"
+ TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME + "=" + (context == null ? defaultTrustStorePassword : context.getTrustStorePassword()) + "&"
+ TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME + "=" + (context == null ? defaultTrustStoreType : context.getTrustStoreType());
}
uri = new URI(baseUri);
}

View File

@ -56,11 +56,4 @@ public class SslBrokerService extends BrokerService {
SecureRandom random) throws IOException, KeyManagementException {
return null;
}
//one way
public void setupSsl(String keystoreType, String password, String serverKeystore) {
this.SERVER_SIDE_KEYSTORE = serverKeystore;
this.KEYSTORE_PASSWORD = password;
this.storeType = keystoreType;
}
}

View File

@ -90,11 +90,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
commonSettings.setAutoCreateJmsQueues(true);
if (bservice.extraConnectors.size() == 0) {
serverConfig.addAcceptorConfiguration("home", "tcp://localhost:61616?protocols=OPENWIRE,CORE");
}
if (this.bservice.enableSsl()) {
//default
addServerAcceptor(serverConfig, new BrokerService.ConnectorInfo(61611, true));
serverConfig.addAcceptorConfiguration("home", "tcp://localhost:61616");
}
for (BrokerService.ConnectorInfo info : bservice.extraConnectors) {

View File

@ -78,6 +78,7 @@ public class TcpTransportFactory extends TransportFactory {
params.remove("broker.useJmx");
params.remove("marshal");
params.remove("create");
params.remove("asyncQueueDepth");
URI location2 = URISupport.createRemainingURI(location, params);
return super.doConnect(location2);
}

View File

@ -1,2 +0,0 @@
org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory

View File

@ -204,7 +204,7 @@ public class ActiveMQSslConnectionFactoryTest extends CombinationTestSupport {
SslBrokerService service = new SslBrokerService();
service.setPersistent(false);
service.setupSsl(KEYSTORE_TYPE, PASSWORD, SERVER_KEYSTORE);
service.addConnector(uri);
service.start();

View File

@ -16,15 +16,11 @@
*/
package org.apache.activemq;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@ -39,27 +35,27 @@ import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.vm.VMTransport;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @version
*/
public class JmsTempDestinationTest extends TestCase {
public class JmsTempDestinationTest {
private static final Logger LOG = LoggerFactory.getLogger(JmsTempDestinationTest.class);
private Connection connection;
private ActiveMQConnectionFactory factory;
protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
@Override
protected void setUp() throws Exception {
factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
@Before
public void setUp() throws Exception {
factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
factory.setAlwaysSyncSend(true);
connection = factory.createConnection();
connections.add(connection);
@ -68,8 +64,8 @@ public class JmsTempDestinationTest extends TestCase {
/**
* @see junit.framework.TestCase#tearDown()
*/
@Override
protected void tearDown() throws Exception {
@After
public void tearDown() throws Exception {
for (Iterator<Connection> iter = connections.iterator(); iter.hasNext(); ) {
Connection conn = iter.next();
try {
@ -86,6 +82,7 @@ public class JmsTempDestinationTest extends TestCase {
*
* @throws JMSException
*/
@Test
public void testTempDestOnlyConsumedByLocalConn() throws JMSException {
connection.start();
@ -103,22 +100,22 @@ public class JmsTempDestinationTest extends TestCase {
TemporaryQueue otherQueue = otherSession.createTemporaryQueue();
MessageConsumer consumer = otherSession.createConsumer(otherQueue);
Message msg = consumer.receive(3000);
assertNull(msg);
Assert.assertNull(msg);
// should throw InvalidDestinationException when consuming a temp
// destination from another connection
try {
consumer = otherSession.createConsumer(queue);
fail("Send should fail since temp destination should be used from another connection");
Assert.fail("Send should fail since temp destination should be used from another connection");
}
catch (InvalidDestinationException e) {
assertTrue("failed to throw an exception", true);
Assert.assertTrue("failed to throw an exception", true);
}
// should be able to consume temp destination from the same connection
consumer = tempSession.createConsumer(queue);
msg = consumer.receive(3000);
assertNotNull(msg);
Assert.assertNotNull(msg);
}
@ -128,6 +125,7 @@ public class JmsTempDestinationTest extends TestCase {
*
* @throws JMSException
*/
@Test
public void testTempQueueHoldsMessagesWithConsumers() throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createTemporaryQueue();
@ -140,9 +138,9 @@ public class JmsTempDestinationTest extends TestCase {
producer.send(message);
Message message2 = consumer.receive(1000);
assertNotNull(message2);
assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage);
assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText()));
Assert.assertNotNull(message2);
Assert.assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage);
Assert.assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText()));
}
/**
@ -151,6 +149,7 @@ public class JmsTempDestinationTest extends TestCase {
*
* @throws JMSException
*/
@Test
public void testTempQueueHoldsMessagesWithoutConsumers() throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -163,9 +162,9 @@ public class JmsTempDestinationTest extends TestCase {
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
Message message2 = consumer.receive(3000);
assertNotNull(message2);
assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage);
assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText()));
Assert.assertNotNull(message2);
Assert.assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage);
Assert.assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText()));
}
@ -174,6 +173,7 @@ public class JmsTempDestinationTest extends TestCase {
*
* @throws JMSException
*/
@Test
public void testTmpQueueWorksUnderLoad() throws JMSException {
int count = 500;
int dataSize = 1024;
@ -197,9 +197,9 @@ public class JmsTempDestinationTest extends TestCase {
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < count; i++) {
Message message2 = consumer.receive(2000);
assertTrue(message2 != null);
assertEquals(i, message2.getIntProperty("c"));
assertTrue(message2.equals(list.get(i)));
Assert.assertTrue(message2 != null);
Assert.assertEquals(i, message2.getIntProperty("c"));
Assert.assertTrue(message2.equals(list.get(i)));
}
}
@ -211,18 +211,20 @@ public class JmsTempDestinationTest extends TestCase {
* @throws InterruptedException
* @throws URISyntaxException
*/
@Test
public void testPublishFailsForClosedConnection() throws Exception {
Connection tempConnection = factory.createConnection();
connections.add(tempConnection);
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final TemporaryQueue queue = tempSession.createTemporaryQueue();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final TemporaryQueue queue = tempSession.createTemporaryQueue();
final ActiveMQConnection activeMQConnection = (ActiveMQConnection) connection;
assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() {
Assert.assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return activeMQConnection.activeTempDestinations.containsKey(queue);
@ -246,9 +248,10 @@ public class JmsTempDestinationTest extends TestCase {
try {
message = session.createTextMessage("Hello");
producer.send(message);
fail("Send should fail since temp destination should not exist anymore.");
Assert.fail("Send should fail since temp destination should not exist anymore.");
}
catch (JMSException e) {
e.printStackTrace();
}
}
@ -259,18 +262,23 @@ public class JmsTempDestinationTest extends TestCase {
* @throws JMSException
* @throws InterruptedException
*/
@Test
public void testPublishFailsForDestroyedTempDestination() throws Exception {
Connection tempConnection = factory.createConnection();
connections.add(tempConnection);
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final TemporaryQueue queue = tempSession.createTemporaryQueue();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
//In artemis, if you send a message to a topic where the consumer isn't there yet,
//message will get lost. So the create temp queue request has to happen
//after the connection is started (advisory consumer registered).
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final TemporaryQueue queue = tempSession.createTemporaryQueue();
final ActiveMQConnection activeMQConnection = (ActiveMQConnection) connection;
assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() {
Assert.assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return activeMQConnection.activeTempDestinations.containsKey(queue);
@ -293,10 +301,10 @@ public class JmsTempDestinationTest extends TestCase {
try {
message = session.createTextMessage("Hello");
producer.send(message);
fail("Send should fail since temp destination should not exist anymore.");
Assert.fail("Send should fail since temp destination should not exist anymore.");
}
catch (JMSException e) {
assertTrue("failed to throw an exception", true);
Assert.assertTrue("failed to throw an exception", true);
}
}
@ -305,6 +313,7 @@ public class JmsTempDestinationTest extends TestCase {
*
* @throws JMSException
*/
@Test
public void testDeleteDestinationWithSubscribersFails() throws JMSException {
Connection connection = factory.createConnection();
connections.add(connection);
@ -319,65 +328,14 @@ public class JmsTempDestinationTest extends TestCase {
// now closed.
try {
queue.delete();
fail("Should fail as Subscribers are active");
Assert.fail("Should fail as Subscribers are active");
}
catch (JMSException e) {
assertTrue("failed to throw an exception", true);
Assert.assertTrue("failed to throw an exception", true);
}
}
//removed. the original test is only for vm transport. tcp transport will block anyway.
public void testSlowConsumerDoesNotBlockFastTempUsers() throws Exception {
ActiveMQConnectionFactory advisoryConnFactory = new ActiveMQConnectionFactory("vm://localhost?asyncQueueDepth=20");
Connection connection = advisoryConnFactory.createConnection();
connections.add(connection);
connection.start();
final CountDownLatch done = new CountDownLatch(1);
final AtomicBoolean ok = new AtomicBoolean(true);
final AtomicBoolean first = new AtomicBoolean(true);
VMTransport t = ((ActiveMQConnection) connection).getTransport().narrow(VMTransport.class);
t.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
// block first dispatch for a while so broker backs up, but other connection should be able to proceed
if (first.compareAndSet(true, false)) {
try {
ok.set(done.await(35, TimeUnit.SECONDS));
LOG.info("Done waiting: " + ok.get());
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void onException(IOException error) {
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
});
connection = factory.createConnection();
connections.add(connection);
((ActiveMQConnection) connection).setWatchTopicAdvisories(false);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < 2500; i++) {
TemporaryQueue queue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(queue);
consumer.close();
queue.delete();
}
LOG.info("Done with work: " + ok.get());
done.countDown();
assertTrue("ok", ok.get());
}
}

View File

@ -31,6 +31,7 @@ import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@ -56,6 +57,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setConcurrentStoreAndDispatchQueues(false);
broker.setPersistenceAdapter(adapter);
broker.addConnector("tcp://localhost:61616");
broker.addConnector(brokerTransportScheme + "://localhost:0?wireFormat.maxInactivityDuration=0&transport.soWriteTimeout=1000&transport.sleep=1000");
if ("nio".equals(brokerTransportScheme)) {
broker.addConnector("stomp+" + brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize=" + receiveBufferSize + "&trace=true");
@ -73,7 +75,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
messageTextPrefix = initMessagePrefix(8 * 1024);
sendMessages(dest, 500);
URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri());
LOG.info("consuming using uri: " + tcpBrokerUri);
SocketProxy proxy = new SocketProxy();
@ -104,7 +106,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
messageTextPrefix = initMessagePrefix(8 * 1024);
sendMessages(dest, 500);
URI stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri());
URI stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(2).getConnectUri());
LOG.info("consuming using uri: " + stompBrokerUri);
SocketProxy proxy = new SocketProxy();
@ -121,11 +123,12 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + dest.getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
frame = "SUBSCRIBE\n" + "destination:jms.queue." + dest.getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// ensure dispatch has started before pause
frame = stompConnection.receiveFrame();
System.out.println("frame: " + frame);
assertTrue(frame.startsWith("MESSAGE"));
proxy.pause();
@ -148,7 +151,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
// verify connection is dead
try {
for (int i = 0; i < 200; i++) {
stompConnection.send("/queue/" + dest.getPhysicalName(), "ShouldBeDeadConnectionText" + i);
stompConnection.send("jms.queue." + dest.getPhysicalName(), "ShouldBeDeadConnectionText" + i);
}
fail("expected send to fail with timeout out connection");
}
@ -164,6 +167,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
@Override
protected void setUp() throws Exception {
BrokerService.disableWrapper = true;
setAutoFail(true);
super.setUp();
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@ -138,34 +139,36 @@ public class FakePostOffice implements PostOffice {
}
@Override
public void route(ServerMessage message,
QueueCreator creator,
RoutingContext context,
boolean direct) throws Exception {
public RoutingStatus route(ServerMessage message,
QueueCreator creator,
RoutingContext context,
boolean direct) throws Exception {
return RoutingStatus.OK;
}
@Override
public void route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct) throws Exception {
public RoutingStatus route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct) throws Exception {
return RoutingStatus.OK;
}
@Override
public RoutingStatus route(ServerMessage message,
QueueCreator creator,
RoutingContext context,
boolean direct,
boolean rejectDuplicates) throws Exception {
return RoutingStatus.OK;
}
@Override
public void route(ServerMessage message,
QueueCreator creator,
RoutingContext context,
boolean direct,
boolean rejectDuplicates) throws Exception {
}
@Override
public void route(ServerMessage message,
public RoutingStatus route(ServerMessage message,
QueueCreator creator,
Transaction tx,
boolean direct,
boolean rejectDuplicates) throws Exception {
return RoutingStatus.OK;
}
@Override
@ -173,7 +176,7 @@ public class FakePostOffice implements PostOffice {
}
@Override
public void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception {
public RoutingStatus route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception {
return RoutingStatus.OK;
}
}