ARTEMIS-488 Fix OpenWire Test (Temp Queue removal and others)

Temp Queue not deleted when connection is closed.
Enable Stomp in openwire test because some test uses it.
Remove unused code in opwnwire
Wrong XA error code returned when xid is missing
(ActiveMQXAConnectionFactory.testRollbackXaErrorCode)
regression in ActiveMQSslConnectionFactoryTest (SSL related)
This commit is contained in:
Howard Gao 2016-04-20 11:37:22 +08:00 committed by Clebert Suconic
parent 0a719e08ed
commit 3012447404
17 changed files with 220 additions and 260 deletions

View File

@ -71,7 +71,6 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@ -151,8 +150,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private ConnectionState state;
private final Set<ActiveMQDestination> tempQueues = new ConcurrentHashSet<>();
/**
* Openwire doesn't sen transactions associated with any sessions.
* It will however send beingTX / endTX as it would be doing it with XA Transactions.
@ -289,7 +286,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
response.setCorrelationId(commandId);
dispatchSync(response);
}
}
}
catch (Exception e) {
@ -512,10 +508,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return this.wireFormat;
}
public void registerTempQueue(ActiveMQDestination queue) {
tempQueues.add(queue);
}
private void shutdown(boolean fail) {
if (fail) {
transportConnection.forceClose();
@ -692,19 +684,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
SimpleString qName = OpenWireUtil.toCoreAddress(dest);
QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
if (binding == null) {
if (getState().getInfo() != null) {
if (dest.isTemporary()) {
internalSession.createQueue(qName, qName, null, dest.isTemporary(), false);
}
else {
ConnectionInfo connInfo = getState().getInfo();
CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
server.getSecurityStore().check(qName, checkType, this);
server.checkQueueCreationLimit(getUsername());
}
ConnectionInfo connInfo = getState().getInfo();
server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
}
server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, false);
if (dest.isTemporary()) {
registerTempQueue(dest);
}
}
}
@ -1407,7 +1397,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
if (transaction == null) {
throw new IllegalStateException("cannot find transactionInfo::" + txID + " xid=" + xid);
return null;
}
if (session != null && transaction.getProtocolData() != session) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import javax.jms.InvalidDestinationException;
import javax.jms.ResourceAllocationException;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
@ -24,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
@ -361,7 +363,11 @@ public class AMQSession implements SessionCallback {
connection.getTransportConnection().setAutoRead(false);
}
getCoreSession().send(coreMsg, false);
RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
}
if (runToUse != null) {
// if the timeout is >0, it will wait this much milliseconds

View File

@ -68,22 +68,22 @@ public interface PostOffice extends ActiveMQComponent {
Map<SimpleString, Binding> getAllBindings();
void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception;
RoutingStatus route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception;
void route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct) throws Exception;
RoutingStatus route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct) throws Exception;
void route(ServerMessage message,
RoutingStatus route(ServerMessage message,
QueueCreator queueCreator,
Transaction tx,
boolean direct,
boolean rejectDuplicates) throws Exception;
void route(ServerMessage message,
RoutingStatus route(ServerMessage message,
QueueCreator queueCreator,
RoutingContext context,
boolean direct) throws Exception;
void route(ServerMessage message,
RoutingStatus route(ServerMessage message,
QueueCreator queueCreator,
RoutingContext context,
boolean direct,

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice;
/**
* Used to indicate the result of a server send
*/
public enum RoutingStatus {
OK,
NO_BINDINGS,
NO_BINDINGS_DLA,
DUPLICATED_ID;
}

View File

