[AMQ-6603] ensure failover does not track consumer creation that fails with an exception, fix and test. Thanks for the test Tadayoshi Sato

(cherry picked from commit 8641928553)
This commit is contained in:
gtully 2017-02-28 17:17:18 +00:00 committed by Timothy Bish
parent 6d848c440c
commit 5a52bf2a51
3 changed files with 246 additions and 0 deletions

View File

@ -108,6 +108,21 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
}
}
private final class ExceptionResponseCheckAction implements ResponseHandler {
private final ConsumerInfo info;
public ExceptionResponseCheckAction(ConsumerInfo info) {
this.info = info;
}
@Override
public void onResponse(Command response) {
if (ExceptionResponse.DATA_STRUCTURE_TYPE == response.getDataStructureType()) {
processRemoveConsumer(info.getConsumerId(), 0l);
}
}
}
private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
public PrepareReadonlyTransactionAction(TransactionInfo info) {
super(info);
@ -415,6 +430,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
SessionState ss = cs.getSessionState(sessionId);
if (ss != null) {
ss.addConsumer(info);
if (info.isResponseRequired()) {
return new Tracked(new ExceptionResponseCheckAction(info));
}
}
}
}

View File

@ -104,6 +104,11 @@
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-jaas</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
import com.google.common.base.Strings;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.jaas.GroupPrincipal;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.security.SimpleAuthorizationMap;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class PooledConsumerTest {
private static final Logger LOGGER = LoggerFactory.getLogger(PooledConsumerTest.class);
public static final String USERNAME = "test";
public static final String PASSWORD = "test";
private static final ActiveMQQueue QUEUE = new ActiveMQQueue("TEST");
BrokerService brokerService;
class PooledConsumer implements MessageListener {
private ConnectionFactory factory;
private Connection connection;
public boolean done = false;
public PooledConsumer(String url) throws JMSException {
org.apache.activemq.pool.PooledConnectionFactory factory = new org.apache.activemq.pool.PooledConnectionFactory(url);
factory.setMaxConnections(5);
factory.setIdleTimeout(0);
this.factory = factory;
init();
}
private void init() throws JMSException {
if (connection != null) {
close();
}
connection = factory.createConnection(USERNAME, PASSWORD);
connection.start();
}
public void listen() {
Session session = null;
MessageConsumer consumer = null;
boolean success = true;
while (!done) {
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(QUEUE);
onMessage(consumer.receive());
success = true;
} catch (JMSException e) {
LOGGER.info(e.getMessage());
success = false;
} finally {
try {
if (consumer != null) consumer.close();
if (session != null) session.close();
if (!success) init();
} catch (JMSException ignore) {
ignore.printStackTrace();
}
}
sleep(50);
}
}
private void sleep(long milliseconds) {
try {
TimeUnit.MILLISECONDS.sleep(milliseconds);
} catch (InterruptedException e) {
}
}
@Override
public void onMessage(Message message) {
if (message != null) {
TextMessage textMessage = (TextMessage) message;
try {
String response = textMessage.getText();
LOGGER.info(Strings.repeat("=", 50));
LOGGER.info("Received: '{}'", response);
LOGGER.info(Strings.repeat("=", 50));
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
}
public void close() {
try {
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
}
}
public void done() {
done = true;
close();
}
}
public void startBroker(String group, String trasport) throws Exception {
brokerService = new BrokerService();
brokerService.addConnector(trasport);
brokerService.setPersistent(false);
brokerService.setUseJmx(false);
brokerService.setAdvisorySupport(false);
brokerService.setDestinations(new ActiveMQDestination[]{QUEUE});
List<AuthenticationUser> users = new ArrayList<>();
users.add(new AuthenticationUser("test", "test", group));
SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin();
SimpleAuthorizationMap simpleAuthorizationMap = new SimpleAuthorizationMap();
DestinationMap readAcls = new DestinationMap();
GroupPrincipal USERS = new GroupPrincipal("users");
readAcls.put(QUEUE, USERS);
simpleAuthorizationMap.setReadACLs(readAcls);
authorizationPlugin.setMap(simpleAuthorizationMap);
BrokerPlugin[] plugins = new BrokerPlugin[]{authenticationPlugin, authorizationPlugin};
brokerService.setPlugins(plugins);
brokerService.start();
}
@After
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
}
}
@Test
public void testFailedConsumerNotRetainedByFailover() throws Exception {
startBroker("test", "tcp://0.0.0.0:0");
String url = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
final PooledConsumer consumer = new PooledConsumer("failover:(" + brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString() + ")?jms.watchTopicAdvisories=false");
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
@Override
public void run() {
consumer.listen();
}
});
assertTrue("5 connectons - pool fils up", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 5 == brokerService.getTransportConnectorByScheme("tcp").getConnections().size();
}
}));
stopBroker();
// with perms
startBroker("users", url);
assertTrue("5 reconnections from the pool", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 5 == brokerService.getTransportConnectorByScheme("tcp").getConnections().size();
}
}));
assertEquals("one consumer", 1, brokerService.getRegionBroker().getDestinationMap().get(QUEUE).getConsumers().size());
consumer.done();
executorService.shutdownNow();
}
}