ARTEMIS-949 Prevent Openwire from closing consumer twice

This commit is contained in:
Howard Gao 2017-02-08 15:57:43 +08:00
parent aac21ce165
commit 1a3fdd0916
3 changed files with 132 additions and 1 deletions

View File

@ -1401,6 +1401,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override @Override
public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
if (destroyed) {
return null;
}
SessionId sessionId = id.getParentId(); SessionId sessionId = id.getParentId();
SessionState ss = state.getSessionState(sessionId); SessionState ss = state.getSessionState(sessionId);
if (ss == null) { if (ss == null) {

View File

@ -20,6 +20,7 @@ import java.io.File;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -1453,6 +1454,10 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
setupLiveServer(node, fileStorage, false, netty, isLive); setupLiveServer(node, fileStorage, false, netty, isLive);
} }
protected boolean isResolveProtocols() {
return false;
}
protected void setupLiveServer(final int node, protected void setupLiveServer(final int node,
final boolean fileStorage, final boolean fileStorage,
final boolean sharedStorage, final boolean sharedStorage,
@ -1472,7 +1477,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
haPolicyConfiguration = new ReplicatedPolicyConfiguration(); haPolicyConfiguration = new ReplicatedPolicyConfiguration();
} }
Configuration configuration = createBasicConfig(node).setJournalMaxIO_AIO(1000).setThreadPoolMaxSize(10).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, generateParams(node, netty))).setHAPolicyConfiguration(haPolicyConfiguration).setResolveProtocols(false); Configuration configuration = createBasicConfig(node).setJournalMaxIO_AIO(1000).setThreadPoolMaxSize(10).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, generateParams(node, netty))).setHAPolicyConfiguration(haPolicyConfiguration).setResolveProtocols(isResolveProtocols());
ActiveMQServer server; ActiveMQServer server;
@ -1889,4 +1894,13 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
protected boolean isFileStorage() { protected boolean isFileStorage() {
return true; return true;
} }
protected String getServerUri(int node) throws URISyntaxException {
ActiveMQServer server = servers[node];
if (server == null) {
throw new IllegalStateException("No server at node " + server);
}
int port = TransportConstants.DEFAULT_PORT + node;
return "tcp://localhost:" + port;
}
} }

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.openwire.cluster;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.ConsumerThread;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Session;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class MessageRedistributionTest extends ClusterTestBase {
@Test
public void testRemoteConsumerClose() throws Exception {
setupServer(0, true, true);
setupServer(1, true, true);
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 0, 1);
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 0);
startServers(0, 1);
waitForTopology(servers[0], 2);
waitForTopology(servers[1], 2);
setupSessionFactory(0, true);
setupSessionFactory(1, true);
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
//alternately create consumers to the 2 nodes
//close the connection then close consumer quickly
//check server's consumer count
for (int i = 0; i < 50; i++) {
int target = i % 2;
int remote = (i + 1) % 2;
closeConsumerAndConnectionConcurrently(target, remote);
}
}
@Override
protected boolean isResolveProtocols() {
return true;
}
private void closeConsumerAndConnectionConcurrently(int targetNode, int remoteNode) throws Exception {
String targetUri = getServerUri(targetNode);
System.out.println("uri is " + targetUri);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(targetUri);
Connection conn = null;
CountDownLatch active = new CountDownLatch(1);
try {
conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = ActiveMQDestination.createDestination("queue0", ActiveMQDestination.QUEUE_TYPE);
ConsumerThread consumer = new ConsumerThread(session, dest);
consumer.setMessageCount(0);
consumer.setFinished(active);
consumer.start();
assertTrue("consumer takes too long to finish!", active.await(5, TimeUnit.SECONDS));
} finally {
conn.close();
}
//check remote server's consumer count
ActiveMQServer remoteServer = servers[remoteNode];
Bindings bindings = remoteServer.getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress"));
Collection<Binding> bindingSet = bindings.getBindings();
RemoteQueueBinding remoteBinding = null;
for (Binding b : bindingSet) {
if (b instanceof RemoteQueueBinding) {
remoteBinding = (RemoteQueueBinding) b;
break;
}
}
assertNotNull(remoteBinding);
int count = remoteBinding.consumerCount();
assertTrue("consumer count should never be negative " + count, count >= 0);
}
}