ARTEMIS-2029 Fixing wire checks after reconnects

This commit is contained in:
Clebert Suconic 2018-08-13 11:49:19 -04:00
parent 4dd116ee04
commit 87fdff51e1
5 changed files with 124 additions and 9 deletions

View File

@ -237,7 +237,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
public void connect(final int initialConnectAttempts,
final boolean failoverOnInitialConnection) throws ActiveMQException {
// Get the connection
getConnectionWithRetry(initialConnectAttempts);
getConnectionWithRetry(initialConnectAttempts, null);
if (connection == null) {
StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig);
@ -743,7 +743,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
session.preHandleFailover(connection);
}
getConnectionWithRetry(reconnectAttempts);
getConnectionWithRetry(reconnectAttempts, oldConnection);
if (connection == null) {
if (!clientProtocolManager.isAlive())
@ -774,7 +774,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
}
}
private void getConnectionWithRetry(final int reconnectAttempts) {
private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {
if (!clientProtocolManager.isAlive())
return;
if (logger.isTraceEnabled()) {
@ -795,6 +795,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
}
if (getConnection() != null) {
if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {
// transferring old connection version into the new connection
((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());
}
if (logger.isDebugEnabled()) {
logger.debug("Reconnection successful");
}

View File

@ -22,9 +22,9 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun;
if (serverArg[0].startsWith("HORNETQ")) {
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false");
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false&reconnectAttempts=-1&retryInterval=100");
} else {
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false");
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100");
}

View File

@ -0,0 +1,41 @@
package clients
import org.apache.activemq.artemis.api.core.ActiveMQException
import org.apache.activemq.artemis.api.core.client.FailoverEventListener
import org.apache.activemq.artemis.api.core.client.FailoverEventType
import org.apache.activemq.artemis.jms.client.ActiveMQConnection
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Create a client connection factory
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit;
CountDownLatch latch = new CountDownLatch(1);
((ActiveMQConnection)connectionToFail).setFailoverListener(new FailoverEventListener() {
@Override
void failoverEvent(FailoverEventType eventType) {
latch.countDown();
}
})
((ActiveMQConnection)connectionToFail).getSessionFactory().getConnection().fail(new ActiveMQException("fail"));
GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS));

View File

@ -26,7 +26,6 @@ String serverType = arg[0];
String clientType = arg[1];
String operation = arg[2];
try {
legacyOption = legacy;
} catch (Throwable e) {
@ -127,8 +126,60 @@ if (operation.equals("sendAckMessages") || operation.equals("sendTopic")) {
plain.setStringProperty("plain", "doce");
plain.setIntProperty("order", 15)
producer.send(plain);
session.commit();
session.close();
Session newSession = connection.createSession(true, Session.SESSION_TRANSACTED);
connectionToFail = connection;
if (clientType.equals("ARTEMIS-SNAPSHOT")) {
// this is validating a bug that could only be fixed in snapshot
GroovyRun.evaluate("clients/artemisFail.groovy", "serverArg", serverType);
}
MessageProducer newProducer = newSession.createProducer(destination);
for (int i = 0 ; i < 10; i++) {
String bodyText = "This is message " + i;
TextMessage textMessage = newSession.createTextMessage(bodyText);
int size = 5 + i % 10;
StringBuffer variableSize = new StringBuffer();
for (int s = 0; s < size; s++) {
variableSize.append(" " + i);
}
textMessage.setStringProperty("inMessageId", variableSize.toString());
newProducer.send(textMessage);
newSession.commit();
newSession.close();
newSession = connection.createSession(true, Session.SESSION_TRANSACTED);
newProducer = newSession.createProducer(destination);
if (i % 2 == 0) {
// failing half of the sessions for the snapshots
if (clientType.equals("ARTEMIS-SNAPSHOT")) {
// this is validating a bug that could only be fixed in snapshot
GroovyRun.evaluate("clients/artemisFail.groovy", "serverArg", serverType);
}
}
}
// even if topic, will send a few on queue
newProducer = newSession.createProducer(queue);
for (int i = 0; i < 7; i++) {
String bodyText = "This is message " + i;
TextMessage textMessage = newSession.createTextMessage(bodyText);
int size = 5 + i % 10;
StringBuffer variableSize = new StringBuffer();
for (int s = 0; s < size; s++) {
variableSize.append(" " + i);
}
textMessage.setStringProperty("inMessageId", variableSize.toString());
newProducer.send(textMessage);
newSession.commit();
}
newSession.commit();
newSession.close();
connection.close();
}
@ -194,7 +245,26 @@ if (operation.equals("receiveMessages") || operation.equals("receiveNonDurableSu
GroovyRun.assertNotNull(plain);
GroovyRun.assertEquals("doce", plain.getStringProperty("plain"));
for (int i = 0 ; i < 10; i++) {
TextMessage recMessage = consumer.receive(5000);
GroovyRun.assertNotNull(recMessage);
GroovyRun.assertEquals("This is message " + i, recMessage.getText());
}
session.commit();
consumer.close();
// force a few on the queue even if the test is for topics
consumer = session.createConsumer(queue);
for (int i = 0; i < 7; i++) {
TextMessage recMessage = consumer.receive(5000);
GroovyRun.assertNotNull(recMessage);
GroovyRun.assertEquals("This is message " + i, recMessage.getText());
}
connection.close();
}

View File

@ -32,6 +32,6 @@ for (Object o : queueControls) {
QueueControl c = (QueueControl) o;
GroovyRun.assertTrue(c.getPersistentSize() > 0);
GroovyRun.assertTrue(c.getDurablePersistentSize() > 0);
GroovyRun.assertEquals(16l, c.getMessageCount());
GroovyRun.assertEquals(16l, c.getDurableMessageCount());
GroovyRun.assertEquals(33l, c.getMessageCount());
GroovyRun.assertEquals(33l, c.getDurableMessageCount());
}