@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueInfo;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -570,41 +571,42 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
@Override
public void route(final ServerMessage message, QueueCreator queueCreator, final boolean direct) throws Exception {
route(message, queueCreator, (Transaction) null, direct);
public RoutingStatus route(final ServerMessage message, QueueCreator queueCreator, final boolean direct) throws Exception {
return route(message, queueCreator, (Transaction) null, direct);
}
@Override
public void route(final ServerMessage message,
public RoutingStatus route(final ServerMessage message,
QueueCreator queueCreator,
final Transaction tx,
final boolean direct) throws Exception {
route(message, queueCreator, new RoutingContextImpl(tx), direct);
return route(message, queueCreator, new RoutingContextImpl(tx), direct);
}
@Override
public void route(final ServerMessage message,
public RoutingStatus route(final ServerMessage message,
final QueueCreator queueCreator,
final Transaction tx,
final boolean direct,
final boolean rejectDuplicates) throws Exception {
route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates);
return route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates);
}
@Override
public void route(final ServerMessage message,
public RoutingStatus route(final ServerMessage message,
final QueueCreator queueCreator,
final RoutingContext context,
final boolean direct) throws Exception {
route(message, queueCreator, context, direct, true);
return route(message, queueCreator, context, direct, true);
}
@Override
public void route(final ServerMessage message,
public RoutingStatus route(final ServerMessage message,
final QueueCreator queueCreator,
final RoutingContext context,
final boolean direct,
boolean rejectDuplicates) throws Exception {
RoutingStatus result = RoutingStatus.OK;
// Sanity check
if (message.getRefCount() > 0) {
throw new IllegalStateException("Message cannot be routed more than once");
@ -619,7 +621,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
applyExpiryDelay(message, address);
if (!checkDuplicateID(message, context, rejectDuplicates, startedTX)) {
return;
return RoutingStatus.DUPLICATED_ID;
}
if (message.hasInternalProperties()) {
@ -671,6 +673,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
if (dlaAddress == null) {
result = RoutingStatus.NO_BINDINGS;
ActiveMQServerLogger.LOGGER.noDLA(address);
}
else {
@ -679,9 +682,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.setAddress(dlaAddress);
route(message, null, context.getTransaction(), false);
result = RoutingStatus.NO_BINDINGS_DLA;
}
}
else {
result = RoutingStatus.NO_BINDINGS;
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address);
}
@ -709,6 +715,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (startedTX.get()) {
context.getTransaction().commit();
}
return result;
}
// HORNETQ-1029

View File

@ -23,6 +23,7 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@ -132,7 +133,9 @@ public interface ServerSession extends SecurityAuth {
void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean continues) throws Exception;
void send(ServerMessage message, boolean direct) throws Exception;
RoutingStatus send(ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception;
RoutingStatus send(ServerMessage message, boolean direct) throws Exception;
void sendLarge(MessageInternal msg) throws Exception;

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.security.CheckType;
@ -1213,7 +1214,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
public void send(final ServerMessage message, final boolean direct) throws Exception {
public RoutingStatus send(final ServerMessage message, final boolean direct) throws Exception {
return send(message, direct, false);
}
@Override
public RoutingStatus send(final ServerMessage message, final boolean direct, boolean noAutoCreateQueue) throws Exception {
RoutingStatus result = RoutingStatus.OK;
//large message may come from StompSession directly, in which
//case the id header already generated.
if (!message.isLargeMessage()) {
@ -1256,8 +1263,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
handleManagementMessage(message, direct);
}
else {
doSend(message, direct);
result = doSend(message, direct, noAutoCreateQueue);
}
return result;
}
@Override
@ -1281,7 +1289,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
}
doSend(currentLargeMessage, false);
doSend(currentLargeMessage, false, false);
currentLargeMessage = null;
}
@ -1479,7 +1487,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (replyTo != null) {
reply.setAddress(replyTo);
doSend(reply, direct);
doSend(reply, direct, false);
}
}
@ -1535,7 +1543,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
theTx.rollback();
}
protected void doSend(final ServerMessage msg, final boolean direct) throws Exception {
protected RoutingStatus doSend(final ServerMessage msg, final boolean direct, final boolean noAutoCreateQueue) throws Exception {
RoutingStatus result = RoutingStatus.OK;
// check the user has write access to this address.
try {
securityCheck(msg.getAddress(), CheckType.SEND, this);
@ -1554,7 +1563,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
try {
postOffice.route(msg, queueCreator, routingContext, direct);
if (noAutoCreateQueue) {
result = postOffice.route(msg, null, routingContext, direct);
}
else {
result = postOffice.route(msg, queueCreator, routingContext, direct);
}
Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
@ -1569,6 +1583,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
finally {
routingContext.clear();
}
return result;
}
@Override

View File

@ -313,6 +313,19 @@
<artifactId>artemis-openwire-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-stomp-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-stomp-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman</artifactId>
@ -422,6 +435,11 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>${skipActiveMQ5Tests}</skipTests>
<!--
<additionalClasspathElements>
<additionalClasspathElement>src/main/resources/stomp</additionalClasspathElement>
</additionalClasspathElements>
-->
<includes>
<!-- included packages -->
<include>**/org/apache/activemq/*Test.java</include>

View File

@ -84,12 +84,6 @@ public class BrokerService implements Service {
public static final long DEFAULT_START_TIMEOUT = 600000L;
public static boolean disableWrapper = false;
public String SERVER_SIDE_KEYSTORE;
public String KEYSTORE_PASSWORD;
public String SERVER_SIDE_TRUSTSTORE;
public String TRUSTSTORE_PASSWORD;
public String storeType;
private SslContext sslContext;
private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
@ -151,7 +145,6 @@ public class BrokerService implements Service {
tmpfolder = new TemporaryFolder(targetTmp);
tmpfolder.create();
Exception e = new Exception();
e.fillInStackTrace();
startBroker(startAsync);
map.put(broker, e);
}
@ -261,10 +254,6 @@ public class BrokerService implements Service {
}
}
public boolean enableSsl() {
return this.SERVER_SIDE_KEYSTORE != null;
}
//below are methods called directly by tests
//we don't actually implement any of these for now,
//just to make test compile pass.
@ -500,23 +489,11 @@ public class BrokerService implements Service {
public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
this.transportConnectors = transportConnectors;
for (TransportConnector connector : transportConnectors) {
if (connector.getUri().getScheme().equals("ssl")) {
boolean added = this.extraConnectors.add(new ConnectorInfo(connector.getUri().getPort(), true));
if (added) {
System.out.println("added ssl connector " + connector);
if (sslContext instanceof SpringSslContext) {
this.extraConnectors.add(new ConnectorInfo(connector.getUri(), (SpringSslContext)sslContext));
}
else {
System.out.println("WARNing! failed to add ssl connector: " + connector);
}
}
else {
boolean added = this.extraConnectors.add(new ConnectorInfo(connector.getUri().getPort()));
if (added) {
System.out.println("added connector " + connector);
}
else {
System.out.println("WARNing! failed to add connector: " + connector);
}
this.extraConnectors.add(new ConnectorInfo(connector.getUri()));
}
}
}
@ -584,7 +561,12 @@ public class BrokerService implements Service {
connector = new FakeTransportConnector(bindAddress);
this.transportConnectors.add(connector);
if (sslContext instanceof SpringSslContext) {
this.extraConnectors.add(new ConnectorInfo(bindAddress, (SpringSslContext) sslContext));
}
else {
this.extraConnectors.add(new ConnectorInfo(bindAddress));
}
return connector;
}
@ -738,14 +720,6 @@ public class BrokerService implements Service {
public void setSslContext(SslContext sslContext) {
this.sslContext = sslContext;
if (sslContext instanceof SpringSslContext) {
SpringSslContext springContext = (SpringSslContext)sslContext;
this.SERVER_SIDE_KEYSTORE = springContext.getKeyStore();
this.KEYSTORE_PASSWORD = springContext.getKeyStorePassword();
this.SERVER_SIDE_TRUSTSTORE = springContext.getTrustStore();
this.TRUSTSTORE_PASSWORD = springContext.getTrustStorePassword();
this.storeType = springContext.getKeyStoreType();
}
}
public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
@ -807,48 +781,15 @@ public class BrokerService implements Service {
public URI uri;
public boolean ssl;
public String keyStore;
public String keyStorePassword;
public String keyStoreType;
public String trustStore;
public String trustStorePassword;
public String trustStoreType;
public boolean clientAuth;
public ConnectorInfo(int port) throws URISyntaxException {
this(port, false);
}
public ConnectorInfo(int port, boolean ssl) throws URISyntaxException {
this(port, ssl, false);
}
public ConnectorInfo(int port, boolean ssl, boolean clientAuth) throws URISyntaxException {
this.ssl = ssl;
if (port == 0) {
port = getPseudoRandomPort();
}
String baseUri = "tcp://localhost:" + port + "?protocols=OPENWIRE,CORE";
if (ssl) {
baseUri = baseUri + "&" + TransportConstants.KEYSTORE_PATH_PROP_NAME + "=" + defaultKeyStore + "&"
+ TransportConstants.KEYSTORE_PASSWORD_PROP_NAME + "=" + defaultKeyStorePassword + "&"
+ TransportConstants.KEYSTORE_PROVIDER_PROP_NAME + "=" + defaultKeyStoreType;
if (clientAuth) {
baseUri = baseUri + "&" + TransportConstants.NEED_CLIENT_AUTH_PROP_NAME + "=true" + "&"
+ TransportConstants.TRUSTSTORE_PATH_PROP_NAME + "=" + defaultTrustStore + "&"
+ TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME + "=" + defaultTrustStorePassword + "&"
+ TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME + "=" + defaultTrustStoreType;
}
}
this.uri = new URI(baseUri);
public ConnectorInfo(URI bindAddress) throws URISyntaxException {
this(bindAddress, null);
}
//bindAddress must be Artemis compliant, except
//scheme
public ConnectorInfo(URI bindAddress) throws URISyntaxException {
public ConnectorInfo(URI bindAddress, SpringSslContext context) throws URISyntaxException {
Integer port = bindAddress.getPort();
String host = bindAddress.getHost();
@ -870,16 +811,16 @@ public class BrokerService implements Service {
host, port, bindAddress.getPath(), bindAddress.getQuery(), bindAddress.getFragment());
}
else {
String baseUri = "tcp://" + host + ":" + port + "?protocols=OPENWIRE,CORE&"
String baseUri = "tcp://" + host + ":" + port + "?"
+ TransportConstants.SSL_ENABLED_PROP_NAME + "=true&"
+ TransportConstants.KEYSTORE_PATH_PROP_NAME + "=" + defaultKeyStore + "&"
+ TransportConstants.KEYSTORE_PASSWORD_PROP_NAME + "=" + defaultKeyStorePassword + "&"
+ TransportConstants.KEYSTORE_PROVIDER_PROP_NAME + "=" + defaultKeyStoreType;
+ TransportConstants.KEYSTORE_PATH_PROP_NAME + "=" + (context == null ? defaultKeyStore : context.getKeyStore()) + "&"
+ TransportConstants.KEYSTORE_PASSWORD_PROP_NAME + "=" + (context == null ? defaultKeyStorePassword : context.getKeyStorePassword()) + "&"
+ TransportConstants.KEYSTORE_PROVIDER_PROP_NAME + "=" + (context == null ? defaultKeyStoreType : context.getKeyStoreType());
if (clientAuth) {
baseUri = baseUri + "&" + TransportConstants.NEED_CLIENT_AUTH_PROP_NAME + "=true" + "&"
+ TransportConstants.TRUSTSTORE_PATH_PROP_NAME + "=" + defaultTrustStore + "&"
+ TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME + "=" + defaultTrustStorePassword + "&"
+ TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME + "=" + defaultTrustStoreType;
+ TransportConstants.TRUSTSTORE_PATH_PROP_NAME + "=" + (context == null ? defaultTrustStore : context.getTrustStore()) + "&"
+ TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME + "=" + (context == null ? defaultTrustStorePassword : context.getTrustStorePassword()) + "&"
+ TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME + "=" + (context == null ? defaultTrustStoreType : context.getTrustStoreType());
}
uri = new URI(baseUri);
}

View File

@ -56,11 +56,4 @@ public class SslBrokerService extends BrokerService {
SecureRandom random) throws IOException, KeyManagementException {
return null;
}
//one way
public void setupSsl(String keystoreType, String password, String serverKeystore) {
this.SERVER_SIDE_KEYSTORE = serverKeystore;
this.KEYSTORE_PASSWORD = password;
this.storeType = keystoreType;
}
}

View File

@ -90,11 +90,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
commonSettings.setAutoCreateJmsQueues(true);
if (bservice.extraConnectors.size() == 0) {
serverConfig.addAcceptorConfiguration("home", "tcp://localhost:61616?protocols=OPENWIRE,CORE");
}
if (this.bservice.enableSsl()) {
//default
addServerAcceptor(serverConfig, new BrokerService.ConnectorInfo(61611, true));
serverConfig.addAcceptorConfiguration("home", "tcp://localhost:61616");
}
for (BrokerService.ConnectorInfo info : bservice.extraConnectors) {

View File

@ -78,6 +78,7 @@ public class TcpTransportFactory extends TransportFactory {
params.remove("broker.useJmx");
params.remove("marshal");
params.remove("create");
params.remove("asyncQueueDepth");
URI location2 = URISupport.createRemainingURI(location, params);
return super.doConnect(location2);
}

View File

@ -1,2 +0,0 @@
org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory

View File

@ -204,7 +204,7 @@ public class ActiveMQSslConnectionFactoryTest extends CombinationTestSupport {
SslBrokerService service = new SslBrokerService();
service.setPersistent(false);
service.setupSsl(KEYSTORE_TYPE, PASSWORD, SERVER_KEYSTORE);
service.addConnector(uri);
service.start();

View File

@ -16,15 +16,11 @@
*/
package org.apache.activemq;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@ -39,27 +35,27 @@ import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.vm.VMTransport;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @version
*/
public class JmsTempDestinationTest extends TestCase {
public class JmsTempDestinationTest {
private static final Logger LOG = LoggerFactory.getLogger(JmsTempDestinationTest.class);
private Connection connection;
private ActiveMQConnectionFactory factory;
protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
@Override
protected void setUp() throws Exception {
factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
@Before
public void setUp() throws Exception {
factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
factory.setAlwaysSyncSend(true);
connection = factory.createConnection();
connections.add(connection);
@ -68,8 +64,8 @@ public class JmsTempDestinationTest extends TestCase {
/**
* @see junit.framework.TestCase#tearDown()
*/
@Override
protected void tearDown() throws Exception {
@After
public void tearDown() throws Exception {
for (Iterator<Connection> iter = connections.iterator(); iter.hasNext(); ) {
Connection conn = iter.next();
try {
@ -86,6 +82,7 @@ public class JmsTempDestinationTest extends TestCase {
*
* @throws JMSException
*/
@Test
public void testTempDestOnlyConsumedByLocalConn() throws JMSException {
connection.start();
@ -103,22 +100,22 @@ public class JmsTempDestinationTest extends TestCase {
TemporaryQueue otherQueue = otherSession.createTemporaryQueue();
MessageConsumer consumer = otherSession.createConsumer(otherQueue);
Message msg = consumer.receive(3000);
assertNull(msg);
Assert.assertNull(msg);
// should throw InvalidDestinationException when consuming a temp
// destination from another connection
try {
consumer = otherSession.createConsumer(queue);
fail("Send should fail since temp destination should be used from another connection");
Assert.fail("Send should fail since temp destination should be used from another connection");
}
catch (InvalidDestinationException e) {
assertTrue("failed to throw an exception", true);
Assert.assertTrue("failed to throw an exception", true);
}
// should be able to consume temp destination from the same connection
consumer = tempSession.createConsumer(queue);
msg = consumer.receive(3000);
assertNotNull(msg);
Assert.assertNotNull(msg);
}
@ -128,6 +125,7 @@ public class JmsTempDestinationTest extends TestCase {
*
* @throws JMSException
*/
@Test
public void testTempQueueHoldsMessagesWithConsumers() throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createTemporaryQueue();
@ -140,9 +138,9 @@ public class JmsTempDestinationTest extends TestCase {
producer.send(message);
Message message2 = consumer.receive(1000);
assertNotNull(message2);
assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage);
assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText()));
Assert.assertNotNull(message2);
Assert.assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage);
Assert.assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText()));
}
/**
@ -151,6 +149,7 @@ public class JmsTempDestinationTest extends TestCase {
*
* @throws JMSException
*/
@Test
public void testTempQueueHoldsMessagesWithoutConsumers() throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -163,9 +162,9 @@ public class JmsTempDestinationTest extends TestCase {
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
Message message2 = consumer.receive(3000);
assertNotNull(message2);
assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage);
assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText()));
Assert.assertNotNull(message2);
Assert.assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage);
Assert.assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText()));
}
@ -174,6 +173,7 @@ public class JmsTempDestinationTest extends TestCase {
*
* @throws JMSException
*/
@Test
public void testTmpQueueWorksUnderLoad() throws JMSException {
int count = 500;
int dataSize = 1024;
@ -197,9 +197,9 @@ public class JmsTempDestinationTest extends TestCase {
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < count; i++) {
Message message2 = consumer.receive(2000);
assertTrue(message2 != null);
assertEquals(i, message2.getIntProperty("c"));
assertTrue(message2.equals(list.get(i)));
Assert.assertTrue(message2 != null);
Assert.assertEquals(i, message2.getIntProperty("c"));
Assert.assertTrue(message2.equals(list.get(i)));
}
}
@ -211,18 +211,20 @@ public class JmsTempDestinationTest extends TestCase {
* @throws InterruptedException
* @throws URISyntaxException
*/
@Test
public void testPublishFailsForClosedConnection() throws Exception {
Connection tempConnection = factory.createConnection();
connections.add(tempConnection);
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final TemporaryQueue queue = tempSession.createTemporaryQueue();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final TemporaryQueue queue = tempSession.createTemporaryQueue();
final ActiveMQConnection activeMQConnection = (ActiveMQConnection) connection;
assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() {
Assert.assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return activeMQConnection.activeTempDestinations.containsKey(queue);
@ -246,9 +248,10 @@ public class JmsTempDestinationTest extends TestCase {
try {
message = session.createTextMessage("Hello");
producer.send(message);
fail("Send should fail since temp destination should not exist anymore.");
Assert.fail("Send should fail since temp destination should not exist anymore.");
}
catch (JMSException e) {
e.printStackTrace();
}
}
@ -259,18 +262,23 @@ public class JmsTempDestinationTest extends TestCase {
* @throws JMSException
* @throws InterruptedException
*/
@Test
public void testPublishFailsForDestroyedTempDestination() throws Exception {
Connection tempConnection = factory.createConnection();
connections.add(tempConnection);
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final TemporaryQueue queue = tempSession.createTemporaryQueue();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
//In artemis, if you send a message to a topic where the consumer isn't there yet,
//message will get lost. So the create temp queue request has to happen
//after the connection is started (advisory consumer registered).
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final TemporaryQueue queue = tempSession.createTemporaryQueue();
final ActiveMQConnection activeMQConnection = (ActiveMQConnection) connection;
assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() {
Assert.assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return activeMQConnection.activeTempDestinations.containsKey(queue);
@ -293,10 +301,10 @@ public class JmsTempDestinationTest extends TestCase {
try {
message = session.createTextMessage("Hello");
producer.send(message);
fail("Send should fail since temp destination should not exist anymore.");
Assert.fail("Send should fail since temp destination should not exist anymore.");
}
catch (JMSException e) {
assertTrue("failed to throw an exception", true);
Assert.assertTrue("failed to throw an exception", true);
}
}
@ -305,6 +313,7 @@ public class JmsTempDestinationTest extends TestCase {
*
* @throws JMSException
*/
@Test
public void testDeleteDestinationWithSubscribersFails() throws JMSException {
Connection connection = factory.createConnection();
connections.add(connection);
@ -319,65 +328,14 @@ public class JmsTempDestinationTest extends TestCase {
// now closed.
try {
queue.delete();
fail("Should fail as Subscribers are active");
Assert.fail("Should fail as Subscribers are active");
}
catch (JMSException e) {
assertTrue("failed to throw an exception", true);
Assert.assertTrue("failed to throw an exception", true);
}
}
//removed. the original test is only for vm transport. tcp transport will block anyway.
public void testSlowConsumerDoesNotBlockFastTempUsers() throws Exception {
ActiveMQConnectionFactory advisoryConnFactory = new ActiveMQConnectionFactory("vm://localhost?asyncQueueDepth=20");
Connection connection = advisoryConnFactory.createConnection();
connections.add(connection);
connection.start();
final CountDownLatch done = new CountDownLatch(1);
final AtomicBoolean ok = new AtomicBoolean(true);
final AtomicBoolean first = new AtomicBoolean(true);
VMTransport t = ((ActiveMQConnection) connection).getTransport().narrow(VMTransport.class);
t.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
// block first dispatch for a while so broker backs up, but other connection should be able to proceed
if (first.compareAndSet(true, false)) {
try {
ok.set(done.await(35, TimeUnit.SECONDS));
LOG.info("Done waiting: " + ok.get());
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void onException(IOException error) {
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
});
connection = factory.createConnection();
connections.add(connection);
((ActiveMQConnection) connection).setWatchTopicAdvisories(false);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < 2500; i++) {
TemporaryQueue queue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(queue);
consumer.close();
queue.delete();
}
LOG.info("Done with work: " + ok.get());
done.countDown();
assertTrue("ok", ok.get());
}
}

View File

@ -31,6 +31,7 @@ import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@ -56,6 +57,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setConcurrentStoreAndDispatchQueues(false);
broker.setPersistenceAdapter(adapter);
broker.addConnector("tcp://localhost:61616");
broker.addConnector(brokerTransportScheme + "://localhost:0?wireFormat.maxInactivityDuration=0&transport.soWriteTimeout=1000&transport.sleep=1000");
if ("nio".equals(brokerTransportScheme)) {
broker.addConnector("stomp+" + brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize=" + receiveBufferSize + "&trace=true");
@ -73,7 +75,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
messageTextPrefix = initMessagePrefix(8 * 1024);
sendMessages(dest, 500);
URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri());
LOG.info("consuming using uri: " + tcpBrokerUri);
SocketProxy proxy = new SocketProxy();
@ -104,7 +106,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
messageTextPrefix = initMessagePrefix(8 * 1024);
sendMessages(dest, 500);
URI stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri());
URI stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(2).getConnectUri());
LOG.info("consuming using uri: " + stompBrokerUri);
SocketProxy proxy = new SocketProxy();
@ -121,11 +123,12 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + dest.getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
frame = "SUBSCRIBE\n" + "destination:jms.queue." + dest.getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// ensure dispatch has started before pause
frame = stompConnection.receiveFrame();
System.out.println("frame: " + frame);
assertTrue(frame.startsWith("MESSAGE"));
proxy.pause();
@ -148,7 +151,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
// verify connection is dead
try {
for (int i = 0; i < 200; i++) {
stompConnection.send("/queue/" + dest.getPhysicalName(), "ShouldBeDeadConnectionText" + i);
stompConnection.send("jms.queue." + dest.getPhysicalName(), "ShouldBeDeadConnectionText" + i);
}
fail("expected send to fail with timeout out connection");
}
@ -164,6 +167,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
@Override
protected void setUp() throws Exception {
BrokerService.disableWrapper = true;
setAutoFail(true);
super.setUp();
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@ -138,34 +139,36 @@ public class FakePostOffice implements PostOffice {
}
@Override
public void route(ServerMessage message,
public RoutingStatus route(ServerMessage message,
QueueCreator creator,
RoutingContext context,
boolean direct) throws Exception {
return RoutingStatus.OK;
}
@Override
public void route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct) throws Exception {
public RoutingStatus route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct) throws Exception {
return RoutingStatus.OK;
}
@Override
public void route(ServerMessage message,
public RoutingStatus route(ServerMessage message,
QueueCreator creator,
RoutingContext context,
boolean direct,
boolean rejectDuplicates) throws Exception {
return RoutingStatus.OK;
}
@Override
public void route(ServerMessage message,
public RoutingStatus route(ServerMessage message,
QueueCreator creator,
Transaction tx,
boolean direct,
boolean rejectDuplicates) throws Exception {
return RoutingStatus.OK;
}
@Override
@ -173,7 +176,7 @@ public class FakePostOffice implements PostOffice {
}
@Override
public void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception {
public RoutingStatus route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception {
return RoutingStatus.OK;
}
}