This closes #998
This commit is contained in:
commit
0d2cd3b721
|
@ -1401,6 +1401,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
@Override
|
||||
public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
|
||||
if (destroyed) {
|
||||
return null;
|
||||
}
|
||||
SessionId sessionId = id.getParentId();
|
||||
SessionState ss = state.getSessionState(sessionId);
|
||||
if (ss == null) {
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.io.File;
|
|||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -1453,6 +1454,10 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
|||
setupLiveServer(node, fileStorage, false, netty, isLive);
|
||||
}
|
||||
|
||||
protected boolean isResolveProtocols() {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected void setupLiveServer(final int node,
|
||||
final boolean fileStorage,
|
||||
final boolean sharedStorage,
|
||||
|
@ -1472,7 +1477,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
|||
haPolicyConfiguration = new ReplicatedPolicyConfiguration();
|
||||
}
|
||||
|
||||
Configuration configuration = createBasicConfig(node).setJournalMaxIO_AIO(1000).setThreadPoolMaxSize(10).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, generateParams(node, netty))).setHAPolicyConfiguration(haPolicyConfiguration).setResolveProtocols(false);
|
||||
Configuration configuration = createBasicConfig(node).setJournalMaxIO_AIO(1000).setThreadPoolMaxSize(10).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, generateParams(node, netty))).setHAPolicyConfiguration(haPolicyConfiguration).setResolveProtocols(isResolveProtocols());
|
||||
|
||||
ActiveMQServer server;
|
||||
|
||||
|
@ -1889,4 +1894,13 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
|||
protected boolean isFileStorage() {
|
||||
return true;
|
||||
}
|
||||
|
||||
protected String getServerUri(int node) throws URISyntaxException {
|
||||
ActiveMQServer server = servers[node];
|
||||
if (server == null) {
|
||||
throw new IllegalStateException("No server at node " + server);
|
||||
}
|
||||
int port = TransportConstants.DEFAULT_PORT + node;
|
||||
return "tcp://localhost:" + port;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.openwire.cluster;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.util.ConsumerThread;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Session;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class MessageRedistributionTest extends ClusterTestBase {
|
||||
|
||||
@Test
|
||||
public void testRemoteConsumerClose() throws Exception {
|
||||
|
||||
setupServer(0, true, true);
|
||||
setupServer(1, true, true);
|
||||
|
||||
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 0, 1);
|
||||
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 0);
|
||||
|
||||
startServers(0, 1);
|
||||
|
||||
waitForTopology(servers[0], 2);
|
||||
waitForTopology(servers[1], 2);
|
||||
|
||||
setupSessionFactory(0, true);
|
||||
setupSessionFactory(1, true);
|
||||
|
||||
createQueue(0, "queues.testaddress", "queue0", null, false);
|
||||
createQueue(1, "queues.testaddress", "queue0", null, false);
|
||||
|
||||
//alternately create consumers to the 2 nodes
|
||||
//close the connection then close consumer quickly
|
||||
//check server's consumer count
|
||||
for (int i = 0; i < 50; i++) {
|
||||
int target = i % 2;
|
||||
int remote = (i + 1) % 2;
|
||||
closeConsumerAndConnectionConcurrently(target, remote);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isResolveProtocols() {
|
||||
return true;
|
||||
}
|
||||
|
||||
private void closeConsumerAndConnectionConcurrently(int targetNode, int remoteNode) throws Exception {
|
||||
|
||||
String targetUri = getServerUri(targetNode);
|
||||
System.out.println("uri is " + targetUri);
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(targetUri);
|
||||
Connection conn = null;
|
||||
CountDownLatch active = new CountDownLatch(1);
|
||||
try {
|
||||
conn = factory.createConnection();
|
||||
conn.start();
|
||||
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination dest = ActiveMQDestination.createDestination("queue0", ActiveMQDestination.QUEUE_TYPE);
|
||||
ConsumerThread consumer = new ConsumerThread(session, dest);
|
||||
consumer.setMessageCount(0);
|
||||
consumer.setFinished(active);
|
||||
consumer.start();
|
||||
|
||||
assertTrue("consumer takes too long to finish!", active.await(5, TimeUnit.SECONDS));
|
||||
} finally {
|
||||
conn.close();
|
||||
}
|
||||
|
||||
//check remote server's consumer count
|
||||
ActiveMQServer remoteServer = servers[remoteNode];
|
||||
Bindings bindings = remoteServer.getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress"));
|
||||
Collection<Binding> bindingSet = bindings.getBindings();
|
||||
|
||||
RemoteQueueBinding remoteBinding = null;
|
||||
for (Binding b : bindingSet) {
|
||||
if (b instanceof RemoteQueueBinding) {
|
||||
remoteBinding = (RemoteQueueBinding) b;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assertNotNull(remoteBinding);
|
||||
int count = remoteBinding.consumerCount();
|
||||
assertTrue("consumer count should never be negative " + count, count >= 0);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue