This closes #538

This commit is contained in:
Clebert Suconic 2016-05-25 09:33:14 -04:00
commit 4a11a631bc
2 changed files with 10 additions and 4 deletions

View File

@ -35,6 +35,8 @@ import javax.jms.TopicSession;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -48,6 +50,7 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.version.Version; import org.apache.activemq.artemis.core.version.Version;
import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.artemis.utils.VersionLoader;
@ -113,6 +116,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
private final FailoverEventListener failoverListener = new FailoverEventListenerImpl(this); private final FailoverEventListener failoverListener = new FailoverEventListenerImpl(this);
private final ExecutorService failoverListenerExecutor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
private final Version thisVersion; private final Version thisVersion;
private final int dupsOKBatchSize; private final int dupsOKBatchSize;
@ -353,6 +358,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
} }
} }
failoverListenerExecutor.shutdown();
closed = true; closed = true;
} }
catch (ActiveMQException e) { catch (ActiveMQException e) {
@ -759,12 +766,12 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
if (failoverListener != null) { if (failoverListener != null) {
new Thread(new Runnable() { conn.failoverListenerExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
failoverListener.failoverEvent(eventType); failoverListener.failoverEvent(eventType);
} }
}).start(); });
} }
} }
catch (JMSException e) { catch (JMSException e) {

View File

@ -56,7 +56,6 @@ import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -312,7 +311,7 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase {
private static class MyFailoverListener implements FailoverEventListener { private static class MyFailoverListener implements FailoverEventListener {
private List<FailoverEventType> eventTypeList = Collections.synchronizedList(new ArrayList<FailoverEventType>()); private List<FailoverEventType> eventTypeList = new ArrayList<>();
public FailoverEventType get(int element) { public FailoverEventType get(int element) {
waitForElements(element + 1); waitForElements(element + 1);