This closes #2247
This commit is contained in:
commit
22b62b5b0c
|
@ -237,7 +237,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||||
public void connect(final int initialConnectAttempts,
|
public void connect(final int initialConnectAttempts,
|
||||||
final boolean failoverOnInitialConnection) throws ActiveMQException {
|
final boolean failoverOnInitialConnection) throws ActiveMQException {
|
||||||
// Get the connection
|
// Get the connection
|
||||||
getConnectionWithRetry(initialConnectAttempts);
|
getConnectionWithRetry(initialConnectAttempts, null);
|
||||||
|
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig);
|
StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig);
|
||||||
|
@ -743,7 +743,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||||
session.preHandleFailover(connection);
|
session.preHandleFailover(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
getConnectionWithRetry(reconnectAttempts);
|
getConnectionWithRetry(reconnectAttempts, oldConnection);
|
||||||
|
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
if (!clientProtocolManager.isAlive())
|
if (!clientProtocolManager.isAlive())
|
||||||
|
@ -774,7 +774,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void getConnectionWithRetry(final int reconnectAttempts) {
|
private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {
|
||||||
if (!clientProtocolManager.isAlive())
|
if (!clientProtocolManager.isAlive())
|
||||||
return;
|
return;
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
|
@ -795,6 +795,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||||
}
|
}
|
||||||
|
|
||||||
if (getConnection() != null) {
|
if (getConnection() != null) {
|
||||||
|
if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {
|
||||||
|
// transferring old connection version into the new connection
|
||||||
|
((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());
|
||||||
|
}
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Reconnection successful");
|
logger.debug("Reconnection successful");
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,9 +22,9 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
|
||||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun;
|
import org.apache.activemq.artemis.tests.compatibility.GroovyRun;
|
||||||
|
|
||||||
if (serverArg[0].startsWith("HORNETQ")) {
|
if (serverArg[0].startsWith("HORNETQ")) {
|
||||||
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false");
|
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false&reconnectAttempts=-1&retryInterval=100");
|
||||||
} else {
|
} else {
|
||||||
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false");
|
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
package clients
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException
|
||||||
|
import org.apache.activemq.artemis.api.core.client.FailoverEventListener
|
||||||
|
import org.apache.activemq.artemis.api.core.client.FailoverEventType
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnection
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Create a client connection factory
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
|
||||||
|
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
((ActiveMQConnection)connectionToFail).setFailoverListener(new FailoverEventListener() {
|
||||||
|
@Override
|
||||||
|
void failoverEvent(FailoverEventType eventType) {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
((ActiveMQConnection)connectionToFail).getSessionFactory().getConnection().fail(new ActiveMQException("fail"));
|
||||||
|
GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS));
|
|
@ -26,7 +26,6 @@ String serverType = arg[0];
|
||||||
String clientType = arg[1];
|
String clientType = arg[1];
|
||||||
String operation = arg[2];
|
String operation = arg[2];
|
||||||
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
legacyOption = legacy;
|
legacyOption = legacy;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
@ -127,8 +126,60 @@ if (operation.equals("sendAckMessages") || operation.equals("sendTopic")) {
|
||||||
plain.setStringProperty("plain", "doce");
|
plain.setStringProperty("plain", "doce");
|
||||||
plain.setIntProperty("order", 15)
|
plain.setIntProperty("order", 15)
|
||||||
producer.send(plain);
|
producer.send(plain);
|
||||||
|
|
||||||
session.commit();
|
session.commit();
|
||||||
|
session.close();
|
||||||
|
|
||||||
|
Session newSession = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
connectionToFail = connection;
|
||||||
|
if (clientType.equals("ARTEMIS-SNAPSHOT")) {
|
||||||
|
// this is validating a bug that could only be fixed in snapshot
|
||||||
|
GroovyRun.evaluate("clients/artemisFail.groovy", "serverArg", serverType);
|
||||||
|
}
|
||||||
|
MessageProducer newProducer = newSession.createProducer(destination);
|
||||||
|
for (int i = 0 ; i < 10; i++) {
|
||||||
|
String bodyText = "This is message " + i;
|
||||||
|
TextMessage textMessage = newSession.createTextMessage(bodyText);
|
||||||
|
int size = 5 + i % 10;
|
||||||
|
StringBuffer variableSize = new StringBuffer();
|
||||||
|
for (int s = 0; s < size; s++) {
|
||||||
|
variableSize.append(" " + i);
|
||||||
|
}
|
||||||
|
textMessage.setStringProperty("inMessageId", variableSize.toString());
|
||||||
|
newProducer.send(textMessage);
|
||||||
|
newSession.commit();
|
||||||
|
|
||||||
|
newSession.close();
|
||||||
|
newSession = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
newProducer = newSession.createProducer(destination);
|
||||||
|
if (i % 2 == 0) {
|
||||||
|
// failing half of the sessions for the snapshots
|
||||||
|
if (clientType.equals("ARTEMIS-SNAPSHOT")) {
|
||||||
|
// this is validating a bug that could only be fixed in snapshot
|
||||||
|
GroovyRun.evaluate("clients/artemisFail.groovy", "serverArg", serverType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// even if topic, will send a few on queue
|
||||||
|
newProducer = newSession.createProducer(queue);
|
||||||
|
|
||||||
|
for (int i = 0; i < 7; i++) {
|
||||||
|
String bodyText = "This is message " + i;
|
||||||
|
TextMessage textMessage = newSession.createTextMessage(bodyText);
|
||||||
|
int size = 5 + i % 10;
|
||||||
|
StringBuffer variableSize = new StringBuffer();
|
||||||
|
for (int s = 0; s < size; s++) {
|
||||||
|
variableSize.append(" " + i);
|
||||||
|
}
|
||||||
|
textMessage.setStringProperty("inMessageId", variableSize.toString());
|
||||||
|
newProducer.send(textMessage);
|
||||||
|
newSession.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
newSession.commit();
|
||||||
|
newSession.close();
|
||||||
|
|
||||||
|
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
@ -194,7 +245,26 @@ if (operation.equals("receiveMessages") || operation.equals("receiveNonDurableSu
|
||||||
GroovyRun.assertNotNull(plain);
|
GroovyRun.assertNotNull(plain);
|
||||||
GroovyRun.assertEquals("doce", plain.getStringProperty("plain"));
|
GroovyRun.assertEquals("doce", plain.getStringProperty("plain"));
|
||||||
|
|
||||||
|
|
||||||
|
for (int i = 0 ; i < 10; i++) {
|
||||||
|
TextMessage recMessage = consumer.receive(5000);
|
||||||
|
GroovyRun.assertNotNull(recMessage);
|
||||||
|
GroovyRun.assertEquals("This is message " + i, recMessage.getText());
|
||||||
|
}
|
||||||
|
|
||||||
session.commit();
|
session.commit();
|
||||||
|
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
// force a few on the queue even if the test is for topics
|
||||||
|
consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
|
for (int i = 0; i < 7; i++) {
|
||||||
|
TextMessage recMessage = consumer.receive(5000);
|
||||||
|
GroovyRun.assertNotNull(recMessage);
|
||||||
|
GroovyRun.assertEquals("This is message " + i, recMessage.getText());
|
||||||
|
}
|
||||||
|
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,6 @@ for (Object o : queueControls) {
|
||||||
QueueControl c = (QueueControl) o;
|
QueueControl c = (QueueControl) o;
|
||||||
GroovyRun.assertTrue(c.getPersistentSize() > 0);
|
GroovyRun.assertTrue(c.getPersistentSize() > 0);
|
||||||
GroovyRun.assertTrue(c.getDurablePersistentSize() > 0);
|
GroovyRun.assertTrue(c.getDurablePersistentSize() > 0);
|
||||||
GroovyRun.assertEquals(16l, c.getMessageCount());
|
GroovyRun.assertEquals(33l, c.getMessageCount());
|
||||||
GroovyRun.assertEquals(16l, c.getDurableMessageCount());
|
GroovyRun.assertEquals(33l, c.getDurableMessageCount());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue