ARTEMIS-538 - [Artemis Testsuite] JMSFailoverListenerTest#testManualFailover fails
ActiveMQConnection implements FailoverEventListener which executes client's FailoverEventListeners in separated threads in background. The old implementation does not guarantee ordering of their executions. The commit improves the implementation to guarantee it.
This commit is contained in:
parent
b5d252af2c
commit
fb9d09744d
|
@ -35,6 +35,8 @@ import javax.jms.TopicSession;
|
||||||
import java.lang.ref.WeakReference;
|
import java.lang.ref.WeakReference;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
|
@ -48,6 +50,7 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
|
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
|
||||||
import org.apache.activemq.artemis.core.version.Version;
|
import org.apache.activemq.artemis.core.version.Version;
|
||||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
import org.apache.activemq.artemis.utils.VersionLoader;
|
import org.apache.activemq.artemis.utils.VersionLoader;
|
||||||
|
@ -113,6 +116,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
|
||||||
|
|
||||||
private final FailoverEventListener failoverListener = new FailoverEventListenerImpl(this);
|
private final FailoverEventListener failoverListener = new FailoverEventListenerImpl(this);
|
||||||
|
|
||||||
|
private final ExecutorService failoverListenerExecutor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
private final Version thisVersion;
|
private final Version thisVersion;
|
||||||
|
|
||||||
private final int dupsOKBatchSize;
|
private final int dupsOKBatchSize;
|
||||||
|
@ -353,6 +358,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
failoverListenerExecutor.shutdown();
|
||||||
|
|
||||||
closed = true;
|
closed = true;
|
||||||
}
|
}
|
||||||
catch (ActiveMQException e) {
|
catch (ActiveMQException e) {
|
||||||
|
@ -759,12 +766,12 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
|
||||||
|
|
||||||
if (failoverListener != null) {
|
if (failoverListener != null) {
|
||||||
|
|
||||||
new Thread(new Runnable() {
|
conn.failoverListenerExecutor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
failoverListener.failoverEvent(eventType);
|
failoverListener.failoverEvent(eventType);
|
||||||
}
|
}
|
||||||
}).start();
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (JMSException e) {
|
catch (JMSException e) {
|
||||||
|
|
|
@ -56,7 +56,6 @@ import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -312,7 +311,7 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
private static class MyFailoverListener implements FailoverEventListener {
|
private static class MyFailoverListener implements FailoverEventListener {
|
||||||
|
|
||||||
private List<FailoverEventType> eventTypeList = Collections.synchronizedList(new ArrayList<FailoverEventType>());
|
private List<FailoverEventType> eventTypeList = new ArrayList<>();
|
||||||
|
|
||||||
public FailoverEventType get(int element) {
|
public FailoverEventType get(int element) {
|
||||||
waitForElements(element + 1);
|
waitForElements(element + 1);
|
||||||
|
|
Loading…
Reference in New Issue